1 /*  =========================================================================
2     perf_remote - remote performance peer
3 
4     -------------------------------------------------------------------------
5     Copyright (c) the Contributors as noted in the AUTHORS file.
6 
7     This file is part of Zyre, an open-source framework for proximity-based
8     peer-to-peer applications -- See http://zyre.org.
9 
10     This Source Code Form is subject to the terms of the Mozilla Public
11     License, v. 2.0. If a copy of the MPL was not distributed with this
12     file, You can obtain one at http://mozilla.org/MPL/2.0/.
13     =========================================================================
14 */
15 
16 #include <czmq.h>
17 #include "zyre_classes.h"
18 
19 static bool
s_node_recv(zyre_t * node,char * command,char * expected)20 s_node_recv (zyre_t *node, char* command, char* expected)
21 {
22     bool result = false;
23     zmsg_t *incoming = zyre_recv (node);
24     assert (incoming);
25 
26     char *event = zmsg_popstr (incoming);
27     if (streq (event, command)) {
28         char *peer = zmsg_popstr (incoming);
29         free(zmsg_popstr(incoming)); // peer name
30         char *group = NULL;
31         if (streq (command, "SHOUT"))
32             group = zmsg_popstr (incoming);
33 
34         char *cookie = zmsg_popstr (incoming);
35         if (streq (cookie, expected))
36             result = true;
37 
38         free (peer);
39         if (group)
40             free (group);
41         free (cookie);
42     }
43     free (event);
44     zmsg_destroy (&incoming);
45 
46     return result;
47 }
48 
49 
50 int
main(int argc,char * argv[])51 main (int argc, char *argv [])
52 {
53     //  Get number of remote nodes to simulate, default 25
54     //  If we run multiple perf_remote on multiple machines,
55     //  max_node must be sum of all the remote node counts.
56     int max_node = 25;
57     int max_message = 10000;
58     int nbr_node = 0;
59     int nbr_hello_response = 0;
60     int nbr_message = 0;
61     int nbr_message_response = 0;
62 
63     if (argc > 1)
64         max_node = atoi (argv [1]);
65     if (argc > 2)
66         max_message = atoi (argv [2]);
67 
68     //  Set max sockets to system maximum
69     zsys_set_max_sockets(0);
70 
71     zyre_t *node = zyre_new (NULL);
72     zyre_start (node);
73     zyre_join (node, "GLOBAL");
74 
75     int64_t start = zclock_mono ();
76     int64_t elapse;
77 
78     char **peers = (char **) zmalloc (sizeof (char *) * max_node);
79 
80     while (true) {
81         zmsg_t *incoming = zyre_recv (node);
82         if (!incoming)
83             break;              //  Interrupted
84 
85         //  If new peer, say hello to it and wait for it to answer us
86         char *event = zmsg_popstr (incoming);
87         if (streq (event, "ENTER") && nbr_node < max_node) {
88             char *peer = zmsg_popstr (incoming);
89             peers [nbr_node++] = peer;
90 
91             if (nbr_node == max_node) {
92                 // got HELLO from the all remote nodes
93                 elapse = zclock_mono () - start;
94                 printf ("Took %ld ms to coordinate with all remote\n",
95                         (long) elapse);
96             }
97         }
98         else
99         if (streq (event, "WHISPER") && nbr_hello_response < max_node) {
100             char *peer = zmsg_popstr (incoming);
101             free(zmsg_popstr(incoming)); // peer name
102             char *cookie = zmsg_popstr (incoming);
103 
104             if (streq (cookie, "R:HELLO")) {
105                 if (++nbr_hello_response == max_node) {
106                     // got HELLO from the all remote nodes
107                     elapse = zclock_mono () - start;
108                     printf ("Took %ld ms to get greeting from all remote\n",
109                             (long) elapse);
110                 }
111             }
112             free (peer);
113             free (cookie);
114         }
115         free (event);
116         zmsg_destroy (&incoming);
117 
118         if (nbr_node == max_node && nbr_hello_response == max_node)
119             break;
120     }
121 
122     //  Send WHISPER message
123     start = zclock_mono ();
124     zpoller_t *poller = zpoller_new (zyre_socket (node), NULL);
125     for (nbr_message = 0; nbr_message < max_message; nbr_message++) {
126         zyre_whispers (node, peers [nbr_message % max_node], "S:WHISPER");
127         while (zpoller_wait (poller, 0))
128             if (s_node_recv (node, "WHISPER", "R:WHISPER"))
129                 nbr_message_response++;
130     }
131 
132     while (nbr_message_response < max_message)
133         if (s_node_recv (node, "WHISPER", "R:WHISPER"))
134             nbr_message_response++;
135 
136     //  Got WHISPER response from the all remote nodes
137     elapse = zclock_mono () - start;
138     printf ("Took %ld ms to send/receive %d message. %.2f msg/s \n", (long)elapse, max_message, (float) max_message * 1000 / elapse);
139 
140     //  send SHOUT message
141     start = zclock_mono ();
142     nbr_message = 0;
143     nbr_message_response = 0;
144 
145     max_message = max_message / max_node;
146 
147     for (nbr_message = 0; nbr_message < max_message; nbr_message++) {
148         zyre_shouts (node, "GLOBAL", "S:SHOUT");
149         while (zpoller_wait (poller, 0))
150             if (s_node_recv (node, "SHOUT", "R:SHOUT"))
151                 nbr_message_response++;
152     }
153 
154     while (nbr_message_response < max_message * max_node)
155         if (s_node_recv (node, "SHOUT", "R:SHOUT"))
156             nbr_message_response++;
157 
158     // got SHOUT response from the all remote nodes
159     elapse = zclock_mono () - start;
160     printf ("Took %ld ms to send %d, recv %d GROUP message. %.2f msg/s \n",
161             (long) elapse, max_message, max_node * max_message,
162             (float) max_node * max_message * 1000 / elapse);
163 
164     zyre_destroy (&node);
165     for (nbr_node = 0; nbr_node < max_node; nbr_node++) {
166         free (peers[nbr_node]);
167     }
168     zpoller_destroy (&poller);
169     free (peers);
170     return 0;
171 }
172