1 /*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
#include "../include/zmq.h"

#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <string>

#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#include <unistd.h>
#endif
49
50
51 /*
52 Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
53
54 Topology:
55
     XPUB                      SUB
      |                         |
      +-----> XSUB -> XPUB -----/
      |       ^^^^^^^^^^^^
     XPUB      ZMQ proxy
61
62 All connections use "inproc" transport. The two XPUB sockets start
63 flooding the proxy. The throughput is computed using the bytes received
64 in the SUB socket.
65 */
66
67
//  Value used for both ZMQ_SNDHWM and ZMQ_RCVHWM on every socket that is
//  configured through set_hwm ().
#define HWM 10000

//  Number of elements of a true array (not of a pointer). The argument is
//  fully parenthesized so that any array-valued expression expands correctly.
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof (x) / sizeof ((x)[0]))
#endif

//  Hands the result of 'expr' together with its source text to
//  test_assert_success_message_errno_helper, which prints a diagnostic and
//  exits when the result is -1 and yields the result unchanged otherwise.
#define TEST_ASSERT_SUCCESS_ERRNO(expr)                                        \
    test_assert_success_message_errno_helper (expr, NULL, #expr)

//  This macro is used to avoid unused-variable warnings when assertions are
//  compiled out (NDEBUG): the expression is only the operand of sizeof, so it
//  is referenced but never evaluated and generates no code. Without NDEBUG it
//  is a plain assert.
#ifdef NDEBUG
#define ASSERT_EXPR_SAFE(x)                                                    \
    do {                                                                       \
        (void) sizeof (x);                                                     \
    } while (0)
#else
#define ASSERT_EXPR_SAFE(x) assert (x)
#endif
87
88
//  Number of messages each publisher thread sends and the subscriber thread
//  waits for; assigned in main () from the second command-line argument.
static uint64_t message_count = 0;
//  Size in bytes of every published message; assigned in main () from the
//  first command-line argument.
static size_t message_size = 0;
91
92
//  Configuration handed (as a void pointer) to the proxy, publisher and
//  subscriber thread entry points, and to terminate_proxy ().
typedef struct
{
    //  ZeroMQ context every socket of the benchmark is created from.
    void *context;
    //  Index into frontend_endpoint (publishers) or backend_endpoint
    //  (subscriber) selecting the endpoint the thread connects to.
    int thread_idx;
    //  Endpoints the proxy binds its XSUB socket to; NULL entries are skipped.
    const char *frontend_endpoint[4];
    //  Endpoints the proxy binds its XPUB socket to; NULL entries are skipped.
    const char *backend_endpoint[4];
    //  Endpoint of the REP socket through which the proxy is steered.
    const char *control_endpoint;
} proxy_hwm_cfg_t;
101
102
test_assert_success_message_errno_helper(int rc_,const char * msg_,const char * expr_)103 int test_assert_success_message_errno_helper (int rc_,
104 const char *msg_,
105 const char *expr_)
106 {
107 if (rc_ == -1) {
108 char buffer[512];
109 buffer[sizeof (buffer) - 1] =
110 0; // to ensure defined behavior with VC++ <= 2013
111 printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
112 msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
113 msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
114 exit (1);
115 }
116 return rc_;
117 }
118
//  Applies the HWM limit to both the send and the receive side of a socket.
//
//  skt - socket to configure.
//
//  Both zmq_setsockopt calls go through TEST_ASSERT_SUCCESS_ERRNO, so a
//  failing call prints the offending expression and exits the process. The
//  macro stringifies its argument into that diagnostic, which is why the
//  expressions below should not be reworded casually.
static void set_hwm (void *skt)
{
    int hwm = HWM;

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
}
129
publisher_thread_main(void * pvoid)130 static void publisher_thread_main (void *pvoid)
131 {
132 const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
133 const int idx = cfg->thread_idx;
134 int optval;
135 int rc;
136
137 void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
138 assert (pubsocket);
139
140 set_hwm (pubsocket);
141
142 optval = 1;
143 TEST_ASSERT_SUCCESS_ERRNO (
144 zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
145
146 optval = 1;
147 TEST_ASSERT_SUCCESS_ERRNO (
148 zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
149
150 TEST_ASSERT_SUCCESS_ERRNO (
151 zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
152
153 // Wait before starting TX operations till 1 subscriber has subscribed
154 // (in this test there's 1 subscriber only)
155 char buffer[32] = {};
156 rc = TEST_ASSERT_SUCCESS_ERRNO (
157 zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
158 if (rc != 1) {
159 printf ("invalid response length: expected 1, received %d", rc);
160 exit (1);
161 }
162 if (buffer[0] != 1) {
163 printf ("invalid response value: expected 1, received %d",
164 (int) buffer[0]);
165 exit (1);
166 }
167
168 zmq_msg_t msg_orig;
169 rc = zmq_msg_init_size (&msg_orig, message_size);
170 assert (rc == 0);
171 memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
172
173 uint64_t send_count = 0;
174 while (send_count < message_count) {
175 zmq_msg_t msg;
176 zmq_msg_init (&msg);
177 rc = zmq_msg_copy (&msg, &msg_orig);
178 assert (rc == 0);
179
180 // Send the message to the socket
181 rc = zmq_msg_send (&msg, pubsocket, 0);
182 if (rc != -1) {
183 send_count++;
184 } else {
185 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
186 }
187 }
188
189 zmq_close (pubsocket);
190 //printf ("publisher thread ended\n");
191 }
192
subscriber_thread_main(void * pvoid)193 static void subscriber_thread_main (void *pvoid)
194 {
195 const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
196 const int idx = cfg->thread_idx;
197
198 void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
199 assert (subsocket);
200
201 set_hwm (subsocket);
202
203 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
204
205 TEST_ASSERT_SUCCESS_ERRNO (
206 zmq_connect (subsocket, cfg->backend_endpoint[idx]));
207
208 // Receive message_count messages
209 uint64_t rxsuccess = 0;
210 bool success = true;
211 while (success) {
212 zmq_msg_t msg;
213 int rc = zmq_msg_init (&msg);
214 assert (rc == 0);
215
216 rc = zmq_msg_recv (&msg, subsocket, 0);
217 if (rc != -1) {
218 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
219 rxsuccess++;
220 }
221
222 if (rxsuccess == message_count)
223 break;
224 }
225
226 // Cleanup
227
228 zmq_close (subsocket);
229 //printf ("subscriber thread ended\n");
230 }
231
proxy_thread_main(void * pvoid)232 static void proxy_thread_main (void *pvoid)
233 {
234 const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
235 int rc;
236
237 // FRONTEND SUB
238
239 void *frontend_xsub = zmq_socket (
240 cfg->context,
241 ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
242 assert (frontend_xsub);
243
244 set_hwm (frontend_xsub);
245
246 // Bind FRONTEND
247 for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
248 const char *ep = cfg->frontend_endpoint[i];
249 if (ep != NULL) {
250 assert (strlen (ep) > 5);
251 rc = zmq_bind (frontend_xsub, ep);
252 ASSERT_EXPR_SAFE (rc == 0);
253 }
254 }
255
256 // BACKEND PUB
257
258 void *backend_xpub = zmq_socket (
259 cfg->context,
260 ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
261 assert (backend_xpub);
262
263 int optval = 1;
264 rc =
265 zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
266 ASSERT_EXPR_SAFE (rc == 0);
267
268 set_hwm (backend_xpub);
269
270 // Bind BACKEND
271 for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
272 const char *ep = cfg->backend_endpoint[i];
273 if (ep != NULL) {
274 assert (strlen (ep) > 5);
275 rc = zmq_bind (backend_xpub, ep);
276 ASSERT_EXPR_SAFE (rc == 0);
277 }
278 }
279
280 // CONTROL REP
281
282 void *control_rep = zmq_socket (
283 cfg->context,
284 ZMQ_REP); // This one is used by the proxy to receive&reply to commands
285 assert (control_rep);
286
287 // Bind CONTROL
288 rc = zmq_bind (control_rep, cfg->control_endpoint);
289 ASSERT_EXPR_SAFE (rc == 0);
290
291 // Start proxying!
292
293 zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
294
295 zmq_close (frontend_xsub);
296 zmq_close (backend_xpub);
297 zmq_close (control_rep);
298 //printf ("proxy thread ended\n");
299 }
300
terminate_proxy(const proxy_hwm_cfg_t * cfg)301 void terminate_proxy (const proxy_hwm_cfg_t *cfg)
302 {
303 // CONTROL REQ
304
305 void *control_req = zmq_socket (
306 cfg->context,
307 ZMQ_REQ); // This one can be used to send command to the proxy
308 assert (control_req);
309
310 // Connect CONTROL-REQ: a socket to which send commands
311 int rc = zmq_connect (control_req, cfg->control_endpoint);
312 ASSERT_EXPR_SAFE (rc == 0);
313
314 // Ask the proxy to exit: the subscriber has received all messages
315
316 rc = zmq_send (control_req, "TERMINATE", 9, 0);
317 ASSERT_EXPR_SAFE (rc == 9);
318
319 zmq_close (control_req);
320 }
321
322 // The main thread simply starts some publishers, a proxy,
323 // and a subscriber. Finish when all packets are received.
324
main(int argc,char * argv[])325 int main (int argc, char *argv[])
326 {
327 if (argc != 3) {
328 printf ("usage: proxy_thr <message-size> <message-count>\n");
329 return 1;
330 }
331
332 message_size = atoi (argv[1]);
333 message_count = atoi (argv[2]);
334 printf ("message size: %d [B]\n", (int) message_size);
335 printf ("message count: %d\n", (int) message_count);
336
337 void *context = zmq_ctx_new ();
338 assert (context);
339
340 int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
341 ASSERT_EXPR_SAFE (rv == 0);
342
343 // START ALL SECONDARY THREADS
344
345 const char *pub1 = "inproc://perf_pub1";
346 const char *pub2 = "inproc://perf_pub2";
347 const char *sub1 = "inproc://perf_backend";
348
349 proxy_hwm_cfg_t cfg_global = {};
350 cfg_global.context = context;
351 cfg_global.frontend_endpoint[0] = pub1;
352 cfg_global.frontend_endpoint[1] = pub2;
353 cfg_global.backend_endpoint[0] = sub1;
354 cfg_global.control_endpoint = "inproc://ctrl";
355
356 // Proxy
357 proxy_hwm_cfg_t cfg_proxy = cfg_global;
358 void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
359 assert (proxy != 0);
360
361 // Subscriber 1
362 proxy_hwm_cfg_t cfg_sub1 = cfg_global;
363 cfg_sub1.thread_idx = 0;
364 void *subscriber =
365 zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
366 assert (subscriber != 0);
367
368 // Start measuring
369 void *watch = zmq_stopwatch_start ();
370
371 // Publisher 1
372 proxy_hwm_cfg_t cfg_pub1 = cfg_global;
373 cfg_pub1.thread_idx = 0;
374 void *publisher1 =
375 zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
376 assert (publisher1 != 0);
377
378 // Publisher 2
379 proxy_hwm_cfg_t cfg_pub2 = cfg_global;
380 cfg_pub2.thread_idx = 1;
381 void *publisher2 =
382 zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
383 assert (publisher2 != 0);
384
385 // Wait for all packets to be received
386 zmq_threadclose (subscriber);
387
388 // Stop measuring
389 unsigned long elapsed = zmq_stopwatch_stop (watch);
390 if (elapsed == 0)
391 elapsed = 1;
392
393 unsigned long throughput =
394 (unsigned long) ((double) message_count / (double) elapsed * 1000000);
395 double megabits = (double) (throughput * message_size * 8) / 1000000;
396
397 printf ("mean throughput: %d [msg/s]\n", (int) throughput);
398 printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
399
400 // Wait for the end of publishers...
401 zmq_threadclose (publisher1);
402 zmq_threadclose (publisher2);
403
404 // ... then close the proxy
405 terminate_proxy (&cfg_proxy);
406 zmq_threadclose (proxy);
407
408 int rc = zmq_ctx_term (context);
409 ASSERT_EXPR_SAFE (rc == 0);
410
411 return 0;
412 }
413