1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt).  A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10 
11 #include <ctype.h>
12 #include <stdarg.h>
13 #include <stdint.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 
18 #include <nng/nng.h>
19 #include <nng/supplemental/util/options.h>
20 #include <nng/supplemental/util/platform.h>
21 
22 static void die(const char *, ...);
23 static int
no_open(nng_socket * arg)24 no_open(nng_socket *arg)
25 {
26 	(void) arg;
27 	die("Protocol not supported in this build!");
28 	return (NNG_ENOTSUP);
29 }
30 
31 typedef int (*open_func)(nng_socket *);
32 
33 static open_func open_server = no_open;
34 static open_func open_client = no_open;
35 
// For each protocol, pull in its header when the build provides it;
// otherwise alias its open routine to no_open so that selecting the
// protocol at run time fails with a clear message.

#ifdef NNG_HAVE_PAIR1
#include <nng/protocol/pair1/pair.h>
#else
#define nng_pair1_open no_open
#endif

#ifdef NNG_HAVE_PAIR0
#include <nng/protocol/pair0/pair.h>
#else
#define nng_pair0_open no_open
#endif

#ifdef NNG_HAVE_REQ0
#include <nng/protocol/reqrep0/req.h>
#else
#define nng_req0_open no_open
#endif

#ifdef NNG_HAVE_REP0
#include <nng/protocol/reqrep0/rep.h>
#else
#define nng_rep0_open no_open
#endif

#ifdef NNG_HAVE_BUS0
#include <nng/protocol/bus0/bus.h>
#else
#define nng_bus0_open no_open
#endif

#ifdef NNG_HAVE_PULL0
#include <nng/protocol/pipeline0/pull.h>
#else
#define nng_pull0_open no_open
#endif

#ifdef NNG_HAVE_PUSH0
#include <nng/protocol/pipeline0/push.h>
#else
#define nng_push0_open no_open
#endif

#ifdef NNG_HAVE_PUB0
#include <nng/protocol/pubsub0/pub.h>
#else
#define nng_pub0_open no_open
#endif

#ifdef NNG_HAVE_SUB0
#include <nng/protocol/pubsub0/sub.h>
#else
#define nng_sub0_open no_open
#endif
89 
// Identifiers reported by the option parser for each recognized option.
// Values start at 1 so that 0 can terminate the option table.
enum options {
	OPT_PAIR0     = 1,
	OPT_PAIR1     = 2,
	OPT_REQREP0   = 3,
	OPT_PUBSUB0   = 4,
	OPT_PIPELINE0 = 5,
	OPT_SURVEY0   = 6,
	OPT_BUS0      = 7,
	OPT_URL       = 8,
};
100 
101 // These are not universally supported by the variants yet.
102 static nng_optspec opts[] = {
103 	{ .o_name = "pair1", .o_val = OPT_PAIR1 },
104 	{ .o_name = "pair0", .o_val = OPT_PAIR0 },
105 	{ .o_name = "reqrep0", .o_val = OPT_REQREP0 },
106 	{ .o_name = "bus0", .o_val = OPT_BUS0 },
107 	{ .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
108 	{ .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
109 	{ .o_name = "url", .o_val = OPT_URL, .o_arg = true },
110 	{ .o_name = NULL, .o_val = 0 },
111 };
112 
// Forward declarations for the test bodies, the per-mode argument
// handlers, and the fatal-error helper, all defined later in this file.
static void latency_client(const char *addr, size_t msgsize, int trips);
static void latency_server(const char *addr, size_t msgsize, int trips);
static void throughput_client(const char *addr, size_t msgsize, int count);
static void throughput_server(const char *addr, size_t msgsize, int count);
static void do_remote_lat(int argc, char **argv);
static void do_local_lat(int argc, char **argv);
static void do_remote_thr(int argc, char **argv);
static void do_local_thr(int argc, char **argv);
static void do_inproc_thr(int argc, char **argv);
static void do_inproc_lat(int argc, char **argv);
static void die(const char *fmt, ...);
124 
125 // perf implements the same performance tests found in the standard
126 // nanomsg & mangos performance tests.  As with mangos, the decision
// about which test to run is determined by the program name (argv[0])
128 // that it is run under.
129 //
130 // Options are:
131 //
132 // - remote_lat - remote latency side (client, aka latency_client)
133 // - local_lat  - local latency side (server, aka latency_server)
134 // - local_thr  - local throughput side
135 // - remote_thr - remote throughput side
136 // - inproc_lat - inproc latency
137 // - inproc_thr - inproc throughput
138 //
139 
// matches reports whether the program path in arg selects the test
// called name.
//
// Any leading directory or drive portion of arg (everything up to the
// last '/', '\\' or ':') is ignored.  The remainder is compared with
// name case-insensitively; name itself is expected to be lower case.
// After name, arg must either end or continue with a '.' (an extension
// such as ".exe", which is ignored).
//
// Returns true on a match, false otherwise.
bool
matches(const char *arg, const char *name)
{
	const char *base = arg;
	const char *sep;

	// Keep stepping past separators until none are left, so that base
	// ends up just after the last one.
	while (((sep = strchr(base, '/')) != NULL) ||
	    ((sep = strchr(base, '\\')) != NULL) ||
	    ((sep = strchr(base, ':')) != NULL)) {
		base = sep + 1;
	}

	for (; *name != '\0'; base++, name++) {
		// The cast matters: handing tolower() a negative char (any
		// non-ASCII byte where char is signed) is undefined.
		if (tolower((unsigned char) *base) != *name) {
			return (false);
		}
	}

	// Only the end of the string or the start of an extension may follow.
	return ((*base == '\0') || (*base == '.'));
}
171 
// Numeric protocol selectors.
// NOTE(review): nothing in the visible part of this file refers to
// these; confirm whether anything else does before removing them.
const int PAIR0 = 0;
const int PAIR1 = 1;
const int REQREP = 2;
175 
176 int
main(int argc,char ** argv)177 main(int argc, char **argv)
178 {
179 	char *prog;
180 
181 #if defined(NNG_HAVE_PAIR1)
182 	open_server = nng_pair1_open;
183 	open_client = nng_pair1_open;
184 #elif defined(NNG_HAVE_PAIR0)
185 	open_server = nng_pair0_open;
186 	open_client = nng_pair0_open;
187 #endif
188 
189 	// Allow -m <remote_lat> or whatever to override argv[0].
190 	if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
191 		prog = argv[2];
192 		argv += 3;
193 		argc -= 3;
194 	} else {
195 		prog = argv[0];
196 		argc--;
197 		argv++;
198 	}
199 	if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
200 		do_remote_lat(argc, argv);
201 	} else if (matches(prog, "local_lat") ||
202 	    matches(prog, "latency_server")) {
203 		do_local_lat(argc, argv);
204 	} else if (matches(prog, "local_thr") ||
205 	    matches(prog, "throughput_server")) {
206 		do_local_thr(argc, argv);
207 	} else if (matches(prog, "remote_thr") ||
208 	    matches(prog, "throughput_client")) {
209 		do_remote_thr(argc, argv);
210 	} else if (matches(prog, "inproc_thr")) {
211 		do_inproc_thr(argc, argv);
212 	} else if (matches(prog, "inproc_lat")) {
213 		do_inproc_lat(argc, argv);
214 	} else {
215 		die("Unknown program mode? Use -m <mode>.");
216 	}
217 }
218 
// die prints a printf-style message followed by a newline to stderr and
// then terminates the process with exit status 2.  It does not return.
static void
die(const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	(void) vfprintf(stderr, fmt, args);
	va_end(args);

	(void) fputc('\n', stderr);
	exit(2);
}
230 
// parse_int converts the decimal string arg to an int.
//
// The whole string must be a number between 0 and 1000000000 inclusive;
// anything else (empty string, trailing characters, negative or larger
// values) is fatal and reported as "Invalid <what>".
static int
parse_int(const char *arg, const char *what)
{
	char *end = NULL;
	long  num = strtol(arg, &end, 10);
	int   valid;

	valid = (end != arg) && (*end == '\0') && (num >= 0) &&
	    (num <= 1000000000);
	if (!valid) {
		die("Invalid %s", what);
	}
	return ((int) num);
}
244 
// do_local_lat handles the "local_lat" mode: it checks the arguments
// <listen-addr> <msg-size> <roundtrips> and runs the latency server.
// Bad usage or bad numbers are fatal.
void
do_local_lat(int argc, char **argv)
{
	// parse_int() returns int, and the other mode handlers keep these
	// values in int; use the same type here rather than widening to
	// long and narrowing again at the call below.
	int msgsize;
	int trips;

	if (argc != 3) {
		die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
	}

	msgsize = parse_int(argv[1], "message size");
	trips   = parse_int(argv[2], "round-trips");

	latency_server(argv[0], (size_t) msgsize, trips);
}
260 
// do_remote_lat handles the "remote_lat" mode: it checks the arguments
// <connect-to> <msg-size> <roundtrips> and runs the latency client.
void
do_remote_lat(int argc, char **argv)
{
	if (argc != 3) {
		die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
	}

	// Message size is validated before the round-trip count, so a bad
	// size is the first thing reported.
	const int size   = parse_int(argv[1], "message size");
	const int rounds = parse_int(argv[2], "round-trips");

	latency_client(argv[0], size, rounds);
}
276 
// do_local_thr handles the "local_thr" mode: it checks the arguments
// <listen-addr> <msg-size> <count> and runs the throughput server.
void
do_local_thr(int argc, char **argv)
{
	if (argc != 3) {
		die("Usage: local_thr <listen-addr> <msg-size> <count>");
	}

	// Message size is validated first, then the message count.
	const int size  = parse_int(argv[1], "message size");
	const int total = parse_int(argv[2], "count");

	throughput_server(argv[0], size, total);
}
292 
// do_remote_thr handles the "remote_thr" mode: it checks the arguments
// <connect-to> <msg-size> <count> and runs the throughput client.
void
do_remote_thr(int argc, char **argv)
{
	if (argc != 3) {
		die("Usage: remote_thr <connect-to> <msg-size> <count>");
	}

	// Message size is validated first, then the message count.
	const int size  = parse_int(argv[1], "message size");
	const int total = parse_int(argv[2], "count");

	throughput_client(argv[0], size, total);
}
308 
// Everything the helper thread of an inproc test needs: which routine
// to run and the three arguments to run it with.
struct inproc_args {
	int         count;   // passed as the routine's third argument
	int         msgsize; // passed as the routine's second argument
	const char *addr;    // passed as the routine's first argument
	void (*func)(const char *, size_t, int);
};

// do_inproc is the thread entry point for the inproc tests.  args must
// point to a struct inproc_args; the routine stored in it is called
// with the stored address, message size and count.
static void
do_inproc(void *args)
{
	const struct inproc_args *job = args;

	(*job->func)(job->addr, (size_t) job->msgsize, job->count);
}
323 
324 void
do_inproc_lat(int argc,char ** argv)325 do_inproc_lat(int argc, char **argv)
326 {
327 	nng_thread *       thr;
328 	struct inproc_args ia;
329 	int                rv;
330 	int                val;
331 	int                optidx;
332 	char *             arg;
333 	char *             addr;
334 
335 	addr = "inproc://latency_test";
336 
337 	optidx = 0;
338 	while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
339 	    0) {
340 		switch (val) {
341 		case OPT_REQREP0:
342 			open_client = nng_req0_open;
343 			open_server = nng_rep0_open;
344 			break;
345 		case OPT_PAIR0:
346 			open_client = nng_pair0_open;
347 			open_server = nng_pair0_open;
348 			break;
349 		case OPT_PAIR1:
350 			open_client = nng_pair1_open;
351 			open_server = nng_pair1_open;
352 			break;
353 		case OPT_BUS0:
354 			open_client = nng_bus0_open;
355 			open_server = nng_bus0_open;
356 			break;
357 
358 		case OPT_URL:
359 			addr = arg;
360 			break;
361 		default:
362 			die("bad option");
363 		}
364 	}
365 	argc -= optidx;
366 	argv += optidx;
367 
368 	if (argc != 2) {
369 		die("Usage: inproc_lat <msg-size> <count>");
370 	}
371 
372 	ia.addr    = addr;
373 	ia.msgsize = parse_int(argv[0], "message size");
374 	ia.count   = parse_int(argv[1], "count");
375 	ia.func    = latency_server;
376 
377 	if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
378 		die("Cannot create thread: %s", nng_strerror(rv));
379 	}
380 
381 	// Sleep a bit.
382 	nng_msleep(100);
383 
384 	latency_client(addr, ia.msgsize, ia.count);
385 	nng_thread_destroy(thr);
386 }
387 
388 void
do_inproc_thr(int argc,char ** argv)389 do_inproc_thr(int argc, char **argv)
390 {
391 	nng_thread *       thr;
392 	struct inproc_args ia;
393 	int                rv;
394 	int                optidx;
395 	int                val;
396 	char *             arg;
397 	char *             addr = "inproc://throughput-test";
398 
399 	optidx = 0;
400 	while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
401 	    0) {
402 		switch (val) {
403 #if 0
404 		// For now these protocols simply do not work with
405 		// throughput -- they don't work with backpressure properly.
406 		// In the future we should support synchronizing in the same
407 		// process, and alerting the sender both on completion of
408 		// a single message, and on completion of all messages.
409 		case OPT_REQREP0:
410 			open_client = nng_req0_open;
411 			open_server = nng_rep0_open;
412 			break;
413 #endif
414 		case OPT_PAIR0:
415 			open_client = nng_pair0_open;
416 			open_server = nng_pair0_open;
417 			break;
418 		case OPT_PAIR1:
419 			open_client = nng_pair1_open;
420 			open_server = nng_pair1_open;
421 			break;
422 		case OPT_PIPELINE0:
423 			open_client = nng_pull0_open;
424 			open_server = nng_push0_open;
425 			break;
426 		case OPT_URL:
427 			addr = arg;
428 			break;
429 		default:
430 			die("bad option");
431 		}
432 	}
433 	argc -= optidx;
434 	argv += optidx;
435 
436 	if (argc != 2) {
437 		die("Usage: inproc_thr <msg-size> <count>");
438 	}
439 
440 	ia.addr    = addr;
441 	ia.msgsize = parse_int(argv[0], "message size");
442 	ia.count   = parse_int(argv[1], "count");
443 	ia.func    = throughput_server;
444 
445 	if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
446 		die("Cannot create thread: %s", nng_strerror(rv));
447 	}
448 
449 	// Sleep a bit.
450 	nng_msleep(100);
451 
452 	throughput_client(addr, ia.msgsize, ia.count);
453 	nng_thread_destroy(thr);
454 }
455 
456 void
latency_client(const char * addr,size_t msgsize,int trips)457 latency_client(const char *addr, size_t msgsize, int trips)
458 {
459 	nng_socket s;
460 	nng_msg *  msg;
461 	nng_time   start, end;
462 	int        rv;
463 	int        i;
464 	float      total;
465 	float      latency;
466 	if ((rv = open_client(&s)) != 0) {
467 		die("nng_socket: %s", nng_strerror(rv));
468 	}
469 
470 	// XXX: set no delay
471 	// XXX: other options (TLS in the future?, Linger?)
472 
473 	if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
474 		die("nng_dial: %s", nng_strerror(rv));
475 	}
476 
477 	nng_msleep(100);
478 
479 	if (nng_msg_alloc(&msg, msgsize) != 0) {
480 		die("nng_msg_alloc: %s", nng_strerror(rv));
481 	}
482 
483 	start = nng_clock();
484 	for (i = 0; i < trips; i++) {
485 		if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
486 			die("nng_sendmsg: %s", nng_strerror(rv));
487 		}
488 
489 		if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
490 			die("nng_recvmsg: %s", nng_strerror(rv));
491 		}
492 	}
493 	end = nng_clock();
494 
495 	nng_msg_free(msg);
496 	nng_close(s);
497 
498 	total   = (float) ((end - start)) / 1000;
499 	latency = ((float) ((total * 1000000)) / (float) (trips * 2));
500 	printf("total time: %.3f [s]\n", total);
501 	printf("message size: %d [B]\n", (int) msgsize);
502 	printf("round trip count: %d\n", trips);
503 	printf("average latency: %.3f [us]\n", latency);
504 }
505 
506 void
latency_server(const char * addr,size_t msgsize,int trips)507 latency_server(const char *addr, size_t msgsize, int trips)
508 {
509 	nng_socket s;
510 	nng_msg *  msg;
511 	int        rv;
512 	int        i;
513 
514 	if ((rv = open_server(&s)) != 0) {
515 		die("nng_socket: %s", nng_strerror(rv));
516 	}
517 
518 	// XXX: set no delay
519 	// XXX: other options (TLS in the future?, Linger?)
520 
521 	if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
522 		die("nng_listen: %s", nng_strerror(rv));
523 	}
524 
525 	for (i = 0; i < trips; i++) {
526 		if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
527 			die("nng_recvmsg: %s", nng_strerror(rv));
528 		}
529 		if (nng_msg_len(msg) != msgsize) {
530 			die("wrong message size: %lu != %lu", nng_msg_len(msg),
531 			    msgsize);
532 		}
533 		if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
534 			die("nng_sendmsg: %s", nng_strerror(rv));
535 		}
536 	}
537 
538 	// Wait a bit for things to drain... linger should do this.
539 	// 100ms ought to be enough.
540 	nng_msleep(100);
541 	nng_close(s);
542 }
543 
544 // Our throughput story is quite a mess.  Mostly I think because of the poor
545 // caching and message reuse.  We should probably implement a message pooling
546 // API somewhere.
547 
548 void
throughput_server(const char * addr,size_t msgsize,int count)549 throughput_server(const char *addr, size_t msgsize, int count)
550 {
551 	nng_socket s;
552 	nng_msg *  msg;
553 	int        rv;
554 	int        i;
555 	uint64_t   start, end;
556 	float      msgpersec, mbps, total;
557 
558 	if ((rv = nng_pair_open(&s)) != 0) {
559 		die("nng_socket: %s", nng_strerror(rv));
560 	}
561 	rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
562 	if (rv != 0) {
563 		die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
564 	}
565 
566 	// XXX: set no delay
567 	// XXX: other options (TLS in the future?, Linger?)
568 
569 	if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
570 		die("nng_listen: %s", nng_strerror(rv));
571 	}
572 
573 	// Receive first synchronization message.
574 	if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
575 		die("nng_recvmsg: %s", nng_strerror(rv));
576 	}
577 	nng_msg_free(msg);
578 	start = nng_clock();
579 
580 	for (i = 0; i < count; i++) {
581 		if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
582 			die("nng_recvmsg: %s", nng_strerror(rv));
583 		}
584 		if (nng_msg_len(msg) != msgsize) {
585 			die("wrong message size: %lu != %lu", nng_msg_len(msg),
586 			    msgsize);
587 		}
588 		nng_msg_free(msg);
589 	}
590 	end = nng_clock();
591 	// Send a synchronization message (empty) to the other side,
592 	// and wait a bit to make sure it goes out the wire.
593 	nng_send(s, "", 0, 0);
594 	nng_msleep(200);
595 	nng_close(s);
596 	total     = (float) ((end - start)) / 1000;
597 	msgpersec = (float) (count) / total;
598 	mbps      = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
599 	printf("total time: %.3f [s]\n", total);
600 	printf("message size: %d [B]\n", (int) msgsize);
601 	printf("message count: %d\n", count);
602 	printf("throughput: %.f [msg/s]\n", msgpersec);
603 	printf("throughput: %.3f [Mb/s]\n", mbps);
604 }
605 
606 void
throughput_client(const char * addr,size_t msgsize,int count)607 throughput_client(const char *addr, size_t msgsize, int count)
608 {
609 	nng_socket s;
610 	nng_msg *  msg;
611 	int        rv;
612 	int        i;
613 
614 	// We send one extra zero length message to start the timer.
615 	count++;
616 
617 	if ((rv = nng_pair_open(&s)) != 0) {
618 		die("nng_socket: %s", nng_strerror(rv));
619 	}
620 
621 	// XXX: set no delay
622 	// XXX: other options (TLS in the future?, Linger?)
623 
624 	rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
625 	if (rv != 0) {
626 		die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
627 	}
628 
629 	rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
630 	if (rv != 0) {
631 		die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
632 	}
633 
634 	if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
635 		die("nng_dial: %s", nng_strerror(rv));
636 	}
637 
638 	if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
639 		die("nng_msg_alloc: %s", nng_strerror(rv));
640 	}
641 	if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
642 		die("nng_sendmsg: %s", nng_strerror(rv));
643 	}
644 
645 	for (i = 0; i < count; i++) {
646 		if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
647 			die("nng_msg_alloc: %s", nng_strerror(rv));
648 		}
649 
650 		if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
651 			die("nng_sendmsg: %s", nng_strerror(rv));
652 		}
653 	}
654 
655 	// Attempt to get the completion indication from the other
656 	// side.
657 	if (nng_recvmsg(s, &msg, 0) == 0) {
658 		nng_msg_free(msg);
659 	}
660 
661 	nng_close(s);
662 }
663