1 /*
2     Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 #include "testutil_unity.hpp"
32 
33 SETUP_TEARDOWN_TESTCONTEXT
34 
test_basic()35 void test_basic ()
36 {
37     //  Create a publisher
38     void *pub = test_context_socket (ZMQ_XPUB);
39     int manual = 1;
40     TEST_ASSERT_SUCCESS_ERRNO (
41       zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
42     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
43 
44     //  Create a subscriber
45     void *sub = test_context_socket (ZMQ_XSUB);
46     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
47 
48     //  Subscribe for A
49     const char subscription[] = {1, 'A', 0};
50     send_string_expect_success (sub, subscription, 0);
51 
52     // Receive subscriptions from subscriber
53     recv_string_expect_success (pub, subscription, 0);
54 
55     // Subscribe socket for B instead
56     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
57 
58     // Sending A message and B Message
59     send_string_expect_success (pub, "A", 0);
60     send_string_expect_success (pub, "B", 0);
61 
62     recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
63 
64     //  Clean up.
65     test_context_socket_close (pub);
66     test_context_socket_close (sub);
67 }
68 
test_unsubscribe_manual()69 void test_unsubscribe_manual ()
70 {
71     //  Create a publisher
72     void *pub = test_context_socket (ZMQ_XPUB);
73     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
74 
75     //  set pub socket options
76     int manual = 1;
77     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE,
78                                                &manual, sizeof (manual)));
79 
80     //  Create a subscriber
81     void *sub = test_context_socket (ZMQ_XSUB);
82     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
83 
84     //  Subscribe for A
85     const uint8_t subscription1[] = {1, 'A'};
86     send_array_expect_success (sub, subscription1, 0);
87 
88     //  Subscribe for B
89     const uint8_t subscription2[] = {1, 'B'};
90     send_array_expect_success (sub, subscription2, 0);
91 
92     char buffer[3];
93 
94     // Receive subscription "A" from subscriber
95     recv_array_expect_success (pub, subscription1, 0);
96 
97     // Subscribe socket for XA instead
98     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
99 
100     // Receive subscription "B" from subscriber
101     recv_array_expect_success (pub, subscription2, 0);
102 
103     // Subscribe socket for XB instead
104     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
105 
106     //  Unsubscribe from A
107     const uint8_t unsubscription1[2] = {0, 'A'};
108     send_array_expect_success (sub, unsubscription1, 0);
109 
110     // Receive unsubscription "A" from subscriber
111     recv_array_expect_success (pub, unsubscription1, 0);
112 
113     // Unsubscribe socket from XA instead
114     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
115 
116     // Sending messages XA, XB
117     send_string_expect_success (pub, "XA", 0);
118     send_string_expect_success (pub, "XB", 0);
119 
120     // Subscriber should receive XB only
121     recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
122 
123     // Close subscriber
124     test_context_socket_close (sub);
125 
126     // Receive unsubscription "B"
127     const char unsubscription2[2] = {0, 'B'};
128     TEST_ASSERT_EQUAL_INT (
129       sizeof unsubscription2,
130       TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
131     TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
132                                   sizeof unsubscription2);
133 
134     // Unsubscribe socket from XB instead
135     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
136 
137     //  Clean up.
138     test_context_socket_close (pub);
139 }
140 
test_xpub_proxy_unsubscribe_on_disconnect()141 void test_xpub_proxy_unsubscribe_on_disconnect ()
142 {
143     const uint8_t topic_buff[] = {"1"};
144     const uint8_t payload_buff[] = {"X"};
145 
146     char my_endpoint_backend[MAX_SOCKET_STRING];
147     char my_endpoint_frontend[MAX_SOCKET_STRING];
148 
149     int manual = 1;
150 
151     // proxy frontend
152     void *xsub_proxy = test_context_socket (ZMQ_XSUB);
153     bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
154                         sizeof my_endpoint_frontend);
155 
156     // proxy backend
157     void *xpub_proxy = test_context_socket (ZMQ_XPUB);
158     TEST_ASSERT_SUCCESS_ERRNO (
159       zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
160     bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
161                         sizeof my_endpoint_backend);
162 
163     // publisher
164     void *pub = test_context_socket (ZMQ_PUB);
165     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
166 
167     // first subscriber subscribes
168     void *sub1 = test_context_socket (ZMQ_SUB);
169     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
170     TEST_ASSERT_SUCCESS_ERRNO (
171       zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
172 
173     // wait
174     msleep (SETTLE_TIME);
175 
176     // proxy reroutes and confirms subscriptions
177     const uint8_t subscription[2] = {1, *topic_buff};
178     recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
179     TEST_ASSERT_SUCCESS_ERRNO (
180       zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
181     send_array_expect_success (xsub_proxy, subscription, 0);
182 
183     // second subscriber subscribes
184     void *sub2 = test_context_socket (ZMQ_SUB);
185     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
186     TEST_ASSERT_SUCCESS_ERRNO (
187       zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
188 
189     // wait
190     msleep (SETTLE_TIME);
191 
192     // proxy reroutes
193     recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
194     TEST_ASSERT_SUCCESS_ERRNO (
195       zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
196     send_array_expect_success (xsub_proxy, subscription, 0);
197 
198     // wait
199     msleep (SETTLE_TIME);
200 
201     // let publisher send a msg
202     send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
203     send_array_expect_success (pub, payload_buff, 0);
204 
205     // wait
206     msleep (SETTLE_TIME);
207 
208     // proxy reroutes data messages to subscribers
209     recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
210     recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
211 
212     // send 2 messages
213     send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
214     send_array_expect_success (xpub_proxy, payload_buff, 0);
215     send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
216     send_array_expect_success (xpub_proxy, payload_buff, 0);
217 
218     // wait
219     msleep (SETTLE_TIME);
220 
221     // sub2 will get 2 messages because the last subscription is sub2.
222     recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
223     recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
224     recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
225     recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
226 
227     recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
228     recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
229 
230     //  Disconnect both subscribers
231     test_context_socket_close (sub1);
232     test_context_socket_close (sub2);
233 
234     // wait
235     msleep (SETTLE_TIME);
236 
237     // unsubscribe messages are passed from proxy to publisher
238     const uint8_t unsubscription[] = {0, *topic_buff};
239     recv_array_expect_success (xpub_proxy, unsubscription, 0);
240     TEST_ASSERT_SUCCESS_ERRNO (
241       zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
242     send_array_expect_success (xsub_proxy, unsubscription, 0);
243 
244     // should receive another unsubscribe msg
245     recv_array_expect_success (xpub_proxy, unsubscription, 0);
246     TEST_ASSERT_SUCCESS_ERRNO (
247       zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
248     send_array_expect_success (xsub_proxy, unsubscription, 0);
249 
250     // wait
251     msleep (SETTLE_TIME);
252 
253     // let publisher send a msg
254     send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
255     send_array_expect_success (pub, payload_buff, 0);
256 
257     // wait
258     msleep (SETTLE_TIME);
259 
260     // nothing should come to the proxy
261     char buffer[1];
262     TEST_ASSERT_FAILURE_ERRNO (
263       EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
264 
265     test_context_socket_close (pub);
266     test_context_socket_close (xpub_proxy);
267     test_context_socket_close (xsub_proxy);
268 }
269 
test_missing_subscriptions()270 void test_missing_subscriptions ()
271 {
272     const char *topic1 = "1";
273     const char *topic2 = "2";
274     const char *payload = "X";
275 
276     char my_endpoint_backend[MAX_SOCKET_STRING];
277     char my_endpoint_frontend[MAX_SOCKET_STRING];
278 
279     int manual = 1;
280 
281     // proxy frontend
282     void *xsub_proxy = test_context_socket (ZMQ_XSUB);
283     bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
284                         sizeof my_endpoint_frontend);
285 
286     // proxy backend
287     void *xpub_proxy = test_context_socket (ZMQ_XPUB);
288     TEST_ASSERT_SUCCESS_ERRNO (
289       zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
290     bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
291                         sizeof my_endpoint_backend);
292 
293     // publisher
294     void *pub = test_context_socket (ZMQ_PUB);
295     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
296 
297     // Here's the problem: because subscribers subscribe in quick succession,
298     // the proxy is unable to confirm the first subscription before receiving
299     // the second. This causes the first subscription to get lost.
300 
301     // first subscriber
302     void *sub1 = test_context_socket (ZMQ_SUB);
303     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
304     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
305 
306     // wait
307     msleep (SETTLE_TIME);
308 
309     // proxy now reroutes and confirms subscriptions
310     const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
311     recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
312     TEST_ASSERT_SUCCESS_ERRNO (
313       zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
314     send_array_expect_success (xsub_proxy, subscription1, 0);
315 
316     // second subscriber
317     void *sub2 = test_context_socket (ZMQ_SUB);
318     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
319     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
320 
321     // wait
322     msleep (SETTLE_TIME);
323 
324     // proxy now reroutes and confirms subscriptions
325     const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
326     recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
327     TEST_ASSERT_SUCCESS_ERRNO (
328       zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
329     send_array_expect_success (xsub_proxy, subscription2, 0);
330 
331     // wait
332     msleep (SETTLE_TIME);
333 
334     // let publisher send 2 msgs, each with its own topic_buff
335     send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
336     send_string_expect_success (pub, payload, 0);
337     send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
338     send_string_expect_success (pub, payload, 0);
339 
340     // wait
341     msleep (SETTLE_TIME);
342 
343     // proxy reroutes data messages to subscribers
344     recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
345     recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
346     send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
347     send_string_expect_success (xpub_proxy, payload, 0);
348 
349     recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
350     recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
351     send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
352     send_string_expect_success (xpub_proxy, payload, 0);
353 
354     // wait
355     msleep (SETTLE_TIME);
356 
357     // only sub2 should now get a message
358     recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
359     recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
360 
361     //recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
362     //recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
363 
364     //  Clean up
365     test_context_socket_close (sub1);
366     test_context_socket_close (sub2);
367     test_context_socket_close (pub);
368     test_context_socket_close (xpub_proxy);
369     test_context_socket_close (xsub_proxy);
370 }
371 
test_unsubscribe_cleanup()372 void test_unsubscribe_cleanup ()
373 {
374     char my_endpoint[MAX_SOCKET_STRING];
375 
376     //  Create a publisher
377     void *pub = test_context_socket (ZMQ_XPUB);
378     int manual = 1;
379     TEST_ASSERT_SUCCESS_ERRNO (
380       zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
381     bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
382 
383     //  Create a subscriber
384     void *sub = test_context_socket (ZMQ_XSUB);
385     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
386 
387     //  Subscribe for A
388     const uint8_t subscription1[2] = {1, 'A'};
389     send_array_expect_success (sub, subscription1, 0);
390 
391 
392     // Receive subscriptions from subscriber
393     recv_array_expect_success (pub, subscription1, 0);
394     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
395 
396     // send 2 messages
397     send_string_expect_success (pub, "XA", 0);
398     send_string_expect_success (pub, "XB", 0);
399 
400     // receive the single message
401     recv_string_expect_success (sub, "XA", 0);
402 
403     // should be nothing left in the queue
404     char buffer[2];
405     TEST_ASSERT_FAILURE_ERRNO (
406       EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
407 
408     // close the socket
409     test_context_socket_close (sub);
410 
411     // closing the socket will result in an unsubscribe event
412     const uint8_t unsubscription[2] = {0, 'A'};
413     recv_array_expect_success (pub, unsubscription, 0);
414 
415     // this doesn't really do anything
416     // there is no last_pipe set it will just fail silently
417     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
418 
419     // reconnect
420     sub = test_context_socket (ZMQ_XSUB);
421     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
422 
423     // send a subscription for B
424     const uint8_t subscription2[2] = {1, 'B'};
425     send_array_expect_success (sub, subscription2, 0);
426 
427     // receive the subscription, overwrite it to XB
428     recv_array_expect_success (pub, subscription2, 0);
429     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
430 
431     // send 2 messages
432     send_string_expect_success (pub, "XA", 0);
433     send_string_expect_success (pub, "XB", 0);
434 
435     // receive the single message
436     recv_string_expect_success (sub, "XB", 0);
437 
438     // should be nothing left in the queue
439     TEST_ASSERT_FAILURE_ERRNO (
440       EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
441 
442     //  Clean up.
443     test_context_socket_close (pub);
444     test_context_socket_close (sub);
445 }
446 
test_manual_last_value()447 void test_manual_last_value ()
448 {
449     //  Create a publisher
450     void *pub = test_context_socket (ZMQ_XPUB);
451 
452     int hwm = 2000;
453     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4));
454 
455     //  set pub socket options
456     int manual = 1;
457     TEST_ASSERT_SUCCESS_ERRNO (
458       zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
459 
460     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
461 
462     //  Create a subscriber
463     void *sub = test_context_socket (ZMQ_SUB);
464     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
465 
466     //  Create another subscriber
467     void *sub2 = test_context_socket (ZMQ_SUB);
468     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, "inproc://soname"));
469 
470     //  Subscribe for "A".
471     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1));
472 
473     const uint8_t subscription[2] = {1, 'A'};
474     //  we must wait for the subscription to be processed here, otherwise some
475     //  or all published messages might be lost
476     recv_array_expect_success (pub, subscription, 0);
477 
478     //  manual subscribe message
479     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "A", 1));
480     send_string_expect_success (pub, "A", 0);
481     recv_string_expect_success (sub, "A", 0);
482 
483     //  Subscribe for "A".
484     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, "A", 1));
485     recv_array_expect_success (pub, subscription, 0);
486     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "A", 1));
487     send_string_expect_success (pub, "A", 0);
488     recv_string_expect_success (sub2, "A", 0);
489 
490     char buffer[255];
491     //  sub won't get a message because the last subscription pipe is sub2.
492     TEST_ASSERT_FAILURE_ERRNO (
493       EAGAIN, zmq_recv (sub, buffer, sizeof (buffer), ZMQ_DONTWAIT));
494 
495     //  Clean up.
496     test_context_socket_close (pub);
497     test_context_socket_close (sub);
498     test_context_socket_close (sub2);
499 }
500 
main()501 int main ()
502 {
503     setup_test_environment ();
504 
505     UNITY_BEGIN ();
506     RUN_TEST (test_basic);
507     RUN_TEST (test_unsubscribe_manual);
508     RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
509     RUN_TEST (test_missing_subscriptions);
510     RUN_TEST (test_unsubscribe_cleanup);
511     RUN_TEST (test_manual_last_value);
512 
513     return UNITY_END ();
514 }
515