1 //
2 // Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
3 // Copyright 2018 Capitar IT Group BV <info@capitar.com>
4 //
5 // This software is supplied under the terms of the MIT License, a
6 // copy of which should be located in the distribution where this
7 // file was obtained (LICENSE.txt). A copy of the license may also be
8 // found online at https://opensource.org/licenses/MIT.
9 //
10
11 #include <ctype.h>
12 #include <stdarg.h>
13 #include <stdint.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17
18 #include <nng/nng.h>
19 #include <nng/supplemental/util/options.h>
20 #include <nng/supplemental/util/platform.h>
21
22 static void die(const char *, ...);
23 static int
no_open(nng_socket * arg)24 no_open(nng_socket *arg)
25 {
26 (void) arg;
27 die("Protocol not supported in this build!");
28 return (NNG_ENOTSUP);
29 }
30
31 typedef int (*open_func)(nng_socket *);
32
33 static open_func open_server = no_open;
34 static open_func open_client = no_open;
35
36 #if defined(NNG_HAVE_PAIR1)
37 #include <nng/protocol/pair1/pair.h>
38 #else
39 #define nng_pair1_open no_open
40 #endif
41
42 #if defined(NNG_HAVE_PAIR0)
43 #include <nng/protocol/pair0/pair.h>
44 #else
45 #define nng_pair0_open no_open
46 #endif
47
48 #if defined(NNG_HAVE_REQ0)
49 #include <nng/protocol/reqrep0/req.h>
50 #else
51 #define nng_req0_open no_open
52 #endif
53
54 #if defined(NNG_HAVE_REP0)
55 #include <nng/protocol/reqrep0/rep.h>
56 #else
57 #define nng_rep0_open no_open
58 #endif
59
60 #if defined(NNG_HAVE_BUS0)
61 #include <nng/protocol/bus0/bus.h>
62 #else
63 #define nng_bus0_open no_open
64 #endif
65
66 #if defined(NNG_HAVE_PULL0)
67 #include <nng/protocol/pipeline0/pull.h>
68 #else
69 #define nng_pull0_open no_open
70 #endif
71
72 #if defined(NNG_HAVE_PUSH0)
73 #include <nng/protocol/pipeline0/push.h>
74 #else
75 #define nng_push0_open no_open
76 #endif
77
78 #if defined(NNG_HAVE_PUB0)
79 #include <nng/protocol/pubsub0/pub.h>
80 #else
81 #define nng_pub0_open no_open
82 #endif
83
84 #if defined(NNG_HAVE_SUB0)
85 #include <nng/protocol/pubsub0/sub.h>
86 #else
87 #define nng_sub0_open no_open
88 #endif
89
90 enum options {
91 OPT_PAIR0 = 1,
92 OPT_PAIR1,
93 OPT_REQREP0,
94 OPT_PUBSUB0,
95 OPT_PIPELINE0,
96 OPT_SURVEY0,
97 OPT_BUS0,
98 OPT_URL,
99 };
100
101 // These are not universally supported by the variants yet.
102 static nng_optspec opts[] = {
103 { .o_name = "pair1", .o_val = OPT_PAIR1 },
104 { .o_name = "pair0", .o_val = OPT_PAIR0 },
105 { .o_name = "reqrep0", .o_val = OPT_REQREP0 },
106 { .o_name = "bus0", .o_val = OPT_BUS0 },
107 { .o_name = "pubsub0", .o_val = OPT_PUBSUB0 },
108 { .o_name = "pipeline0", .o_val = OPT_PIPELINE0 },
109 { .o_name = "url", .o_val = OPT_URL, .o_arg = true },
110 { .o_name = NULL, .o_val = 0 },
111 };
112
113 static void latency_client(const char *, size_t, int);
114 static void latency_server(const char *, size_t, int);
115 static void throughput_client(const char *, size_t, int);
116 static void throughput_server(const char *, size_t, int);
117 static void do_remote_lat(int argc, char **argv);
118 static void do_local_lat(int argc, char **argv);
119 static void do_remote_thr(int argc, char **argv);
120 static void do_local_thr(int argc, char **argv);
121 static void do_inproc_thr(int argc, char **argv);
122 static void do_inproc_lat(int argc, char **argv);
123 static void die(const char *, ...);
124
125 // perf implements the same performance tests found in the standard
126 // nanomsg & mangos performance tests. As with mangos, the decision
127 // about which test to run is determined by the program name (ARGV[0}])
128 // that it is run under.
129 //
130 // Options are:
131 //
132 // - remote_lat - remote latency side (client, aka latency_client)
133 // - local_lat - local latency side (server, aka latency_server)
134 // - local_thr - local throughput side
135 // - remote_thr - remote throughput side
136 // - inproc_lat - inproc latency
137 // - inproc_thr - inproc throughput
138 //
139
140 bool
matches(const char * arg,const char * name)141 matches(const char *arg, const char *name)
142 {
143 const char *ptr = arg;
144 const char *x;
145
146 while (((x = strchr(ptr, '/')) != NULL) ||
147 ((x = strchr(ptr, '\\')) != NULL) ||
148 ((x = strchr(ptr, ':')) != NULL)) {
149 ptr = x + 1;
150 }
151 for (;;) {
152 if (*name == '\0') {
153 break;
154 }
155 if (tolower(*ptr) != *name) {
156 return (false);
157 }
158 ptr++;
159 name++;
160 }
161
162 switch (*ptr) {
163 case '\0':
164 /* FALLTHROUGH*/
165 case '.': // extension; ignore it.
166 return (true);
167 default: // some other trailing bit.
168 return (false);
169 }
170 }
171
172 const int PAIR0 = 0;
173 const int PAIR1 = 1;
174 const int REQREP = 2;
175
176 int
main(int argc,char ** argv)177 main(int argc, char **argv)
178 {
179 char *prog;
180
181 #if defined(NNG_HAVE_PAIR1)
182 open_server = nng_pair1_open;
183 open_client = nng_pair1_open;
184 #elif defined(NNG_HAVE_PAIR0)
185 open_server = nng_pair0_open;
186 open_client = nng_pair0_open;
187 #endif
188
189 // Allow -m <remote_lat> or whatever to override argv[0].
190 if ((argc >= 3) && (strcmp(argv[1], "-m") == 0)) {
191 prog = argv[2];
192 argv += 3;
193 argc -= 3;
194 } else {
195 prog = argv[0];
196 argc--;
197 argv++;
198 }
199 if (matches(prog, "remote_lat") || matches(prog, "latency_client")) {
200 do_remote_lat(argc, argv);
201 } else if (matches(prog, "local_lat") ||
202 matches(prog, "latency_server")) {
203 do_local_lat(argc, argv);
204 } else if (matches(prog, "local_thr") ||
205 matches(prog, "throughput_server")) {
206 do_local_thr(argc, argv);
207 } else if (matches(prog, "remote_thr") ||
208 matches(prog, "throughput_client")) {
209 do_remote_thr(argc, argv);
210 } else if (matches(prog, "inproc_thr")) {
211 do_inproc_thr(argc, argv);
212 } else if (matches(prog, "inproc_lat")) {
213 do_inproc_lat(argc, argv);
214 } else {
215 die("Unknown program mode? Use -m <mode>.");
216 }
217 }
218
219 static void
die(const char * fmt,...)220 die(const char *fmt, ...)
221 {
222 va_list ap;
223
224 va_start(ap, fmt);
225 vfprintf(stderr, fmt, ap);
226 va_end(ap);
227 fprintf(stderr, "\n");
228 exit(2);
229 }
230
231 static int
parse_int(const char * arg,const char * what)232 parse_int(const char *arg, const char *what)
233 {
234 long val;
235 char *eptr;
236
237 val = strtol(arg, &eptr, 10);
238 // Must be a positive number less than around a billion.
239 if ((val < 0) || (val > 1000000000) || (*eptr != 0) || (eptr == arg)) {
240 die("Invalid %s", what);
241 }
242 return ((int) val);
243 }
244
245 void
do_local_lat(int argc,char ** argv)246 do_local_lat(int argc, char **argv)
247 {
248 long int msgsize;
249 long int trips;
250
251 if (argc != 3) {
252 die("Usage: local_lat <listen-addr> <msg-size> <roundtrips>");
253 }
254
255 msgsize = parse_int(argv[1], "message size");
256 trips = parse_int(argv[2], "round-trips");
257
258 latency_server(argv[0], msgsize, trips);
259 }
260
261 void
do_remote_lat(int argc,char ** argv)262 do_remote_lat(int argc, char **argv)
263 {
264 int msgsize;
265 int trips;
266
267 if (argc != 3) {
268 die("Usage: remote_lat <connect-to> <msg-size> <roundtrips>");
269 }
270
271 msgsize = parse_int(argv[1], "message size");
272 trips = parse_int(argv[2], "round-trips");
273
274 latency_client(argv[0], msgsize, trips);
275 }
276
277 void
do_local_thr(int argc,char ** argv)278 do_local_thr(int argc, char **argv)
279 {
280 int msgsize;
281 int trips;
282
283 if (argc != 3) {
284 die("Usage: local_thr <listen-addr> <msg-size> <count>");
285 }
286
287 msgsize = parse_int(argv[1], "message size");
288 trips = parse_int(argv[2], "count");
289
290 throughput_server(argv[0], msgsize, trips);
291 }
292
293 void
do_remote_thr(int argc,char ** argv)294 do_remote_thr(int argc, char **argv)
295 {
296 int msgsize;
297 int trips;
298
299 if (argc != 3) {
300 die("Usage: remote_thr <connect-to> <msg-size> <count>");
301 }
302
303 msgsize = parse_int(argv[1], "message size");
304 trips = parse_int(argv[2], "count");
305
306 throughput_client(argv[0], msgsize, trips);
307 }
308
309 struct inproc_args {
310 int count;
311 int msgsize;
312 const char *addr;
313 void (*func)(const char *, size_t, int);
314 };
315
316 static void
do_inproc(void * args)317 do_inproc(void *args)
318 {
319 struct inproc_args *ia = args;
320
321 ia->func(ia->addr, ia->msgsize, ia->count);
322 }
323
324 void
do_inproc_lat(int argc,char ** argv)325 do_inproc_lat(int argc, char **argv)
326 {
327 nng_thread * thr;
328 struct inproc_args ia;
329 int rv;
330 int val;
331 int optidx;
332 char * arg;
333 char * addr;
334
335 addr = "inproc://latency_test";
336
337 optidx = 0;
338 while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
339 0) {
340 switch (val) {
341 case OPT_REQREP0:
342 open_client = nng_req0_open;
343 open_server = nng_rep0_open;
344 break;
345 case OPT_PAIR0:
346 open_client = nng_pair0_open;
347 open_server = nng_pair0_open;
348 break;
349 case OPT_PAIR1:
350 open_client = nng_pair1_open;
351 open_server = nng_pair1_open;
352 break;
353 case OPT_BUS0:
354 open_client = nng_bus0_open;
355 open_server = nng_bus0_open;
356 break;
357
358 case OPT_URL:
359 addr = arg;
360 break;
361 default:
362 die("bad option");
363 }
364 }
365 argc -= optidx;
366 argv += optidx;
367
368 if (argc != 2) {
369 die("Usage: inproc_lat <msg-size> <count>");
370 }
371
372 ia.addr = addr;
373 ia.msgsize = parse_int(argv[0], "message size");
374 ia.count = parse_int(argv[1], "count");
375 ia.func = latency_server;
376
377 if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
378 die("Cannot create thread: %s", nng_strerror(rv));
379 }
380
381 // Sleep a bit.
382 nng_msleep(100);
383
384 latency_client(addr, ia.msgsize, ia.count);
385 nng_thread_destroy(thr);
386 }
387
388 void
do_inproc_thr(int argc,char ** argv)389 do_inproc_thr(int argc, char **argv)
390 {
391 nng_thread * thr;
392 struct inproc_args ia;
393 int rv;
394 int optidx;
395 int val;
396 char * arg;
397 char * addr = "inproc://throughput-test";
398
399 optidx = 0;
400 while ((rv = nng_opts_parse(argc, argv, opts, &val, &arg, &optidx)) ==
401 0) {
402 switch (val) {
403 #if 0
404 // For now these protocols simply do not work with
405 // throughput -- they don't work with backpressure properly.
406 // In the future we should support synchronizing in the same
407 // process, and alerting the sender both on completion of
408 // a single message, and on completion of all messages.
409 case OPT_REQREP0:
410 open_client = nng_req0_open;
411 open_server = nng_rep0_open;
412 break;
413 #endif
414 case OPT_PAIR0:
415 open_client = nng_pair0_open;
416 open_server = nng_pair0_open;
417 break;
418 case OPT_PAIR1:
419 open_client = nng_pair1_open;
420 open_server = nng_pair1_open;
421 break;
422 case OPT_PIPELINE0:
423 open_client = nng_pull0_open;
424 open_server = nng_push0_open;
425 break;
426 case OPT_URL:
427 addr = arg;
428 break;
429 default:
430 die("bad option");
431 }
432 }
433 argc -= optidx;
434 argv += optidx;
435
436 if (argc != 2) {
437 die("Usage: inproc_thr <msg-size> <count>");
438 }
439
440 ia.addr = addr;
441 ia.msgsize = parse_int(argv[0], "message size");
442 ia.count = parse_int(argv[1], "count");
443 ia.func = throughput_server;
444
445 if ((rv = nng_thread_create(&thr, do_inproc, &ia)) != 0) {
446 die("Cannot create thread: %s", nng_strerror(rv));
447 }
448
449 // Sleep a bit.
450 nng_msleep(100);
451
452 throughput_client(addr, ia.msgsize, ia.count);
453 nng_thread_destroy(thr);
454 }
455
456 void
latency_client(const char * addr,size_t msgsize,int trips)457 latency_client(const char *addr, size_t msgsize, int trips)
458 {
459 nng_socket s;
460 nng_msg * msg;
461 nng_time start, end;
462 int rv;
463 int i;
464 float total;
465 float latency;
466 if ((rv = open_client(&s)) != 0) {
467 die("nng_socket: %s", nng_strerror(rv));
468 }
469
470 // XXX: set no delay
471 // XXX: other options (TLS in the future?, Linger?)
472
473 if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
474 die("nng_dial: %s", nng_strerror(rv));
475 }
476
477 nng_msleep(100);
478
479 if (nng_msg_alloc(&msg, msgsize) != 0) {
480 die("nng_msg_alloc: %s", nng_strerror(rv));
481 }
482
483 start = nng_clock();
484 for (i = 0; i < trips; i++) {
485 if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
486 die("nng_sendmsg: %s", nng_strerror(rv));
487 }
488
489 if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
490 die("nng_recvmsg: %s", nng_strerror(rv));
491 }
492 }
493 end = nng_clock();
494
495 nng_msg_free(msg);
496 nng_close(s);
497
498 total = (float) ((end - start)) / 1000;
499 latency = ((float) ((total * 1000000)) / (float) (trips * 2));
500 printf("total time: %.3f [s]\n", total);
501 printf("message size: %d [B]\n", (int) msgsize);
502 printf("round trip count: %d\n", trips);
503 printf("average latency: %.3f [us]\n", latency);
504 }
505
506 void
latency_server(const char * addr,size_t msgsize,int trips)507 latency_server(const char *addr, size_t msgsize, int trips)
508 {
509 nng_socket s;
510 nng_msg * msg;
511 int rv;
512 int i;
513
514 if ((rv = open_server(&s)) != 0) {
515 die("nng_socket: %s", nng_strerror(rv));
516 }
517
518 // XXX: set no delay
519 // XXX: other options (TLS in the future?, Linger?)
520
521 if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
522 die("nng_listen: %s", nng_strerror(rv));
523 }
524
525 for (i = 0; i < trips; i++) {
526 if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
527 die("nng_recvmsg: %s", nng_strerror(rv));
528 }
529 if (nng_msg_len(msg) != msgsize) {
530 die("wrong message size: %lu != %lu", nng_msg_len(msg),
531 msgsize);
532 }
533 if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
534 die("nng_sendmsg: %s", nng_strerror(rv));
535 }
536 }
537
538 // Wait a bit for things to drain... linger should do this.
539 // 100ms ought to be enough.
540 nng_msleep(100);
541 nng_close(s);
542 }
543
544 // Our throughput story is quite a mess. Mostly I think because of the poor
545 // caching and message reuse. We should probably implement a message pooling
546 // API somewhere.
547
548 void
throughput_server(const char * addr,size_t msgsize,int count)549 throughput_server(const char *addr, size_t msgsize, int count)
550 {
551 nng_socket s;
552 nng_msg * msg;
553 int rv;
554 int i;
555 uint64_t start, end;
556 float msgpersec, mbps, total;
557
558 if ((rv = nng_pair_open(&s)) != 0) {
559 die("nng_socket: %s", nng_strerror(rv));
560 }
561 rv = nng_socket_set_int(s, NNG_OPT_RECVBUF, 128);
562 if (rv != 0) {
563 die("nng_socket_set(nng_opt_recvbuf): %s", nng_strerror(rv));
564 }
565
566 // XXX: set no delay
567 // XXX: other options (TLS in the future?, Linger?)
568
569 if ((rv = nng_listen(s, addr, NULL, 0)) != 0) {
570 die("nng_listen: %s", nng_strerror(rv));
571 }
572
573 // Receive first synchronization message.
574 if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
575 die("nng_recvmsg: %s", nng_strerror(rv));
576 }
577 nng_msg_free(msg);
578 start = nng_clock();
579
580 for (i = 0; i < count; i++) {
581 if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
582 die("nng_recvmsg: %s", nng_strerror(rv));
583 }
584 if (nng_msg_len(msg) != msgsize) {
585 die("wrong message size: %lu != %lu", nng_msg_len(msg),
586 msgsize);
587 }
588 nng_msg_free(msg);
589 }
590 end = nng_clock();
591 // Send a synchronization message (empty) to the other side,
592 // and wait a bit to make sure it goes out the wire.
593 nng_send(s, "", 0, 0);
594 nng_msleep(200);
595 nng_close(s);
596 total = (float) ((end - start)) / 1000;
597 msgpersec = (float) (count) / total;
598 mbps = (float) (msgpersec * 8 * msgsize) / (1024 * 1024);
599 printf("total time: %.3f [s]\n", total);
600 printf("message size: %d [B]\n", (int) msgsize);
601 printf("message count: %d\n", count);
602 printf("throughput: %.f [msg/s]\n", msgpersec);
603 printf("throughput: %.3f [Mb/s]\n", mbps);
604 }
605
606 void
throughput_client(const char * addr,size_t msgsize,int count)607 throughput_client(const char *addr, size_t msgsize, int count)
608 {
609 nng_socket s;
610 nng_msg * msg;
611 int rv;
612 int i;
613
614 // We send one extra zero length message to start the timer.
615 count++;
616
617 if ((rv = nng_pair_open(&s)) != 0) {
618 die("nng_socket: %s", nng_strerror(rv));
619 }
620
621 // XXX: set no delay
622 // XXX: other options (TLS in the future?, Linger?)
623
624 rv = nng_socket_set_int(s, NNG_OPT_SENDBUF, 128);
625 if (rv != 0) {
626 die("nng_socket_set(nng_opt_sendbuf): %s", nng_strerror(rv));
627 }
628
629 rv = nng_socket_set_ms(s, NNG_OPT_RECVTIMEO, 5000);
630 if (rv != 0) {
631 die("nng_socket_set(nng_opt_recvtimeo): %s", nng_strerror(rv));
632 }
633
634 if ((rv = nng_dial(s, addr, NULL, 0)) != 0) {
635 die("nng_dial: %s", nng_strerror(rv));
636 }
637
638 if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
639 die("nng_msg_alloc: %s", nng_strerror(rv));
640 }
641 if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
642 die("nng_sendmsg: %s", nng_strerror(rv));
643 }
644
645 for (i = 0; i < count; i++) {
646 if ((rv = nng_msg_alloc(&msg, msgsize)) != 0) {
647 die("nng_msg_alloc: %s", nng_strerror(rv));
648 }
649
650 if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
651 die("nng_sendmsg: %s", nng_strerror(rv));
652 }
653 }
654
655 // Attempt to get the completion indication from the other
656 // side.
657 if (nng_recvmsg(s, &msg, 0) == 0) {
658 nng_msg_free(msg);
659 }
660
661 nng_close(s);
662 }
663