1 /*
2     Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 #include "testutil_monitoring.hpp"
30 #include "testutil_unity.hpp"
31 
32 #include <stdlib.h>
33 #include <string.h>
34 
35 static int
receive_monitor_address(void * monitor_,char ** address_,bool expect_more_)36 receive_monitor_address (void *monitor_, char **address_, bool expect_more_)
37 {
38     zmq_msg_t msg;
39 
40     zmq_msg_init (&msg);
41     if (zmq_msg_recv (&msg, monitor_, 0) == -1)
42         return -1; //  Interrupted, presumably
43     TEST_ASSERT_EQUAL (expect_more_, zmq_msg_more (&msg));
44 
45     if (address_) {
46         const uint8_t *const data =
47           static_cast<const uint8_t *> (zmq_msg_data (&msg));
48         const size_t size = zmq_msg_size (&msg);
49         *address_ = static_cast<char *> (malloc (size + 1));
50         memcpy (*address_, data, size);
51         (*address_)[size] = 0;
52     }
53     zmq_msg_close (&msg);
54 
55     return 0;
56 }
57 
58 //  Read one event off the monitor socket; return value and address
59 //  by reference, if not null, and event number by value. Returns -1
60 //  in case of error.
get_monitor_event_internal(void * monitor_,int * value_,char ** address_,int recv_flag_)61 static int get_monitor_event_internal (void *monitor_,
62                                        int *value_,
63                                        char **address_,
64                                        int recv_flag_)
65 {
66     //  First frame in message contains event number and value
67     zmq_msg_t msg;
68     zmq_msg_init (&msg);
69     if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
70         TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
71         return -1; //  timed out or no message available
72     }
73     TEST_ASSERT_TRUE (zmq_msg_more (&msg));
74 
75     uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
76     uint16_t event = *reinterpret_cast<uint16_t *> (data);
77     if (value_)
78         memcpy (value_, data + 2, sizeof (uint32_t));
79 
80     //  Second frame in message contains event address
81     TEST_ASSERT_SUCCESS_ERRNO (
82       receive_monitor_address (monitor_, address_, false));
83 
84     return event;
85 }
86 
get_monitor_event_with_timeout(void * monitor_,int * value_,char ** address_,int timeout_)87 int get_monitor_event_with_timeout (void *monitor_,
88                                     int *value_,
89                                     char **address_,
90                                     int timeout_)
91 {
92     int res;
93     if (timeout_ == -1) {
94         // process infinite timeout in small steps to allow the user
95         // to see some information on the console
96 
97         int timeout_step = 250;
98         int wait_time = 0;
99         zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
100                         sizeof (timeout_step));
101         while (
102           (res = get_monitor_event_internal (monitor_, value_, address_, 0))
103           == -1) {
104             wait_time += timeout_step;
105             fprintf (stderr, "Still waiting for monitor event after %i ms\n",
106                      wait_time);
107         }
108     } else {
109         zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
110         res = get_monitor_event_internal (monitor_, value_, address_, 0);
111     }
112     int timeout_infinite = -1;
113     zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
114                     sizeof (timeout_infinite));
115     return res;
116 }
117 
get_monitor_event(void * monitor_,int * value_,char ** address_)118 int get_monitor_event (void *monitor_, int *value_, char **address_)
119 {
120     return get_monitor_event_with_timeout (monitor_, value_, address_, -1);
121 }
122 
expect_monitor_event(void * monitor_,int expected_event_)123 void expect_monitor_event (void *monitor_, int expected_event_)
124 {
125     TEST_ASSERT_EQUAL_HEX (expected_event_,
126                            get_monitor_event (monitor_, NULL, NULL));
127 }
128 
//  Formats a one-line "unexpected event" diagnostic into buf_, writing at
//  most buf_size_ bytes including the terminating NUL. The received and the
//  expected event are printed in hex; each value is printed in both decimal
//  and hex.
static void print_unexpected_event (char *buf_,
                                    size_t buf_size_,
                                    int event_,
                                    int err_,
                                    int expected_event_,
                                    int expected_err_)
{
    snprintf (buf_, buf_size_,
              "Unexpected event: 0x%x, value = %i/0x%x "
              "(expected: 0x%x, value = %i/0x%x)\n",
              //  received event and its value
              event_, err_, err_,
              //  expected event and its value
              expected_event_, expected_err_, expected_err_);
}
142 
print_unexpected_event_stderr(int event_,int err_,int expected_event_,int expected_err_)143 void print_unexpected_event_stderr (int event_,
144                                     int err_,
145                                     int expected_event_,
146                                     int expected_err_)
147 {
148     char buf[256];
149     print_unexpected_event (buf, sizeof buf, event_, err_, expected_event_,
150                             expected_err_);
151     fputs (buf, stderr);
152 }
153 
expect_monitor_event_multiple(void * server_mon_,int expected_event_,int expected_err_,bool optional_)154 int expect_monitor_event_multiple (void *server_mon_,
155                                    int expected_event_,
156                                    int expected_err_,
157                                    bool optional_)
158 {
159     int count_of_expected_events = 0;
160     int client_closed_connection = 0;
161     int timeout = 250;
162     int wait_time = 0;
163 
164     int event;
165     int err;
166     while ((event =
167               get_monitor_event_with_timeout (server_mon_, &err, NULL, timeout))
168              != -1
169            || !count_of_expected_events) {
170         if (event == -1) {
171             if (optional_)
172                 break;
173             wait_time += timeout;
174             fprintf (stderr,
175                      "Still waiting for first event after %ims (expected event "
176                      "%x (value %i/0x%x))\n",
177                      wait_time, expected_event_, expected_err_, expected_err_);
178             continue;
179         }
180         // ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen
181         // ECONNRESET can happen on very slow machines, when the engine writes
182         // to the peer and then tries to read the socket before the peer reads
183         // ECONNABORTED happens when a client aborts a connection via RST/timeout
184         if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
185             && ((err == EPIPE && expected_err_ != EPIPE) || err == ECONNRESET
186                 || err == ECONNABORTED)) {
187             fprintf (stderr,
188                      "Ignored event (skipping any further events): %x (err = "
189                      "%i == %s)\n",
190                      event, err, zmq_strerror (err));
191             client_closed_connection = 1;
192             break;
193         }
194         if (event != expected_event_
195             || (-1 != expected_err_ && err != expected_err_)) {
196             char buf[256];
197             print_unexpected_event (buf, sizeof buf, event, err,
198                                     expected_event_, expected_err_);
199             TEST_FAIL_MESSAGE (buf);
200         }
201         ++count_of_expected_events;
202     }
203     TEST_ASSERT_TRUE (optional_ || count_of_expected_events > 0
204                       || client_closed_connection);
205 
206     return count_of_expected_events;
207 }
208 
get_monitor_event_internal_v2(void * monitor_,uint64_t * value_,char ** local_address_,char ** remote_address_,int recv_flag_)209 static int64_t get_monitor_event_internal_v2 (void *monitor_,
210                                               uint64_t *value_,
211                                               char **local_address_,
212                                               char **remote_address_,
213                                               int recv_flag_)
214 {
215     //  First frame in message contains event number
216     zmq_msg_t msg;
217     zmq_msg_init (&msg);
218     if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
219         TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
220         return -1; //  timed out or no message available
221     }
222     TEST_ASSERT_TRUE (zmq_msg_more (&msg));
223     TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
224 
225     uint64_t event;
226     memcpy (&event, zmq_msg_data (&msg), sizeof (event));
227     zmq_msg_close (&msg);
228 
229     //  Second frame in message contains the number of values
230     zmq_msg_init (&msg);
231     if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
232         TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
233         return -1; //  timed out or no message available
234     }
235     TEST_ASSERT_TRUE (zmq_msg_more (&msg));
236     TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
237 
238     uint64_t value_count;
239     memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
240     zmq_msg_close (&msg);
241 
242     for (uint64_t i = 0; i < value_count; ++i) {
243         //  Subsequent frames in message contain event values
244         zmq_msg_init (&msg);
245         if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
246             TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
247             return -1; //  timed out or no message available
248         }
249         TEST_ASSERT_TRUE (zmq_msg_more (&msg));
250         TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
251 
252         if (value_ && value_ + i)
253             memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
254         zmq_msg_close (&msg);
255     }
256 
257     //  Second-to-last frame in message contains local address
258     TEST_ASSERT_SUCCESS_ERRNO (
259       receive_monitor_address (monitor_, local_address_, true));
260 
261     //  Last frame in message contains remote address
262     TEST_ASSERT_SUCCESS_ERRNO (
263       receive_monitor_address (monitor_, remote_address_, false));
264 
265     return event;
266 }
267 
get_monitor_event_with_timeout_v2(void * monitor_,uint64_t * value_,char ** local_address_,char ** remote_address_,int timeout_)268 static int64_t get_monitor_event_with_timeout_v2 (void *monitor_,
269                                                   uint64_t *value_,
270                                                   char **local_address_,
271                                                   char **remote_address_,
272                                                   int timeout_)
273 {
274     int64_t res;
275     if (timeout_ == -1) {
276         // process infinite timeout in small steps to allow the user
277         // to see some information on the console
278 
279         int timeout_step = 250;
280         int wait_time = 0;
281         zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
282                         sizeof (timeout_step));
283         while ((res = get_monitor_event_internal_v2 (
284                   monitor_, value_, local_address_, remote_address_, 0))
285                == -1) {
286             wait_time += timeout_step;
287             fprintf (stderr, "Still waiting for monitor event after %i ms\n",
288                      wait_time);
289         }
290     } else {
291         zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
292         res = get_monitor_event_internal_v2 (monitor_, value_, local_address_,
293                                              remote_address_, 0);
294     }
295     int timeout_infinite = -1;
296     zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
297                     sizeof (timeout_infinite));
298     return res;
299 }
300 
get_monitor_event_v2(void * monitor_,uint64_t * value_,char ** local_address_,char ** remote_address_)301 int64_t get_monitor_event_v2 (void *monitor_,
302                               uint64_t *value_,
303                               char **local_address_,
304                               char **remote_address_)
305 {
306     return get_monitor_event_with_timeout_v2 (monitor_, value_, local_address_,
307                                               remote_address_, -1);
308 }
309 
expect_monitor_event_v2(void * monitor_,int64_t expected_event_,const char * expected_local_address_,const char * expected_remote_address_)310 void expect_monitor_event_v2 (void *monitor_,
311                               int64_t expected_event_,
312                               const char *expected_local_address_,
313                               const char *expected_remote_address_)
314 {
315     char *local_address = NULL;
316     char *remote_address = NULL;
317     int64_t event = get_monitor_event_v2 (
318       monitor_, NULL, expected_local_address_ ? &local_address : NULL,
319       expected_remote_address_ ? &remote_address : NULL);
320     bool failed = false;
321     char buf[256];
322     char *pos = buf;
323     if (event != expected_event_) {
324         pos += snprintf (pos, sizeof buf - (pos - buf),
325                          "Expected monitor event %llx, but received %llx\n",
326                          static_cast<long long> (expected_event_),
327                          static_cast<long long> (event));
328         failed = true;
329     }
330     if (expected_local_address_
331         && 0 != strcmp (local_address, expected_local_address_)) {
332         pos += snprintf (pos, sizeof buf - (pos - buf),
333                          "Expected local address %s, but received %s\n",
334                          expected_local_address_, local_address);
335     }
336     if (expected_remote_address_
337         && 0 != strcmp (remote_address, expected_remote_address_)) {
338         snprintf (pos, sizeof buf - (pos - buf),
339                   "Expected remote address %s, but received %s\n",
340                   expected_remote_address_, remote_address);
341     }
342     free (local_address);
343     free (remote_address);
344     TEST_ASSERT_FALSE_MESSAGE (failed, buf);
345 }
346 
347 
get_zmqEventName(uint64_t event)348 const char *get_zmqEventName (uint64_t event)
349 {
350     switch (event) {
351         case ZMQ_EVENT_CONNECTED:
352             return "CONNECTED";
353         case ZMQ_EVENT_CONNECT_DELAYED:
354             return "CONNECT_DELAYED";
355         case ZMQ_EVENT_CONNECT_RETRIED:
356             return "CONNECT_RETRIED";
357         case ZMQ_EVENT_LISTENING:
358             return "LISTENING";
359         case ZMQ_EVENT_BIND_FAILED:
360             return "BIND_FAILED";
361         case ZMQ_EVENT_ACCEPTED:
362             return "ACCEPTED";
363         case ZMQ_EVENT_ACCEPT_FAILED:
364             return "ACCEPT_FAILED";
365         case ZMQ_EVENT_CLOSED:
366             return "CLOSED";
367         case ZMQ_EVENT_CLOSE_FAILED:
368             return "CLOSE_FAILED";
369         case ZMQ_EVENT_DISCONNECTED:
370             return "DISCONNECTED";
371         case ZMQ_EVENT_MONITOR_STOPPED:
372             return "MONITOR_STOPPED";
373         case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
374             return "HANDSHAKE_FAILED_NO_DETAIL";
375         case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
376             return "HANDSHAKE_SUCCEEDED";
377         case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
378             return "HANDSHAKE_FAILED_PROTOCOL";
379         case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
380             return "HANDSHAKE_FAILED_AUTH";
381         default:
382             return "UNKNOWN";
383     }
384 }
385 
print_events(void * socket,int timeout,int limit)386 void print_events (void *socket, int timeout, int limit)
387 {
388     // print events received
389     int value;
390     char *event_address;
391     int event =
392       get_monitor_event_with_timeout (socket, &value, &event_address, timeout);
393     int i = 0;
394     ;
395     while ((event != -1) && (++i < limit)) {
396         const char *eventName = get_zmqEventName (event);
397         printf ("Got event: %s\n", eventName);
398         event = get_monitor_event_with_timeout (socket, &value, &event_address,
399                                                 timeout);
400     }
401 }
402