1 /* =========================================================================
2 perf_remote - remote performance peer
3
4 -------------------------------------------------------------------------
5 Copyright (c) the Contributors as noted in the AUTHORS file.
6
7 This file is part of Zyre, an open-source framework for proximity-based
8 peer-to-peer applications -- See http://zyre.org.
9
10 This Source Code Form is subject to the terms of the Mozilla Public
11 License, v. 2.0. If a copy of the MPL was not distributed with this
12 file, You can obtain one at http://mozilla.org/MPL/2.0/.
13 =========================================================================
14 */
15
16 #include <czmq.h>
17 #include "zyre_classes.h"
18
19 static bool
s_node_recv(zyre_t * node,char * command,char * expected)20 s_node_recv (zyre_t *node, char* command, char* expected)
21 {
22 bool result = false;
23 zmsg_t *incoming = zyre_recv (node);
24 assert (incoming);
25
26 char *event = zmsg_popstr (incoming);
27 if (streq (event, command)) {
28 char *peer = zmsg_popstr (incoming);
29 free(zmsg_popstr(incoming)); // peer name
30 char *group = NULL;
31 if (streq (command, "SHOUT"))
32 group = zmsg_popstr (incoming);
33
34 char *cookie = zmsg_popstr (incoming);
35 if (streq (cookie, expected))
36 result = true;
37
38 free (peer);
39 if (group)
40 free (group);
41 free (cookie);
42 }
43 free (event);
44 zmsg_destroy (&incoming);
45
46 return result;
47 }
48
49
50 int
main(int argc,char * argv[])51 main (int argc, char *argv [])
52 {
53 // Get number of remote nodes to simulate, default 25
54 // If we run multiple perf_remote on multiple machines,
55 // max_node must be sum of all the remote node counts.
56 int max_node = 25;
57 int max_message = 10000;
58 int nbr_node = 0;
59 int nbr_hello_response = 0;
60 int nbr_message = 0;
61 int nbr_message_response = 0;
62
63 if (argc > 1)
64 max_node = atoi (argv [1]);
65 if (argc > 2)
66 max_message = atoi (argv [2]);
67
68 // Set max sockets to system maximum
69 zsys_set_max_sockets(0);
70
71 zyre_t *node = zyre_new (NULL);
72 zyre_start (node);
73 zyre_join (node, "GLOBAL");
74
75 int64_t start = zclock_mono ();
76 int64_t elapse;
77
78 char **peers = (char **) zmalloc (sizeof (char *) * max_node);
79
80 while (true) {
81 zmsg_t *incoming = zyre_recv (node);
82 if (!incoming)
83 break; // Interrupted
84
85 // If new peer, say hello to it and wait for it to answer us
86 char *event = zmsg_popstr (incoming);
87 if (streq (event, "ENTER") && nbr_node < max_node) {
88 char *peer = zmsg_popstr (incoming);
89 peers [nbr_node++] = peer;
90
91 if (nbr_node == max_node) {
92 // got HELLO from the all remote nodes
93 elapse = zclock_mono () - start;
94 printf ("Took %ld ms to coordinate with all remote\n",
95 (long) elapse);
96 }
97 }
98 else
99 if (streq (event, "WHISPER") && nbr_hello_response < max_node) {
100 char *peer = zmsg_popstr (incoming);
101 free(zmsg_popstr(incoming)); // peer name
102 char *cookie = zmsg_popstr (incoming);
103
104 if (streq (cookie, "R:HELLO")) {
105 if (++nbr_hello_response == max_node) {
106 // got HELLO from the all remote nodes
107 elapse = zclock_mono () - start;
108 printf ("Took %ld ms to get greeting from all remote\n",
109 (long) elapse);
110 }
111 }
112 free (peer);
113 free (cookie);
114 }
115 free (event);
116 zmsg_destroy (&incoming);
117
118 if (nbr_node == max_node && nbr_hello_response == max_node)
119 break;
120 }
121
122 // Send WHISPER message
123 start = zclock_mono ();
124 zpoller_t *poller = zpoller_new (zyre_socket (node), NULL);
125 for (nbr_message = 0; nbr_message < max_message; nbr_message++) {
126 zyre_whispers (node, peers [nbr_message % max_node], "S:WHISPER");
127 while (zpoller_wait (poller, 0))
128 if (s_node_recv (node, "WHISPER", "R:WHISPER"))
129 nbr_message_response++;
130 }
131
132 while (nbr_message_response < max_message)
133 if (s_node_recv (node, "WHISPER", "R:WHISPER"))
134 nbr_message_response++;
135
136 // Got WHISPER response from the all remote nodes
137 elapse = zclock_mono () - start;
138 printf ("Took %ld ms to send/receive %d message. %.2f msg/s \n", (long)elapse, max_message, (float) max_message * 1000 / elapse);
139
140 // send SHOUT message
141 start = zclock_mono ();
142 nbr_message = 0;
143 nbr_message_response = 0;
144
145 max_message = max_message / max_node;
146
147 for (nbr_message = 0; nbr_message < max_message; nbr_message++) {
148 zyre_shouts (node, "GLOBAL", "S:SHOUT");
149 while (zpoller_wait (poller, 0))
150 if (s_node_recv (node, "SHOUT", "R:SHOUT"))
151 nbr_message_response++;
152 }
153
154 while (nbr_message_response < max_message * max_node)
155 if (s_node_recv (node, "SHOUT", "R:SHOUT"))
156 nbr_message_response++;
157
158 // got SHOUT response from the all remote nodes
159 elapse = zclock_mono () - start;
160 printf ("Took %ld ms to send %d, recv %d GROUP message. %.2f msg/s \n",
161 (long) elapse, max_message, max_node * max_message,
162 (float) max_node * max_message * 1000 / elapse);
163
164 zyre_destroy (&node);
165 for (nbr_node = 0; nbr_node < max_node; nbr_node++) {
166 free (peers[nbr_node]);
167 }
168 zpoller_destroy (&poller);
169 free (peers);
170 return 0;
171 }
172