1 /*
2  * Copyright (c) 2011 Intel Corporation.  All rights reserved.
3  *
4  * This software is available to you under the OpenIB.org BSD license
5  * below:
6  *
7  *     Redistribution and use in source and binary forms, with or
8  *     without modification, are permitted provided that the following
9  *     conditions are met:
10  *
11  *      - Redistributions of source code must retain the above
12  *        copyright notice, this list of conditions and the following
13  *        disclaimer.
14  *
15  *      - Redistributions in binary form must reproduce the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer in the documentation and/or other materials
18  *        provided with the distribution.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
23  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
24  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
26  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE.
28  */
29 
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <strings.h>
34 #include <errno.h>
35 #include <getopt.h>
36 #include <arpa/inet.h>
37 #include <sys/mman.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <sys/time.h>
41 #include <sys/stat.h>
42 #include <fcntl.h>
43 #include <netdb.h>
44 #include <unistd.h>
45 
46 #include <rdma/rsocket.h>
47 
48 union rsocket_address {
49 	struct sockaddr		sa;
50 	struct sockaddr_in	sin;
51 	struct sockaddr_in6	sin6;
52 	struct sockaddr_storage storage;
53 };
54 
55 static const char *port = "7427";
56 static char *dst_addr;
57 static char *dst_file;
58 static char *src_file;
59 static struct timeval start, end;
60 //static void buf[1024 * 1024];
61 static uint64_t bytes;
62 static int fd;
63 static void *file_addr;
64 
65 enum {
66 	CMD_NOOP,
67 	CMD_OPEN,
68 	CMD_CLOSE,
69 	CMD_WRITE,
70 	CMD_RESP = 0x80,
71 };
72 
73 /* TODO: handle byte swapping */
74 struct msg_hdr {
75 	uint8_t  version;
76 	uint8_t  command;
77 	uint16_t len;
78 	uint32_t data;
79 	uint64_t id;
80 };
81 
82 struct msg_open {
83 	struct msg_hdr hdr;
84 	char path[0];
85 };
86 
87 struct msg_write {
88 	struct msg_hdr hdr;
89 	uint64_t size;
90 };
91 
92 static void show_perf(void)
93 {
94 	float usec;
95 
96 	usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
97 
98 	printf("%lld bytes in %.2f seconds = %.2f Gb/sec\n",
99 	       (long long) bytes, usec / 1000000., (bytes * 8) / (1000. * usec));
100 }
101 
102 static char *_ntop(union rsocket_address *rsa)
103 {
104 	static char addr[32];
105 
106 	switch (rsa->sa.sa_family) {
107 	case AF_INET:
108 		inet_ntop(AF_INET, &rsa->sin.sin_addr, addr, sizeof addr);
109 		break;
110 	case AF_INET6:
111 		inet_ntop(AF_INET6, &rsa->sin6.sin6_addr, addr, sizeof addr);
112 		break;
113 	default:
114 		addr[0] = '\0';
115 		break;
116 	}
117 
118 	return addr;
119 }
120 
121 static size_t _recv(int rs, char *msg, size_t len)
122 {
123 	size_t ret, offset;
124 
125 	for (offset = 0; offset < len; offset += ret) {
126 		ret = rrecv(rs, msg + offset, len - offset, 0);
127 		if (ret <= 0)
128 			return ret;
129 	}
130 
131 	return len;
132 }
133 
134 static int msg_recv_hdr(int rs, struct msg_hdr *hdr)
135 {
136 	int ret;
137 
138 	ret = _recv(rs, (char *) hdr, sizeof *hdr);
139 	if (ret != sizeof *hdr)
140 		return -1;
141 
142 	if (hdr->version || hdr->len < sizeof *hdr) {
143 		printf("invalid version %d or length %d\n",
144 		       hdr->version, hdr->len);
145 		return -1;
146 	}
147 
148 	return sizeof *hdr;
149 }
150 
151 static int msg_get_resp(int rs, struct msg_hdr *msg, uint8_t cmd)
152 {
153 	int ret;
154 
155 	ret = msg_recv_hdr(rs, msg);
156 	if (ret != sizeof *msg)
157 		return ret;
158 
159 	if ((msg->len != sizeof *msg) || (msg->command != (cmd | CMD_RESP))) {
160 		printf("invalid length %d or bad command response %x:%x\n",
161 		       msg->len, msg->command, cmd | CMD_RESP);
162 		return -1;
163 	}
164 
165 	return msg->data;
166 }
167 
168 static void msg_send_resp(int rs, struct msg_hdr *msg, uint32_t status)
169 {
170 	struct msg_hdr resp;
171 
172 	resp.version = 0;
173 	resp.command = msg->command | CMD_RESP;
174 	resp.len = sizeof resp;
175 	resp.data = status;
176 	resp.id = msg->id;
177 	rsend(rs, (char *) &resp, sizeof resp, 0);
178 }
179 
180 static int server_listen(void)
181 {
182 	struct addrinfo hints, *res;
183 	int ret, rs;
184 
185 	memset(&hints, 0, sizeof hints);
186 	hints.ai_flags = RAI_PASSIVE;
187  	ret = getaddrinfo(NULL, port, &hints, &res);
188 	if (ret) {
189 		printf("getaddrinfo failed: %s\n", gai_strerror(ret));
190 		return ret;
191 	}
192 
193 	rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
194 	if (rs < 0) {
195 		perror("rsocket failed\n");
196 		ret = rs;
197 		goto free;
198 	}
199 
200 	ret = 1;
201 	ret = rsetsockopt(rs, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof ret);
202 	if (ret) {
203 		perror("rsetsockopt failed");
204 		goto close;
205 	}
206 
207 	ret = rbind(rs, res->ai_addr, res->ai_addrlen);
208 	if (ret) {
209 		perror("rbind failed");
210 		goto close;
211 	}
212 
213 	ret = rlisten(rs, 1);
214 	if (ret) {
215 		perror("rlisten failed");
216 		goto close;
217 	}
218 
219 	ret = rs;
220 	goto free;
221 
222 close:
223 	rclose(rs);
224 free:
225 	freeaddrinfo(res);
226 	return ret;
227 }
228 
229 static int server_open(int rs, struct msg_hdr *msg)
230 {
231 	char *path = NULL;
232 	int ret, len;
233 
234 	printf("opening: ");
235 	fflush(NULL);
236 	if (file_addr || fd > 0) {
237 		printf("cannot open another file\n");
238 		ret = EBUSY;
239 		goto out;
240 	}
241 
242 	len = msg->len - sizeof *msg;
243 	path = malloc(len);
244 	if (!path) {
245 		printf("cannot allocate path name\n");
246 		ret = ENOMEM;
247 		goto out;
248 	}
249 
250 	ret = _recv(rs, path, len);
251 	if (ret != len) {
252 		printf("error receiving path\n");
253 		goto out;
254 	}
255 
256 	printf("%s, ", path);
257 	fflush(NULL);
258 	fd = open(path, O_RDWR | O_CREAT | O_TRUNC, msg->data);
259 	if (fd < 0) {
260 		printf("unable to open destination file\n");
261 		ret = errno;
262 	}
263 
264 	ret = 0;
265 out:
266 	if (path)
267 		free(path);
268 
269 	msg_send_resp(rs, msg, ret);
270 	return ret;
271 }
272 
273 static void server_close(int rs, struct msg_hdr *msg)
274 {
275 	printf("closing...");
276 	fflush(NULL);
277 	msg_send_resp(rs, msg, 0);
278 
279 	if (file_addr) {
280 		munmap(file_addr, bytes);
281 		file_addr = NULL;
282 	}
283 
284 	if (fd > 0) {
285 		close(fd);
286 		fd = 0;
287 	}
288 	printf("done\n");
289 }
290 
291 static int server_write(int rs, struct msg_hdr *msg)
292 {
293 	size_t len;
294 	int ret;
295 
296 	printf("transferring");
297 	fflush(NULL);
298 	if (fd <= 0) {
299 		printf("...file not opened\n");
300 		ret = EINVAL;
301 		goto out;
302 	}
303 
304 	if (msg->len != sizeof(struct msg_write)) {
305 		printf("...invalid message length %d\n", msg->len);
306 		ret = EINVAL;
307 		goto out;
308 	}
309 
310 	ret = _recv(rs, (char *) &bytes, sizeof bytes);
311 	if (ret != sizeof bytes)
312 		goto out;
313 
314 	ret = ftruncate(fd, bytes);
315 	if (ret)
316 		goto out;
317 
318 	file_addr = mmap(NULL, bytes, PROT_WRITE, MAP_SHARED, fd, 0);
319 	if (file_addr == (void *) -1) {
320 		printf("...error mapping file\n");
321 		ret = errno;
322 		goto out;
323 	}
324 
325 	printf("...%lld bytes...", (long long) bytes);
326 	fflush(NULL);
327 	len = _recv(rs, file_addr, bytes);
328 	if (len != bytes) {
329 		printf("...error receiving data\n");
330 		ret = (int) len;
331 	}
332 out:
333 	msg_send_resp(rs, msg, ret);
334 	return ret;
335 }
336 
337 static void server_process(int rs)
338 {
339 	struct msg_hdr msg;
340 	int ret;
341 
342 	do {
343 		ret = msg_recv_hdr(rs, &msg);
344 		if (ret != sizeof msg)
345 			break;
346 
347 		switch (msg.command) {
348 		case CMD_OPEN:
349 			ret = server_open(rs, &msg);
350 			break;
351 		case CMD_CLOSE:
352 			server_close(rs, &msg);
353 			ret = 0;
354 			break;
355 		case CMD_WRITE:
356 			ret = server_write(rs, &msg);
357 			break;
358 		default:
359 			msg_send_resp(rs, &msg, EINVAL);
360 			ret = -1;
361 			break;
362 		}
363 
364 	} while (!ret);
365 }
366 
367 static int server_run(void)
368 {
369 	int lrs, rs;
370 	union rsocket_address rsa;
371 	socklen_t len;
372 
373 	lrs = server_listen();
374 	if (lrs < 0)
375 		return lrs;
376 
377 	while (1) {
378 		len = sizeof rsa;
379 		printf("waiting for connection...");
380 		fflush(NULL);
381 		rs = raccept(lrs, &rsa.sa, &len);
382 
383 		printf("client: %s\n", _ntop(&rsa));
384 		server_process(rs);
385 
386 		rshutdown(rs, SHUT_RDWR);
387 		rclose(rs);
388 	}
389 	return 0;
390 }
391 
392 static int client_connect(void)
393 {
394 	struct addrinfo *res;
395 	int ret, rs;
396 
397  	ret = getaddrinfo(dst_addr, port, NULL, &res);
398 	if (ret) {
399 		printf("getaddrinfo failed: %s\n", gai_strerror(ret));
400 		return ret;
401 	}
402 
403 	rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
404 	if (rs < 0) {
405 		perror("rsocket failed\n");
406 		goto free;
407 	}
408 
409 	ret = rconnect(rs, res->ai_addr, res->ai_addrlen);
410 	if (ret) {
411 		perror("rconnect failed\n");
412 		rclose(rs);
413 		rs = ret;
414 	}
415 
416 free:
417 	freeaddrinfo(res);
418 	return rs;
419 }
420 
421 static int client_open(int rs)
422 {
423 	struct msg_open *msg;
424 	struct stat stats;
425 	uint32_t len;
426 	int ret;
427 
428 	printf("opening...");
429 	fflush(NULL);
430 	fd = open(src_file, O_RDONLY);
431 	if (fd < 0)
432 		return fd;
433 
434 	ret = fstat(fd, &stats);
435 	if (ret < 0)
436 		goto err1;
437 
438 	bytes = (uint64_t) stats.st_size;
439 	file_addr = mmap(NULL, bytes, PROT_READ, MAP_SHARED, fd, 0);
440 	if (file_addr == (void *) -1) {
441 		ret = errno;
442 		goto err1;
443 	}
444 
445 	len = (((uint32_t) strlen(dst_file)) + 8) & 0xFFFFFFF8;
446 	msg = calloc(1, sizeof(*msg) + len);
447 	if (!msg) {
448 		ret = -1;
449 		goto err2;
450 	}
451 
452 	msg->hdr.command = CMD_OPEN;
453 	msg->hdr.len = sizeof(*msg) + len;
454 	msg->hdr.data = (uint32_t) stats.st_mode;
455 	strcpy(msg->path, dst_file);
456 	ret = rsend(rs, msg, msg->hdr.len, 0);
457 	if (ret != msg->hdr.len)
458 		goto err3;
459 
460 	ret = msg_get_resp(rs, &msg->hdr, CMD_OPEN);
461 	if (ret)
462 		goto err3;
463 
464 	return 0;
465 
466 err3:
467 	free(msg);
468 err2:
469 	munmap(file_addr, bytes);
470 err1:
471 	close(fd);
472 	return ret;
473 }
474 
475 static int client_start_write(int rs)
476 {
477 	struct msg_write msg;
478 	int ret;
479 
480 	printf("transferring");
481 	fflush(NULL);
482 	memset(&msg, 0, sizeof msg);
483 	msg.hdr.command = CMD_WRITE;
484 	msg.hdr.len = sizeof(msg);
485 	msg.size = bytes;
486 
487 	ret = rsend(rs, &msg, sizeof msg, 0);
488 	if (ret != msg.hdr.len)
489 		return ret;
490 
491 	return 0;
492 }
493 
494 static int client_close(int rs)
495 {
496 	struct msg_hdr msg;
497 	int ret;
498 
499 	printf("closing...");
500 	fflush(NULL);
501 	memset(&msg, 0, sizeof msg);
502 	msg.command = CMD_CLOSE;
503 	msg.len = sizeof msg;
504 	ret = rsend(rs, (char *) &msg, msg.len, 0);
505 	if (ret != msg.len)
506 		goto out;
507 
508 	ret = msg_get_resp(rs, &msg, CMD_CLOSE);
509 	if (ret)
510 		goto out;
511 
512 	printf("done\n");
513 out:
514 	munmap(file_addr, bytes);
515 	close(fd);
516 	return ret;
517 }
518 
519 static int client_run(void)
520 {
521 	struct msg_hdr ack;
522 	int ret, rs;
523 	size_t len;
524 
525 	rs = client_connect();
526 	if (rs < 0)
527 		return rs;
528 
529 	ret = client_open(rs);
530 	if (ret)
531 		goto shutdown;
532 
533 	ret = client_start_write(rs);
534 	if (ret)
535 		goto close;
536 
537 	printf("...");
538 	fflush(NULL);
539 	gettimeofday(&start, NULL);
540 	len = rsend(rs, file_addr, bytes, 0);
541 	if (len == bytes)
542 		ret = msg_get_resp(rs, &ack, CMD_WRITE);
543 	else
544 		ret = (int) len;
545 
546 	gettimeofday(&end, NULL);
547 
548 close:
549 	client_close(rs);
550 shutdown:
551 	rshutdown(rs, SHUT_RDWR);
552 	rclose(rs);
553 	if (!ret)
554 		show_perf();
555 	return ret;
556 }
557 
558 static void show_usage(char *program)
559 {
560 	printf("usage 1: %s [options]\n", program);
561 	printf("\t     starts the server application\n");
562 	printf("\t[-p  port_number]\n");
563 	printf("usage 2: %s source server[:destination] [options]\n", program);
564 	printf("\t     source - file name and path\n");
565 	printf("\t     server - name or address\n");
566 	printf("\t     destination - file name and path\n");
567 	printf("\t[-p  port_number]\n");
568 	exit(1);
569 }
570 
571 static void server_opts(int argc, char **argv)
572 {
573 	int op;
574 
575 	while ((op = getopt(argc, argv, "p:")) != -1) {
576 		switch (op) {
577 		case 'p':
578 			port = optarg;
579 			break;
580 		default:
581 			show_usage(argv[0]);
582 		}
583 	}
584 }
585 
586 static void client_opts(int argc, char **argv)
587 {
588 	int op;
589 
590 	if (argc < 3)
591 		show_usage(argv[0]);
592 
593 	src_file = argv[1];
594 	dst_addr = argv[2];
595 	dst_file = strchr(dst_addr, ':');
596 	if (dst_file) {
597 		*dst_file = '\0';
598 		dst_file++;
599 	}
600 	if (!dst_file)
601 		dst_file = src_file;
602 
603 	while ((op = getopt(argc, argv, "p:")) != -1) {
604 		switch (op) {
605 		case 'p':
606 			port = optarg;
607 			break;
608 		default:
609 			show_usage(argv[0]);
610 		}
611 	}
612 
613 }
614 
615 int main(int argc, char **argv)
616 {
617 	int ret;
618 
619 	if (argc == 1 || argv[1][0] == '-') {
620 		server_opts(argc, argv);
621 		ret = server_run();
622 	} else {
623 		client_opts(argc, argv);
624 		ret = client_run();
625 	}
626 
627 	return ret;
628 }
629