1 /*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "testutil.hpp"
31 #include "testutil_unity.hpp"
32
33 SETUP_TEARDOWN_TESTCONTEXT
34
test_basic()35 void test_basic ()
36 {
37 // Create a publisher
38 void *pub = test_context_socket (ZMQ_XPUB);
39 int manual = 1;
40 TEST_ASSERT_SUCCESS_ERRNO (
41 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
42 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
43
44 // Create a subscriber
45 void *sub = test_context_socket (ZMQ_XSUB);
46 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
47
48 // Subscribe for A
49 const char subscription[] = {1, 'A', 0};
50 send_string_expect_success (sub, subscription, 0);
51
52 // Receive subscriptions from subscriber
53 recv_string_expect_success (pub, subscription, 0);
54
55 // Subscribe socket for B instead
56 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
57
58 // Sending A message and B Message
59 send_string_expect_success (pub, "A", 0);
60 send_string_expect_success (pub, "B", 0);
61
62 recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
63
64 // Clean up.
65 test_context_socket_close (pub);
66 test_context_socket_close (sub);
67 }
68
test_unsubscribe_manual()69 void test_unsubscribe_manual ()
70 {
71 // Create a publisher
72 void *pub = test_context_socket (ZMQ_XPUB);
73 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
74
75 // set pub socket options
76 int manual = 1;
77 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE,
78 &manual, sizeof (manual)));
79
80 // Create a subscriber
81 void *sub = test_context_socket (ZMQ_XSUB);
82 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
83
84 // Subscribe for A
85 const uint8_t subscription1[] = {1, 'A'};
86 send_array_expect_success (sub, subscription1, 0);
87
88 // Subscribe for B
89 const uint8_t subscription2[] = {1, 'B'};
90 send_array_expect_success (sub, subscription2, 0);
91
92 char buffer[3];
93
94 // Receive subscription "A" from subscriber
95 recv_array_expect_success (pub, subscription1, 0);
96
97 // Subscribe socket for XA instead
98 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
99
100 // Receive subscription "B" from subscriber
101 recv_array_expect_success (pub, subscription2, 0);
102
103 // Subscribe socket for XB instead
104 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
105
106 // Unsubscribe from A
107 const uint8_t unsubscription1[2] = {0, 'A'};
108 send_array_expect_success (sub, unsubscription1, 0);
109
110 // Receive unsubscription "A" from subscriber
111 recv_array_expect_success (pub, unsubscription1, 0);
112
113 // Unsubscribe socket from XA instead
114 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
115
116 // Sending messages XA, XB
117 send_string_expect_success (pub, "XA", 0);
118 send_string_expect_success (pub, "XB", 0);
119
120 // Subscriber should receive XB only
121 recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
122
123 // Close subscriber
124 test_context_socket_close (sub);
125
126 // Receive unsubscription "B"
127 const char unsubscription2[2] = {0, 'B'};
128 TEST_ASSERT_EQUAL_INT (
129 sizeof unsubscription2,
130 TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
131 TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
132 sizeof unsubscription2);
133
134 // Unsubscribe socket from XB instead
135 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
136
137 // Clean up.
138 test_context_socket_close (pub);
139 }
140
test_xpub_proxy_unsubscribe_on_disconnect()141 void test_xpub_proxy_unsubscribe_on_disconnect ()
142 {
143 const uint8_t topic_buff[] = {"1"};
144 const uint8_t payload_buff[] = {"X"};
145
146 char my_endpoint_backend[MAX_SOCKET_STRING];
147 char my_endpoint_frontend[MAX_SOCKET_STRING];
148
149 int manual = 1;
150
151 // proxy frontend
152 void *xsub_proxy = test_context_socket (ZMQ_XSUB);
153 bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
154 sizeof my_endpoint_frontend);
155
156 // proxy backend
157 void *xpub_proxy = test_context_socket (ZMQ_XPUB);
158 TEST_ASSERT_SUCCESS_ERRNO (
159 zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
160 bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
161 sizeof my_endpoint_backend);
162
163 // publisher
164 void *pub = test_context_socket (ZMQ_PUB);
165 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
166
167 // first subscriber subscribes
168 void *sub1 = test_context_socket (ZMQ_SUB);
169 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
170 TEST_ASSERT_SUCCESS_ERRNO (
171 zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
172
173 // wait
174 msleep (SETTLE_TIME);
175
176 // proxy reroutes and confirms subscriptions
177 const uint8_t subscription[2] = {1, *topic_buff};
178 recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
179 TEST_ASSERT_SUCCESS_ERRNO (
180 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
181 send_array_expect_success (xsub_proxy, subscription, 0);
182
183 // second subscriber subscribes
184 void *sub2 = test_context_socket (ZMQ_SUB);
185 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
186 TEST_ASSERT_SUCCESS_ERRNO (
187 zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
188
189 // wait
190 msleep (SETTLE_TIME);
191
192 // proxy reroutes
193 recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
194 TEST_ASSERT_SUCCESS_ERRNO (
195 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
196 send_array_expect_success (xsub_proxy, subscription, 0);
197
198 // wait
199 msleep (SETTLE_TIME);
200
201 // let publisher send a msg
202 send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
203 send_array_expect_success (pub, payload_buff, 0);
204
205 // wait
206 msleep (SETTLE_TIME);
207
208 // proxy reroutes data messages to subscribers
209 recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
210 recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
211
212 // send 2 messages
213 send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
214 send_array_expect_success (xpub_proxy, payload_buff, 0);
215 send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
216 send_array_expect_success (xpub_proxy, payload_buff, 0);
217
218 // wait
219 msleep (SETTLE_TIME);
220
221 // sub2 will get 2 messages because the last subscription is sub2.
222 recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
223 recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
224 recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
225 recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
226
227 recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
228 recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
229
230 // Disconnect both subscribers
231 test_context_socket_close (sub1);
232 test_context_socket_close (sub2);
233
234 // wait
235 msleep (SETTLE_TIME);
236
237 // unsubscribe messages are passed from proxy to publisher
238 const uint8_t unsubscription[] = {0, *topic_buff};
239 recv_array_expect_success (xpub_proxy, unsubscription, 0);
240 TEST_ASSERT_SUCCESS_ERRNO (
241 zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
242 send_array_expect_success (xsub_proxy, unsubscription, 0);
243
244 // should receive another unsubscribe msg
245 recv_array_expect_success (xpub_proxy, unsubscription, 0);
246 TEST_ASSERT_SUCCESS_ERRNO (
247 zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
248 send_array_expect_success (xsub_proxy, unsubscription, 0);
249
250 // wait
251 msleep (SETTLE_TIME);
252
253 // let publisher send a msg
254 send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
255 send_array_expect_success (pub, payload_buff, 0);
256
257 // wait
258 msleep (SETTLE_TIME);
259
260 // nothing should come to the proxy
261 char buffer[1];
262 TEST_ASSERT_FAILURE_ERRNO (
263 EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
264
265 test_context_socket_close (pub);
266 test_context_socket_close (xpub_proxy);
267 test_context_socket_close (xsub_proxy);
268 }
269
test_missing_subscriptions()270 void test_missing_subscriptions ()
271 {
272 const char *topic1 = "1";
273 const char *topic2 = "2";
274 const char *payload = "X";
275
276 char my_endpoint_backend[MAX_SOCKET_STRING];
277 char my_endpoint_frontend[MAX_SOCKET_STRING];
278
279 int manual = 1;
280
281 // proxy frontend
282 void *xsub_proxy = test_context_socket (ZMQ_XSUB);
283 bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
284 sizeof my_endpoint_frontend);
285
286 // proxy backend
287 void *xpub_proxy = test_context_socket (ZMQ_XPUB);
288 TEST_ASSERT_SUCCESS_ERRNO (
289 zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
290 bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
291 sizeof my_endpoint_backend);
292
293 // publisher
294 void *pub = test_context_socket (ZMQ_PUB);
295 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
296
297 // Here's the problem: because subscribers subscribe in quick succession,
298 // the proxy is unable to confirm the first subscription before receiving
299 // the second. This causes the first subscription to get lost.
300
301 // first subscriber
302 void *sub1 = test_context_socket (ZMQ_SUB);
303 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
304 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
305
306 // wait
307 msleep (SETTLE_TIME);
308
309 // proxy now reroutes and confirms subscriptions
310 const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
311 recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
312 TEST_ASSERT_SUCCESS_ERRNO (
313 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
314 send_array_expect_success (xsub_proxy, subscription1, 0);
315
316 // second subscriber
317 void *sub2 = test_context_socket (ZMQ_SUB);
318 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
319 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
320
321 // wait
322 msleep (SETTLE_TIME);
323
324 // proxy now reroutes and confirms subscriptions
325 const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
326 recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
327 TEST_ASSERT_SUCCESS_ERRNO (
328 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
329 send_array_expect_success (xsub_proxy, subscription2, 0);
330
331 // wait
332 msleep (SETTLE_TIME);
333
334 // let publisher send 2 msgs, each with its own topic_buff
335 send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
336 send_string_expect_success (pub, payload, 0);
337 send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
338 send_string_expect_success (pub, payload, 0);
339
340 // wait
341 msleep (SETTLE_TIME);
342
343 // proxy reroutes data messages to subscribers
344 recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
345 recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
346 send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
347 send_string_expect_success (xpub_proxy, payload, 0);
348
349 recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
350 recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
351 send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
352 send_string_expect_success (xpub_proxy, payload, 0);
353
354 // wait
355 msleep (SETTLE_TIME);
356
357 // only sub2 should now get a message
358 recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
359 recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
360
361 //recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
362 //recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
363
364 // Clean up
365 test_context_socket_close (sub1);
366 test_context_socket_close (sub2);
367 test_context_socket_close (pub);
368 test_context_socket_close (xpub_proxy);
369 test_context_socket_close (xsub_proxy);
370 }
371
test_unsubscribe_cleanup()372 void test_unsubscribe_cleanup ()
373 {
374 char my_endpoint[MAX_SOCKET_STRING];
375
376 // Create a publisher
377 void *pub = test_context_socket (ZMQ_XPUB);
378 int manual = 1;
379 TEST_ASSERT_SUCCESS_ERRNO (
380 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
381 bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
382
383 // Create a subscriber
384 void *sub = test_context_socket (ZMQ_XSUB);
385 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
386
387 // Subscribe for A
388 const uint8_t subscription1[2] = {1, 'A'};
389 send_array_expect_success (sub, subscription1, 0);
390
391
392 // Receive subscriptions from subscriber
393 recv_array_expect_success (pub, subscription1, 0);
394 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
395
396 // send 2 messages
397 send_string_expect_success (pub, "XA", 0);
398 send_string_expect_success (pub, "XB", 0);
399
400 // receive the single message
401 recv_string_expect_success (sub, "XA", 0);
402
403 // should be nothing left in the queue
404 char buffer[2];
405 TEST_ASSERT_FAILURE_ERRNO (
406 EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
407
408 // close the socket
409 test_context_socket_close (sub);
410
411 // closing the socket will result in an unsubscribe event
412 const uint8_t unsubscription[2] = {0, 'A'};
413 recv_array_expect_success (pub, unsubscription, 0);
414
415 // this doesn't really do anything
416 // there is no last_pipe set it will just fail silently
417 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
418
419 // reconnect
420 sub = test_context_socket (ZMQ_XSUB);
421 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
422
423 // send a subscription for B
424 const uint8_t subscription2[2] = {1, 'B'};
425 send_array_expect_success (sub, subscription2, 0);
426
427 // receive the subscription, overwrite it to XB
428 recv_array_expect_success (pub, subscription2, 0);
429 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
430
431 // send 2 messages
432 send_string_expect_success (pub, "XA", 0);
433 send_string_expect_success (pub, "XB", 0);
434
435 // receive the single message
436 recv_string_expect_success (sub, "XB", 0);
437
438 // should be nothing left in the queue
439 TEST_ASSERT_FAILURE_ERRNO (
440 EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
441
442 // Clean up.
443 test_context_socket_close (pub);
444 test_context_socket_close (sub);
445 }
446
test_manual_last_value()447 void test_manual_last_value ()
448 {
449 // Create a publisher
450 void *pub = test_context_socket (ZMQ_XPUB);
451
452 int hwm = 2000;
453 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4));
454
455 // set pub socket options
456 int manual = 1;
457 TEST_ASSERT_SUCCESS_ERRNO (
458 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
459
460 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
461
462 // Create a subscriber
463 void *sub = test_context_socket (ZMQ_SUB);
464 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
465
466 // Create another subscriber
467 void *sub2 = test_context_socket (ZMQ_SUB);
468 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, "inproc://soname"));
469
470 // Subscribe for "A".
471 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1));
472
473 const uint8_t subscription[2] = {1, 'A'};
474 // we must wait for the subscription to be processed here, otherwise some
475 // or all published messages might be lost
476 recv_array_expect_success (pub, subscription, 0);
477
478 // manual subscribe message
479 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "A", 1));
480 send_string_expect_success (pub, "A", 0);
481 recv_string_expect_success (sub, "A", 0);
482
483 // Subscribe for "A".
484 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, "A", 1));
485 recv_array_expect_success (pub, subscription, 0);
486 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "A", 1));
487 send_string_expect_success (pub, "A", 0);
488 recv_string_expect_success (sub2, "A", 0);
489
490 char buffer[255];
491 // sub won't get a message because the last subscription pipe is sub2.
492 TEST_ASSERT_FAILURE_ERRNO (
493 EAGAIN, zmq_recv (sub, buffer, sizeof (buffer), ZMQ_DONTWAIT));
494
495 // Clean up.
496 test_context_socket_close (pub);
497 test_context_socket_close (sub);
498 test_context_socket_close (sub2);
499 }
500
main()501 int main ()
502 {
503 setup_test_environment ();
504
505 UNITY_BEGIN ();
506 RUN_TEST (test_basic);
507 RUN_TEST (test_unsubscribe_manual);
508 RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
509 RUN_TEST (test_missing_subscriptions);
510 RUN_TEST (test_unsubscribe_cleanup);
511 RUN_TEST (test_manual_last_value);
512
513 return UNITY_END ();
514 }
515