1 /*
2     Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 #include "testutil_unity.hpp"
32 
33 #include <stdlib.h>
34 #include <string.h>
35 
// Macro supplied by the test utility headers included above.
// NOTE(review): presumably defines the Unity setUp/tearDown hooks that create
// and destroy the context returned by get_test_context () - confirm.
SETUP_TEARDOWN_TESTCONTEXT

// Size in bytes of every request/reply payload: "request #NNN" plus its '\0'
#define CONTENT_SIZE 13
// Capacity of the buffers that payloads and control commands are received into
#define CONTENT_SIZE_MAX 32
// Size in bytes of a client routing id: "XXXX-XXXX" plus its '\0'
#define ROUTING_ID_SIZE 10
// Capacity of the buffer the workers receive the routing id frame into
#define ROUTING_ID_SIZE_MAX 32
// Number of server_worker threads started by server_task
#define QT_WORKERS 5
// Number of client_task threads started by test_proxy
#define QT_CLIENTS 3
// Set to a non-zero value to print a trace of the traffic to stdout
#define is_verbose 0
45 
// Per-thread argument handed to client_task through zmq_threadstart.
struct thread_data
{
    // Index of the client; selects the "inproc://endpoint<id>" socket name
    int id;
};
50 
// Counters for one side of the proxy, filled by check_proxy_stats from the
// proxy's reply to the "STATISTICS" command (one reply part per field).
typedef struct
{
    uint64_t msg_in;    // messages received on this socket
    uint64_t bytes_in;  // bytes received on this socket
    uint64_t msg_out;   // messages sent on this socket
    uint64_t bytes_out; // bytes sent on this socket
} zmq_socket_stats_t;
58 
// Full set of proxy statistics: one counter group per proxied socket.
typedef struct
{
    zmq_socket_stats_t frontend; // ROUTER side, facing the clients
    zmq_socket_stats_t backend;  // DEALER side, facing the workers
} zmq_proxy_stats_t;
64 
// Atomic counters created in test_proxy and shared by all threads:
// requests sent by the clients ...
void *g_clients_pkts_out = NULL;
// ... and replies sent by the workers. Both are compared with the proxy's
// own statistics in check_proxy_stats.
void *g_workers_pkts_out = NULL;
67 
68 // Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
69 //
70 // While this example runs in a single process, that is to make
71 // it easier to start and stop the example. Each task may have its own
72 // context and conceptually acts as a separate process. To have this
73 // behaviour, it is necessary to replace the inproc transport of the
74 // control socket by a tcp transport.
75 
76 // This is our client task
// It connects to the server, and then sends a request roughly every 200 ms
78 // It collects responses as they arrive, and it prints them out. We will
79 // run several client tasks in parallel, each with a different random ID.
80 
client_task(void * db_)81 static void client_task (void *db_)
82 {
83     const thread_data *const databag = static_cast<const thread_data *> (db_);
84     // Endpoint socket gets random port to avoid test failing when port in use
85     void *endpoint = zmq_socket (get_test_context (), ZMQ_PAIR);
86     TEST_ASSERT_NOT_NULL (endpoint);
87     int linger = 0;
88     TEST_ASSERT_SUCCESS_ERRNO (
89       zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger)));
90     char endpoint_source[256];
91     sprintf (endpoint_source, "inproc://endpoint%d", databag->id);
92     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (endpoint, endpoint_source));
93     char *my_endpoint = s_recv (endpoint);
94     TEST_ASSERT_NOT_NULL (my_endpoint);
95 
96     void *client = zmq_socket (get_test_context (), ZMQ_DEALER);
97     TEST_ASSERT_NOT_NULL (client);
98 
99     // Control socket receives terminate command from main over inproc
100     void *control = zmq_socket (get_test_context (), ZMQ_SUB);
101     TEST_ASSERT_NOT_NULL (control);
102     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
103     TEST_ASSERT_SUCCESS_ERRNO (
104       zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
105     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
106 
107     char content[CONTENT_SIZE_MAX] = {};
108     // Set random routing id to make tracing easier
109     char routing_id[ROUTING_ID_SIZE] = {};
110     sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
111     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
112       client, ZMQ_ROUTING_ID, routing_id,
113       ROUTING_ID_SIZE)); // includes '\0' as an helper for printf
114     linger = 0;
115     TEST_ASSERT_SUCCESS_ERRNO (
116       zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)));
117     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
118 
119     zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
120                               {control, 0, ZMQ_POLLIN, 0}};
121     int request_nbr = 0;
122     bool run = true;
123     bool keep_sending = true;
124     while (run) {
125         // Tick once per 200 ms, pulling in arriving messages
126         int centitick;
127         for (centitick = 0; centitick < 20; centitick++) {
128             zmq_poll (items, 2, 10);
129             if (items[0].revents & ZMQ_POLLIN) {
130                 int rcvmore;
131                 size_t sz = sizeof (rcvmore);
132                 int rc = TEST_ASSERT_SUCCESS_ERRNO (
133                   zmq_recv (client, content, CONTENT_SIZE_MAX, 0));
134                 TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
135                 if (is_verbose)
136                     printf (
137                       "client receive - routing_id = %s    content = %s\n",
138                       routing_id, content);
139                 //  Check that message is still the same
140                 TEST_ASSERT_EQUAL_STRING_LEN ("request #", content, 9);
141                 TEST_ASSERT_SUCCESS_ERRNO (
142                   zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz));
143                 TEST_ASSERT_FALSE (rcvmore);
144             }
145             if (items[1].revents & ZMQ_POLLIN) {
146                 int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
147 
148                 if (rc > 0) {
149                     content[rc] = 0; // NULL-terminate the command string
150                     if (is_verbose)
151                         printf (
152                           "client receive - routing_id = %s    command = %s\n",
153                           routing_id, content);
154                     if (memcmp (content, "TERMINATE", 9) == 0) {
155                         run = false;
156                         break;
157                     }
158                     if (memcmp (content, "STOP", 4) == 0) {
159                         keep_sending = false;
160                         break;
161                     }
162                 }
163             }
164         }
165 
166         if (keep_sending) {
167             sprintf (content, "request #%03d", ++request_nbr); // CONTENT_SIZE
168             if (is_verbose)
169                 printf ("client send - routing_id = %s    request #%03d\n",
170                         routing_id, request_nbr);
171             zmq_atomic_counter_inc (g_clients_pkts_out);
172 
173             TEST_ASSERT_EQUAL_INT (CONTENT_SIZE,
174                                    zmq_send (client, content, CONTENT_SIZE, 0));
175         }
176     }
177 
178     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (client));
179     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
180     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint));
181     free (my_endpoint);
182 }
183 
184 // This is our server task.
185 // It uses the multithreaded server model to deal requests out to a pool
186 // of workers and route replies back to clients. One worker can handle
187 // one request at a time but one client can talk to multiple workers at
188 // once.
189 
// Forward declaration: server_task starts server_worker threads, but the
// worker is defined further down in this file.
static void server_worker (void * /*unused_*/);
191 
server_task(void *)192 void server_task (void * /*unused_*/)
193 {
194     // Frontend socket talks to clients over TCP
195     char my_endpoint[MAX_SOCKET_STRING];
196     void *frontend = zmq_socket (get_test_context (), ZMQ_ROUTER);
197     TEST_ASSERT_NOT_NULL (frontend);
198     int linger = 0;
199     TEST_ASSERT_SUCCESS_ERRNO (
200       zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger)));
201     bind_loopback_ipv4 (frontend, my_endpoint, sizeof my_endpoint);
202 
203     // Backend socket talks to workers over inproc
204     void *backend = zmq_socket (get_test_context (), ZMQ_DEALER);
205     TEST_ASSERT_NOT_NULL (backend);
206     TEST_ASSERT_SUCCESS_ERRNO (
207       zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
208     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
209 
210     // Control socket receives terminate command from main over inproc
211     void *control = zmq_socket (get_test_context (), ZMQ_REP);
212     TEST_ASSERT_NOT_NULL (control);
213     TEST_ASSERT_SUCCESS_ERRNO (
214       zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
215     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control_proxy"));
216 
217     // Launch pool of worker threads, precise number is not critical
218     int thread_nbr;
219     void *threads[5];
220     for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
221         threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
222 
223     // Endpoint socket sends random port to avoid test failing when port in use
224     void *endpoint_receivers[QT_CLIENTS];
225     char endpoint_source[256];
226     for (int i = 0; i < QT_CLIENTS; ++i) {
227         endpoint_receivers[i] = zmq_socket (get_test_context (), ZMQ_PAIR);
228         TEST_ASSERT_NOT_NULL (endpoint_receivers[i]);
229         TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
230           endpoint_receivers[i], ZMQ_LINGER, &linger, sizeof (linger)));
231         sprintf (endpoint_source, "inproc://endpoint%d", i);
232         TEST_ASSERT_SUCCESS_ERRNO (
233           zmq_bind (endpoint_receivers[i], endpoint_source));
234     }
235 
236     for (int i = 0; i < QT_CLIENTS; ++i) {
237         send_string_expect_success (endpoint_receivers[i], my_endpoint, 0);
238     }
239 
240     // Connect backend to frontend via a proxy
241     TEST_ASSERT_SUCCESS_ERRNO (
242       zmq_proxy_steerable (frontend, backend, NULL, control));
243 
244     for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
245         zmq_threadclose (threads[thread_nbr]);
246 
247     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend));
248     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend));
249     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
250     for (int i = 0; i < QT_CLIENTS; ++i) {
251         TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i]));
252     }
253 }
254 
255 // Each worker task works on one request at a time and sends a random number
256 // of replies back, with random delays between replies:
257 // The comments in the first column, if suppressed, makes it a poller version
258 
server_worker(void *)259 static void server_worker (void * /*unused_*/)
260 {
261     void *worker = zmq_socket (get_test_context (), ZMQ_DEALER);
262     TEST_ASSERT_NOT_NULL (worker);
263     int linger = 0;
264     TEST_ASSERT_SUCCESS_ERRNO (
265       zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger)));
266     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
267 
268     // Control socket receives terminate command from main over inproc
269     void *control = zmq_socket (get_test_context (), ZMQ_SUB);
270     TEST_ASSERT_NOT_NULL (control);
271     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
272     TEST_ASSERT_SUCCESS_ERRNO (
273       zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
274     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
275 
276     char content[CONTENT_SIZE_MAX] =
277       {}; // bigger than what we need to check that
278     char routing_id[ROUTING_ID_SIZE_MAX] =
279       {}; // the size received is the size sent
280 
281     bool run = true;
282     bool keep_sending = true;
283     while (run) {
284         int rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
285                            ZMQ_DONTWAIT); // usually, rc == -1 (no message)
286         if (rc > 0) {
287             content[rc] = 0; // NULL-terminate the command string
288             if (is_verbose)
289                 printf ("server_worker receives command = %s\n", content);
290             if (memcmp (content, "TERMINATE", 9) == 0)
291                 run = false;
292             if (memcmp (content, "STOP", 4) == 0)
293                 keep_sending = false;
294         }
295         // The DEALER socket gives us the reply envelope and message
296         // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
297         rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
298         if (rc == ROUTING_ID_SIZE) {
299             rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
300             TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
301             if (is_verbose)
302                 printf ("server receive - routing_id = %s    content = %s\n",
303                         routing_id, content);
304 
305             // Send 0..4 replies back
306             if (keep_sending) {
307                 int reply, replies = rand () % 5;
308                 for (reply = 0; reply < replies; reply++) {
309                     // Sleep for some fraction of a second
310                     msleep (rand () % 10 + 1);
311 
312                     //  Send message from server to client
313                     if (is_verbose)
314                         printf ("server send - routing_id = %s    reply\n",
315                                 routing_id);
316                     zmq_atomic_counter_inc (g_workers_pkts_out);
317 
318                     rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
319                                    ZMQ_SNDMORE);
320                     TEST_ASSERT_EQUAL_INT (ROUTING_ID_SIZE, rc);
321                     rc = zmq_send (worker, content, CONTENT_SIZE, 0);
322                     TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
323                 }
324             }
325         }
326     }
327     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (worker));
328     TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
329 }
330 
recv_stat(void * sock_,bool last_)331 uint64_t recv_stat (void *sock_, bool last_)
332 {
333     uint64_t res;
334     zmq_msg_t stats_msg;
335 
336     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
337     TEST_ASSERT_EQUAL_INT (sizeof (uint64_t),
338                            zmq_recvmsg (sock_, &stats_msg, 0));
339     memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
340     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
341 
342     int more;
343     size_t moresz = sizeof more;
344     TEST_ASSERT_SUCCESS_ERRNO (
345       zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz));
346     TEST_ASSERT_TRUE ((last_ && !more) || (!last_ && more));
347 
348     return res;
349 }
350 
351 // Utility function to interrogate the proxy:
352 
check_proxy_stats(void * control_proxy_)353 void check_proxy_stats (void *control_proxy_)
354 {
355     zmq_proxy_stats_t total_stats;
356 
357     send_string_expect_success (control_proxy_, "STATISTICS", 0);
358 
359     // first frame of the reply contains FRONTEND stats:
360     total_stats.frontend.msg_in = recv_stat (control_proxy_, false);
361     total_stats.frontend.bytes_in = recv_stat (control_proxy_, false);
362     total_stats.frontend.msg_out = recv_stat (control_proxy_, false);
363     total_stats.frontend.bytes_out = recv_stat (control_proxy_, false);
364 
365     // second frame of the reply contains BACKEND stats:
366     total_stats.backend.msg_in = recv_stat (control_proxy_, false);
367     total_stats.backend.bytes_in = recv_stat (control_proxy_, false);
368     total_stats.backend.msg_out = recv_stat (control_proxy_, false);
369     total_stats.backend.bytes_out = recv_stat (control_proxy_, true);
370 
371     // check stats
372 
373     if (is_verbose) {
374         printf (
375           "frontend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
376           static_cast<unsigned long int> (total_stats.frontend.msg_in),
377           static_cast<unsigned long int> (total_stats.frontend.bytes_in),
378           static_cast<unsigned long int> (total_stats.frontend.msg_out),
379           static_cast<unsigned long int> (total_stats.frontend.bytes_out));
380         printf (
381           "backend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
382           static_cast<unsigned long int> (total_stats.backend.msg_in),
383           static_cast<unsigned long int> (total_stats.backend.bytes_in),
384           static_cast<unsigned long int> (total_stats.backend.msg_out),
385           static_cast<unsigned long int> (total_stats.backend.bytes_out));
386 
387         printf ("clients sent out %d requests\n",
388                 zmq_atomic_counter_value (g_clients_pkts_out));
389         printf ("workers sent out %d replies\n",
390                 zmq_atomic_counter_value (g_workers_pkts_out));
391     }
392     TEST_ASSERT_EQUAL_UINT (
393       (unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
394       total_stats.frontend.msg_in);
395     TEST_ASSERT_EQUAL_UINT (
396       (unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
397       total_stats.frontend.msg_out);
398     TEST_ASSERT_EQUAL_UINT (
399       (unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
400       total_stats.backend.msg_in);
401     TEST_ASSERT_EQUAL_UINT (
402       (unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
403       total_stats.backend.msg_out);
404 }
405 
406 
407 // The main thread simply starts several clients and a server, and then
408 // waits for the server to finish.
409 
test_proxy()410 void test_proxy ()
411 {
412     g_clients_pkts_out = zmq_atomic_counter_new ();
413     g_workers_pkts_out = zmq_atomic_counter_new ();
414 
415     // Control socket receives terminate command from main over inproc
416     void *control = test_context_socket (ZMQ_PUB);
417     int linger = 0;
418     TEST_ASSERT_SUCCESS_ERRNO (
419       zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
420     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
421 
422     // Control socket receives terminate command from main over inproc
423     void *control_proxy = test_context_socket (ZMQ_REQ);
424     TEST_ASSERT_SUCCESS_ERRNO (
425       zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger)));
426     TEST_ASSERT_SUCCESS_ERRNO (
427       zmq_bind (control_proxy, "inproc://control_proxy"));
428 
429     void *threads[QT_CLIENTS + 1];
430     struct thread_data databags[QT_CLIENTS + 1];
431     for (int i = 0; i < QT_CLIENTS; i++) {
432         databags[i].id = i;
433         threads[i] = zmq_threadstart (&client_task, &databags[i]);
434     }
435     threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL);
436     msleep (500); // Run for 500 ms then quit
437 
438     if (is_verbose)
439         printf ("stopping all clients and server workers\n");
440     send_string_expect_success (control, "STOP", 0);
441 
442     msleep (500); // Wait for all clients and workers to STOP
443 
444     if (is_verbose)
445         printf ("retrieving stats from the proxy\n");
446     check_proxy_stats (control_proxy);
447 
448     if (is_verbose)
449         printf ("shutting down all clients and server workers\n");
450     send_string_expect_success (control, "TERMINATE", 0);
451 
452     if (is_verbose)
453         printf ("shutting down the proxy\n");
454     send_string_expect_success (control_proxy, "TERMINATE", 0);
455 
456     test_context_socket_close (control);
457     test_context_socket_close (control_proxy);
458 
459     for (int i = 0; i < QT_CLIENTS + 1; i++)
460         zmq_threadclose (threads[i]);
461 }
462 
main(void)463 int main (void)
464 {
465     setup_test_environment ();
466 
467     UNITY_BEGIN ();
468     RUN_TEST (test_proxy);
469     return UNITY_END ();
470 }
471