1 /*
2     Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 #include "../include/zmq.h"
30 
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <assert.h>
35 #include <time.h>
36 #include <stdarg.h>
37 #include <string.h>
38 #include <string>
39 
40 #include "platform.hpp"
41 
42 #if defined ZMQ_HAVE_WINDOWS
43 #include <windows.h>
44 #include <process.h>
45 #else
46 #include <pthread.h>
47 #include <unistd.h>
48 #endif
49 
50 
51 /*
52    Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
53 
54    Topology:
55 
56      XPUB                      SUB
57       |                         |
58       +-----> XSUB -> XPUB -----/
59       |       ^^^^^^^^^^^^
60      XPUB      ZMQ proxy
61 
62    All connections use "inproc" transport. The two XPUB sockets start
63    flooding the proxy. The throughput is computed using the bytes received
64    in the SUB socket.
65 */
66 
67 
68 #define HWM 10000
69 
70 #ifndef ARRAY_SIZE
71 #define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x))
72 #endif
73 
74 #define TEST_ASSERT_SUCCESS_ERRNO(expr)                                        \
75     test_assert_success_message_errno_helper (expr, NULL, #expr)
76 
77 // This macro is used to avoid-variable warning. If used with an expression,
78 // the sizeof is not evaluated to avoid polluting the assembly code.
79 #ifdef NDEBUG
80 #define ASSERT_EXPR_SAFE(x)                                                    \
81     do {                                                                       \
82         (void) sizeof (x);                                                     \
83     } while (0)
84 #else
85 #define ASSERT_EXPR_SAFE(x) assert (x)
86 #endif
87 
88 
89 static uint64_t message_count = 0;
90 static size_t message_size = 0;
91 
92 
93 typedef struct
94 {
95     void *context;
96     int thread_idx;
97     const char *frontend_endpoint[4];
98     const char *backend_endpoint[4];
99     const char *control_endpoint;
100 } proxy_hwm_cfg_t;
101 
102 
test_assert_success_message_errno_helper(int rc_,const char * msg_,const char * expr_)103 int test_assert_success_message_errno_helper (int rc_,
104                                               const char *msg_,
105                                               const char *expr_)
106 {
107     if (rc_ == -1) {
108         char buffer[512];
109         buffer[sizeof (buffer) - 1] =
110           0; //  to ensure defined behavior with VC++ <= 2013
111         printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
112                 msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
113                 msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
114         exit (1);
115     }
116     return rc_;
117 }
118 
set_hwm(void * skt)119 static void set_hwm (void *skt)
120 {
121     int hwm = HWM;
122 
123     TEST_ASSERT_SUCCESS_ERRNO (
124       zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
125 
126     TEST_ASSERT_SUCCESS_ERRNO (
127       zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
128 }
129 
publisher_thread_main(void * pvoid)130 static void publisher_thread_main (void *pvoid)
131 {
132     const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
133     const int idx = cfg->thread_idx;
134     int optval;
135     int rc;
136 
137     void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
138     assert (pubsocket);
139 
140     set_hwm (pubsocket);
141 
142     optval = 1;
143     TEST_ASSERT_SUCCESS_ERRNO (
144       zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
145 
146     optval = 1;
147     TEST_ASSERT_SUCCESS_ERRNO (
148       zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
149 
150     TEST_ASSERT_SUCCESS_ERRNO (
151       zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
152 
153     //  Wait before starting TX operations till 1 subscriber has subscribed
154     //  (in this test there's 1 subscriber only)
155     char buffer[32] = {};
156     rc = TEST_ASSERT_SUCCESS_ERRNO (
157       zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
158     if (rc != 1) {
159         printf ("invalid response length: expected 1, received %d", rc);
160         exit (1);
161     }
162     if (buffer[0] != 1) {
163         printf ("invalid response value: expected 1, received %d",
164                 (int) buffer[0]);
165         exit (1);
166     }
167 
168     zmq_msg_t msg_orig;
169     rc = zmq_msg_init_size (&msg_orig, message_size);
170     assert (rc == 0);
171     memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
172 
173     uint64_t send_count = 0;
174     while (send_count < message_count) {
175         zmq_msg_t msg;
176         zmq_msg_init (&msg);
177         rc = zmq_msg_copy (&msg, &msg_orig);
178         assert (rc == 0);
179 
180         //  Send the message to the socket
181         rc = zmq_msg_send (&msg, pubsocket, 0);
182         if (rc != -1) {
183             send_count++;
184         } else {
185             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
186         }
187     }
188 
189     zmq_close (pubsocket);
190     //printf ("publisher thread ended\n");
191 }
192 
subscriber_thread_main(void * pvoid)193 static void subscriber_thread_main (void *pvoid)
194 {
195     const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
196     const int idx = cfg->thread_idx;
197 
198     void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
199     assert (subsocket);
200 
201     set_hwm (subsocket);
202 
203     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
204 
205     TEST_ASSERT_SUCCESS_ERRNO (
206       zmq_connect (subsocket, cfg->backend_endpoint[idx]));
207 
208     //  Receive message_count messages
209     uint64_t rxsuccess = 0;
210     bool success = true;
211     while (success) {
212         zmq_msg_t msg;
213         int rc = zmq_msg_init (&msg);
214         assert (rc == 0);
215 
216         rc = zmq_msg_recv (&msg, subsocket, 0);
217         if (rc != -1) {
218             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
219             rxsuccess++;
220         }
221 
222         if (rxsuccess == message_count)
223             break;
224     }
225 
226     //  Cleanup
227 
228     zmq_close (subsocket);
229     //printf ("subscriber thread ended\n");
230 }
231 
proxy_thread_main(void * pvoid)232 static void proxy_thread_main (void *pvoid)
233 {
234     const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
235     int rc;
236 
237     //  FRONTEND SUB
238 
239     void *frontend_xsub = zmq_socket (
240       cfg->context,
241       ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
242     assert (frontend_xsub);
243 
244     set_hwm (frontend_xsub);
245 
246     //  Bind FRONTEND
247     for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
248         const char *ep = cfg->frontend_endpoint[i];
249         if (ep != NULL) {
250             assert (strlen (ep) > 5);
251             rc = zmq_bind (frontend_xsub, ep);
252             ASSERT_EXPR_SAFE (rc == 0);
253         }
254     }
255 
256     //  BACKEND PUB
257 
258     void *backend_xpub = zmq_socket (
259       cfg->context,
260       ZMQ_XPUB); //  the backend is the one exposed to the external world (TCP)
261     assert (backend_xpub);
262 
263     int optval = 1;
264     rc =
265       zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
266     ASSERT_EXPR_SAFE (rc == 0);
267 
268     set_hwm (backend_xpub);
269 
270     //  Bind BACKEND
271     for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
272         const char *ep = cfg->backend_endpoint[i];
273         if (ep != NULL) {
274             assert (strlen (ep) > 5);
275             rc = zmq_bind (backend_xpub, ep);
276             ASSERT_EXPR_SAFE (rc == 0);
277         }
278     }
279 
280     //  CONTROL REP
281 
282     void *control_rep = zmq_socket (
283       cfg->context,
284       ZMQ_REP); //  This one is used by the proxy to receive&reply to commands
285     assert (control_rep);
286 
287     //  Bind CONTROL
288     rc = zmq_bind (control_rep, cfg->control_endpoint);
289     ASSERT_EXPR_SAFE (rc == 0);
290 
291     //  Start proxying!
292 
293     zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
294 
295     zmq_close (frontend_xsub);
296     zmq_close (backend_xpub);
297     zmq_close (control_rep);
298     //printf ("proxy thread ended\n");
299 }
300 
terminate_proxy(const proxy_hwm_cfg_t * cfg)301 void terminate_proxy (const proxy_hwm_cfg_t *cfg)
302 {
303     //  CONTROL REQ
304 
305     void *control_req = zmq_socket (
306       cfg->context,
307       ZMQ_REQ); //  This one can be used to send command to the proxy
308     assert (control_req);
309 
310     //  Connect CONTROL-REQ: a socket to which send commands
311     int rc = zmq_connect (control_req, cfg->control_endpoint);
312     ASSERT_EXPR_SAFE (rc == 0);
313 
314     //  Ask the proxy to exit: the subscriber has received all messages
315 
316     rc = zmq_send (control_req, "TERMINATE", 9, 0);
317     ASSERT_EXPR_SAFE (rc == 9);
318 
319     zmq_close (control_req);
320 }
321 
322 //  The main thread simply starts some publishers, a proxy,
323 //  and a subscriber. Finish when all packets are received.
324 
main(int argc,char * argv[])325 int main (int argc, char *argv[])
326 {
327     if (argc != 3) {
328         printf ("usage: proxy_thr <message-size> <message-count>\n");
329         return 1;
330     }
331 
332     message_size = atoi (argv[1]);
333     message_count = atoi (argv[2]);
334     printf ("message size: %d [B]\n", (int) message_size);
335     printf ("message count: %d\n", (int) message_count);
336 
337     void *context = zmq_ctx_new ();
338     assert (context);
339 
340     int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
341     ASSERT_EXPR_SAFE (rv == 0);
342 
343     //  START ALL SECONDARY THREADS
344 
345     const char *pub1 = "inproc://perf_pub1";
346     const char *pub2 = "inproc://perf_pub2";
347     const char *sub1 = "inproc://perf_backend";
348 
349     proxy_hwm_cfg_t cfg_global = {};
350     cfg_global.context = context;
351     cfg_global.frontend_endpoint[0] = pub1;
352     cfg_global.frontend_endpoint[1] = pub2;
353     cfg_global.backend_endpoint[0] = sub1;
354     cfg_global.control_endpoint = "inproc://ctrl";
355 
356     //  Proxy
357     proxy_hwm_cfg_t cfg_proxy = cfg_global;
358     void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
359     assert (proxy != 0);
360 
361     //  Subscriber 1
362     proxy_hwm_cfg_t cfg_sub1 = cfg_global;
363     cfg_sub1.thread_idx = 0;
364     void *subscriber =
365       zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
366     assert (subscriber != 0);
367 
368     //  Start measuring
369     void *watch = zmq_stopwatch_start ();
370 
371     //  Publisher 1
372     proxy_hwm_cfg_t cfg_pub1 = cfg_global;
373     cfg_pub1.thread_idx = 0;
374     void *publisher1 =
375       zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
376     assert (publisher1 != 0);
377 
378     //  Publisher 2
379     proxy_hwm_cfg_t cfg_pub2 = cfg_global;
380     cfg_pub2.thread_idx = 1;
381     void *publisher2 =
382       zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
383     assert (publisher2 != 0);
384 
385     //  Wait for all packets to be received
386     zmq_threadclose (subscriber);
387 
388     //  Stop measuring
389     unsigned long elapsed = zmq_stopwatch_stop (watch);
390     if (elapsed == 0)
391         elapsed = 1;
392 
393     unsigned long throughput =
394       (unsigned long) ((double) message_count / (double) elapsed * 1000000);
395     double megabits = (double) (throughput * message_size * 8) / 1000000;
396 
397     printf ("mean throughput: %d [msg/s]\n", (int) throughput);
398     printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
399 
400     //  Wait for the end of publishers...
401     zmq_threadclose (publisher1);
402     zmq_threadclose (publisher2);
403 
404     //  ... then close the proxy
405     terminate_proxy (&cfg_proxy);
406     zmq_threadclose (proxy);
407 
408     int rc = zmq_ctx_term (context);
409     ASSERT_EXPR_SAFE (rc == 0);
410 
411     return 0;
412 }
413