1 /*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29 #include "testutil_monitoring.hpp"
30 #include "testutil_unity.hpp"
31
32 #include <stdlib.h>
33 #include <string.h>
34
35 static int
receive_monitor_address(void * monitor_,char ** address_,bool expect_more_)36 receive_monitor_address (void *monitor_, char **address_, bool expect_more_)
37 {
38 zmq_msg_t msg;
39
40 zmq_msg_init (&msg);
41 if (zmq_msg_recv (&msg, monitor_, 0) == -1)
42 return -1; // Interrupted, presumably
43 TEST_ASSERT_EQUAL (expect_more_, zmq_msg_more (&msg));
44
45 if (address_) {
46 const uint8_t *const data =
47 static_cast<const uint8_t *> (zmq_msg_data (&msg));
48 const size_t size = zmq_msg_size (&msg);
49 *address_ = static_cast<char *> (malloc (size + 1));
50 memcpy (*address_, data, size);
51 (*address_)[size] = 0;
52 }
53 zmq_msg_close (&msg);
54
55 return 0;
56 }
57
58 // Read one event off the monitor socket; return value and address
59 // by reference, if not null, and event number by value. Returns -1
60 // in case of error.
get_monitor_event_internal(void * monitor_,int * value_,char ** address_,int recv_flag_)61 static int get_monitor_event_internal (void *monitor_,
62 int *value_,
63 char **address_,
64 int recv_flag_)
65 {
66 // First frame in message contains event number and value
67 zmq_msg_t msg;
68 zmq_msg_init (&msg);
69 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
70 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
71 return -1; // timed out or no message available
72 }
73 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
74
75 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
76 uint16_t event = *reinterpret_cast<uint16_t *> (data);
77 if (value_)
78 memcpy (value_, data + 2, sizeof (uint32_t));
79
80 // Second frame in message contains event address
81 TEST_ASSERT_SUCCESS_ERRNO (
82 receive_monitor_address (monitor_, address_, false));
83
84 return event;
85 }
86
get_monitor_event_with_timeout(void * monitor_,int * value_,char ** address_,int timeout_)87 int get_monitor_event_with_timeout (void *monitor_,
88 int *value_,
89 char **address_,
90 int timeout_)
91 {
92 int res;
93 if (timeout_ == -1) {
94 // process infinite timeout in small steps to allow the user
95 // to see some information on the console
96
97 int timeout_step = 250;
98 int wait_time = 0;
99 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
100 sizeof (timeout_step));
101 while (
102 (res = get_monitor_event_internal (monitor_, value_, address_, 0))
103 == -1) {
104 wait_time += timeout_step;
105 fprintf (stderr, "Still waiting for monitor event after %i ms\n",
106 wait_time);
107 }
108 } else {
109 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
110 res = get_monitor_event_internal (monitor_, value_, address_, 0);
111 }
112 int timeout_infinite = -1;
113 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
114 sizeof (timeout_infinite));
115 return res;
116 }
117
get_monitor_event(void * monitor_,int * value_,char ** address_)118 int get_monitor_event (void *monitor_, int *value_, char **address_)
119 {
120 return get_monitor_event_with_timeout (monitor_, value_, address_, -1);
121 }
122
expect_monitor_event(void * monitor_,int expected_event_)123 void expect_monitor_event (void *monitor_, int expected_event_)
124 {
125 TEST_ASSERT_EQUAL_HEX (expected_event_,
126 get_monitor_event (monitor_, NULL, NULL));
127 }
128
print_unexpected_event(char * buf_,size_t buf_size_,int event_,int err_,int expected_event_,int expected_err_)129 static void print_unexpected_event (char *buf_,
130 size_t buf_size_,
131 int event_,
132 int err_,
133 int expected_event_,
134 int expected_err_)
135 {
136 snprintf (buf_, buf_size_,
137 "Unexpected event: 0x%x, value = %i/0x%x (expected: 0x%x, value "
138 "= %i/0x%x)\n",
139 event_, err_, err_, expected_event_, expected_err_,
140 expected_err_);
141 }
142
print_unexpected_event_stderr(int event_,int err_,int expected_event_,int expected_err_)143 void print_unexpected_event_stderr (int event_,
144 int err_,
145 int expected_event_,
146 int expected_err_)
147 {
148 char buf[256];
149 print_unexpected_event (buf, sizeof buf, event_, err_, expected_event_,
150 expected_err_);
151 fputs (buf, stderr);
152 }
153
expect_monitor_event_multiple(void * server_mon_,int expected_event_,int expected_err_,bool optional_)154 int expect_monitor_event_multiple (void *server_mon_,
155 int expected_event_,
156 int expected_err_,
157 bool optional_)
158 {
159 int count_of_expected_events = 0;
160 int client_closed_connection = 0;
161 int timeout = 250;
162 int wait_time = 0;
163
164 int event;
165 int err;
166 while ((event =
167 get_monitor_event_with_timeout (server_mon_, &err, NULL, timeout))
168 != -1
169 || !count_of_expected_events) {
170 if (event == -1) {
171 if (optional_)
172 break;
173 wait_time += timeout;
174 fprintf (stderr,
175 "Still waiting for first event after %ims (expected event "
176 "%x (value %i/0x%x))\n",
177 wait_time, expected_event_, expected_err_, expected_err_);
178 continue;
179 }
180 // ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen
181 // ECONNRESET can happen on very slow machines, when the engine writes
182 // to the peer and then tries to read the socket before the peer reads
183 // ECONNABORTED happens when a client aborts a connection via RST/timeout
184 if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
185 && ((err == EPIPE && expected_err_ != EPIPE) || err == ECONNRESET
186 || err == ECONNABORTED)) {
187 fprintf (stderr,
188 "Ignored event (skipping any further events): %x (err = "
189 "%i == %s)\n",
190 event, err, zmq_strerror (err));
191 client_closed_connection = 1;
192 break;
193 }
194 if (event != expected_event_
195 || (-1 != expected_err_ && err != expected_err_)) {
196 char buf[256];
197 print_unexpected_event (buf, sizeof buf, event, err,
198 expected_event_, expected_err_);
199 TEST_FAIL_MESSAGE (buf);
200 }
201 ++count_of_expected_events;
202 }
203 TEST_ASSERT_TRUE (optional_ || count_of_expected_events > 0
204 || client_closed_connection);
205
206 return count_of_expected_events;
207 }
208
get_monitor_event_internal_v2(void * monitor_,uint64_t * value_,char ** local_address_,char ** remote_address_,int recv_flag_)209 static int64_t get_monitor_event_internal_v2 (void *monitor_,
210 uint64_t *value_,
211 char **local_address_,
212 char **remote_address_,
213 int recv_flag_)
214 {
215 // First frame in message contains event number
216 zmq_msg_t msg;
217 zmq_msg_init (&msg);
218 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
219 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
220 return -1; // timed out or no message available
221 }
222 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
223 TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
224
225 uint64_t event;
226 memcpy (&event, zmq_msg_data (&msg), sizeof (event));
227 zmq_msg_close (&msg);
228
229 // Second frame in message contains the number of values
230 zmq_msg_init (&msg);
231 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
232 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
233 return -1; // timed out or no message available
234 }
235 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
236 TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
237
238 uint64_t value_count;
239 memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
240 zmq_msg_close (&msg);
241
242 for (uint64_t i = 0; i < value_count; ++i) {
243 // Subsequent frames in message contain event values
244 zmq_msg_init (&msg);
245 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
246 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
247 return -1; // timed out or no message available
248 }
249 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
250 TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
251
252 if (value_ && value_ + i)
253 memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
254 zmq_msg_close (&msg);
255 }
256
257 // Second-to-last frame in message contains local address
258 TEST_ASSERT_SUCCESS_ERRNO (
259 receive_monitor_address (monitor_, local_address_, true));
260
261 // Last frame in message contains remote address
262 TEST_ASSERT_SUCCESS_ERRNO (
263 receive_monitor_address (monitor_, remote_address_, false));
264
265 return event;
266 }
267
get_monitor_event_with_timeout_v2(void * monitor_,uint64_t * value_,char ** local_address_,char ** remote_address_,int timeout_)268 static int64_t get_monitor_event_with_timeout_v2 (void *monitor_,
269 uint64_t *value_,
270 char **local_address_,
271 char **remote_address_,
272 int timeout_)
273 {
274 int64_t res;
275 if (timeout_ == -1) {
276 // process infinite timeout in small steps to allow the user
277 // to see some information on the console
278
279 int timeout_step = 250;
280 int wait_time = 0;
281 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
282 sizeof (timeout_step));
283 while ((res = get_monitor_event_internal_v2 (
284 monitor_, value_, local_address_, remote_address_, 0))
285 == -1) {
286 wait_time += timeout_step;
287 fprintf (stderr, "Still waiting for monitor event after %i ms\n",
288 wait_time);
289 }
290 } else {
291 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
292 res = get_monitor_event_internal_v2 (monitor_, value_, local_address_,
293 remote_address_, 0);
294 }
295 int timeout_infinite = -1;
296 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
297 sizeof (timeout_infinite));
298 return res;
299 }
300
get_monitor_event_v2(void * monitor_,uint64_t * value_,char ** local_address_,char ** remote_address_)301 int64_t get_monitor_event_v2 (void *monitor_,
302 uint64_t *value_,
303 char **local_address_,
304 char **remote_address_)
305 {
306 return get_monitor_event_with_timeout_v2 (monitor_, value_, local_address_,
307 remote_address_, -1);
308 }
309
expect_monitor_event_v2(void * monitor_,int64_t expected_event_,const char * expected_local_address_,const char * expected_remote_address_)310 void expect_monitor_event_v2 (void *monitor_,
311 int64_t expected_event_,
312 const char *expected_local_address_,
313 const char *expected_remote_address_)
314 {
315 char *local_address = NULL;
316 char *remote_address = NULL;
317 int64_t event = get_monitor_event_v2 (
318 monitor_, NULL, expected_local_address_ ? &local_address : NULL,
319 expected_remote_address_ ? &remote_address : NULL);
320 bool failed = false;
321 char buf[256];
322 char *pos = buf;
323 if (event != expected_event_) {
324 pos += snprintf (pos, sizeof buf - (pos - buf),
325 "Expected monitor event %llx, but received %llx\n",
326 static_cast<long long> (expected_event_),
327 static_cast<long long> (event));
328 failed = true;
329 }
330 if (expected_local_address_
331 && 0 != strcmp (local_address, expected_local_address_)) {
332 pos += snprintf (pos, sizeof buf - (pos - buf),
333 "Expected local address %s, but received %s\n",
334 expected_local_address_, local_address);
335 }
336 if (expected_remote_address_
337 && 0 != strcmp (remote_address, expected_remote_address_)) {
338 snprintf (pos, sizeof buf - (pos - buf),
339 "Expected remote address %s, but received %s\n",
340 expected_remote_address_, remote_address);
341 }
342 free (local_address);
343 free (remote_address);
344 TEST_ASSERT_FALSE_MESSAGE (failed, buf);
345 }
346
347
get_zmqEventName(uint64_t event)348 const char *get_zmqEventName (uint64_t event)
349 {
350 switch (event) {
351 case ZMQ_EVENT_CONNECTED:
352 return "CONNECTED";
353 case ZMQ_EVENT_CONNECT_DELAYED:
354 return "CONNECT_DELAYED";
355 case ZMQ_EVENT_CONNECT_RETRIED:
356 return "CONNECT_RETRIED";
357 case ZMQ_EVENT_LISTENING:
358 return "LISTENING";
359 case ZMQ_EVENT_BIND_FAILED:
360 return "BIND_FAILED";
361 case ZMQ_EVENT_ACCEPTED:
362 return "ACCEPTED";
363 case ZMQ_EVENT_ACCEPT_FAILED:
364 return "ACCEPT_FAILED";
365 case ZMQ_EVENT_CLOSED:
366 return "CLOSED";
367 case ZMQ_EVENT_CLOSE_FAILED:
368 return "CLOSE_FAILED";
369 case ZMQ_EVENT_DISCONNECTED:
370 return "DISCONNECTED";
371 case ZMQ_EVENT_MONITOR_STOPPED:
372 return "MONITOR_STOPPED";
373 case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
374 return "HANDSHAKE_FAILED_NO_DETAIL";
375 case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
376 return "HANDSHAKE_SUCCEEDED";
377 case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
378 return "HANDSHAKE_FAILED_PROTOCOL";
379 case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
380 return "HANDSHAKE_FAILED_AUTH";
381 default:
382 return "UNKNOWN";
383 }
384 }
385
print_events(void * socket,int timeout,int limit)386 void print_events (void *socket, int timeout, int limit)
387 {
388 // print events received
389 int value;
390 char *event_address;
391 int event =
392 get_monitor_event_with_timeout (socket, &value, &event_address, timeout);
393 int i = 0;
394 ;
395 while ((event != -1) && (++i < limit)) {
396 const char *eventName = get_zmqEventName (event);
397 printf ("Got event: %s\n", eventName);
398 event = get_monitor_event_with_timeout (socket, &value, &event_address,
399 timeout);
400 }
401 }
402