1 /*
2     Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "testutil.hpp"
31 #include "testutil_unity.hpp"
32 
33 #include <string.h>
34 
35 SETUP_TEARDOWN_TESTCONTEXT
36 
37 static const int SERVER = 0;
38 static const int CLIENT = 1;
39 
40 struct test_message_t
41 {
42     int turn;
43     const char *text;
44 };
45 
46 // NOTE: messages are sent without null terminator.
47 const test_message_t dialog[] = {
48   {CLIENT, "i can haz cheez burger?"},
49   {SERVER, "y u no disonnect?"},
50   {CLIENT, ""},
51 };
52 const int steps = sizeof (dialog) / sizeof (dialog[0]);
53 
has_more(void * socket_)54 bool has_more (void *socket_)
55 {
56     int more = 0;
57     size_t more_size = sizeof (more);
58     int rc = zmq_getsockopt (socket_, ZMQ_RCVMORE, &more, &more_size);
59     if (rc != 0)
60         return false;
61     return more != 0;
62 }
63 
test_stream_disconnect()64 void test_stream_disconnect ()
65 {
66     size_t len = MAX_SOCKET_STRING;
67     char bind_endpoint[MAX_SOCKET_STRING];
68     char connect_endpoint[MAX_SOCKET_STRING];
69     void *sockets[2];
70 
71     sockets[SERVER] = test_context_socket (ZMQ_STREAM);
72     int enabled = 1;
73     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
74       sockets[SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
75     TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sockets[SERVER], "tcp://0.0.0.0:*"));
76     TEST_ASSERT_SUCCESS_ERRNO (
77       zmq_getsockopt (sockets[SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint, &len));
78 
79     //  Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
80 #ifdef ZMQ_HAVE_WINDOWS
81     sprintf (connect_endpoint, "tcp://127.0.0.1:%s",
82              strrchr (bind_endpoint, ':') + 1);
83 #else
84     strcpy (connect_endpoint, bind_endpoint);
85 #endif
86 
87     sockets[CLIENT] = test_context_socket (ZMQ_STREAM);
88     TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
89       sockets[CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
90     TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sockets[CLIENT], connect_endpoint));
91 
92     // wait for connect notification
93     // Server: Grab the 1st frame (peer routing id).
94     zmq_msg_t peer_frame;
95     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
96     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
97     TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
98     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
99     TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
100 
101     // Server: Grab the 2nd frame (actual payload).
102     zmq_msg_t data_frame;
103     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
104     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[SERVER], 0));
105     TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
106     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
107 
108     // Client: Grab the 1st frame (peer routing id).
109     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
110     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
111     TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
112     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
113     TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
114 
115     // Client: Grab the 2nd frame (actual payload).
116     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
117     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
118     TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
119     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
120 
121     // Send initial message.
122     char blob_data[256];
123     size_t blob_size = sizeof (blob_data);
124     TEST_ASSERT_SUCCESS_ERRNO (
125       zmq_getsockopt (sockets[CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size));
126     TEST_ASSERT_GREATER_THAN (0, blob_size);
127     zmq_msg_t msg;
128     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, blob_size));
129     memcpy (zmq_msg_data (&msg), blob_data, blob_size);
130     TEST_ASSERT_SUCCESS_ERRNO (
131       zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
132     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
133     TEST_ASSERT_SUCCESS_ERRNO (
134       zmq_msg_init_size (&msg, strlen (dialog[0].text)));
135     memcpy (zmq_msg_data (&msg), dialog[0].text, strlen (dialog[0].text));
136     TEST_ASSERT_SUCCESS_ERRNO (
137       zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
138     TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
139 
140     // TODO: make sure this loop doesn't loop forever if something is wrong
141     //       with the test (or the implementation).
142 
143     int step = 0;
144     while (step < steps) {
145         // Wait until something happens.
146         zmq_pollitem_t items[] = {
147           {sockets[SERVER], 0, ZMQ_POLLIN, 0},
148           {sockets[CLIENT], 0, ZMQ_POLLIN, 0},
149         };
150         TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (items, 2, 100));
151 
152         // Check for data received by the server.
153         if (items[SERVER].revents & ZMQ_POLLIN) {
154             TEST_ASSERT_EQUAL_INT (CLIENT, dialog[step].turn);
155 
156             // Grab the 1st frame (peer routing id).
157             zmq_msg_t peer_frame;
158             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
159             TEST_ASSERT_SUCCESS_ERRNO (
160               zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
161             TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
162             TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
163 
164             // Grab the 2nd frame (actual payload).
165             zmq_msg_t data_frame;
166             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
167             TEST_ASSERT_SUCCESS_ERRNO (
168               zmq_msg_recv (&data_frame, sockets[SERVER], 0));
169 
170             // Make sure payload matches what we expect.
171             const char *const data =
172               static_cast<const char *> (zmq_msg_data (&data_frame));
173             const size_t size = zmq_msg_size (&data_frame);
174             // 0-length frame is a disconnection notification.  The server
175             // should receive it as the last step in the dialogue.
176             if (size == 0) {
177                 ++step;
178                 TEST_ASSERT_EQUAL_INT (steps, step);
179             } else {
180                 TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
181                 TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
182 
183                 ++step;
184 
185                 TEST_ASSERT_LESS_THAN_INT (steps, step);
186 
187                 // Prepare the response.
188                 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
189                 TEST_ASSERT_SUCCESS_ERRNO (
190                   zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
191                 memcpy (zmq_msg_data (&data_frame), dialog[step].text,
192                         zmq_msg_size (&data_frame));
193 
194                 // Send the response.
195                 TEST_ASSERT_SUCCESS_ERRNO (
196                   zmq_msg_send (&peer_frame, sockets[SERVER], ZMQ_SNDMORE));
197                 TEST_ASSERT_SUCCESS_ERRNO (
198                   zmq_msg_send (&data_frame, sockets[SERVER], ZMQ_SNDMORE));
199             }
200 
201             // Release resources.
202             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
203             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
204         }
205 
206         // Check for data received by the client.
207         if (items[CLIENT].revents & ZMQ_POLLIN) {
208             TEST_ASSERT_EQUAL_INT (SERVER, dialog[step].turn);
209 
210             // Grab the 1st frame (peer routing id).
211             zmq_msg_t peer_frame;
212             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
213             TEST_ASSERT_SUCCESS_ERRNO (
214               zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
215             TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
216             TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
217 
218             // Grab the 2nd frame (actual payload).
219             zmq_msg_t data_frame;
220             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
221             TEST_ASSERT_SUCCESS_ERRNO (
222               zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
223             TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&data_frame));
224 
225             // Make sure payload matches what we expect.
226             const char *const data =
227               static_cast<const char *> (zmq_msg_data (&data_frame));
228             const size_t size = zmq_msg_size (&data_frame);
229             TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
230             TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
231 
232             ++step;
233 
234             // Prepare the response (next line in the dialog).
235             TEST_ASSERT_LESS_THAN_INT (steps, step);
236             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
237             TEST_ASSERT_SUCCESS_ERRNO (
238               zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
239             memcpy (zmq_msg_data (&data_frame), dialog[step].text,
240                     zmq_msg_size (&data_frame));
241 
242             // Send the response.
243             TEST_ASSERT_SUCCESS_ERRNO (
244               zmq_msg_send (&peer_frame, sockets[CLIENT], ZMQ_SNDMORE));
245             TEST_ASSERT_SUCCESS_ERRNO (
246               zmq_msg_send (&data_frame, sockets[CLIENT], ZMQ_SNDMORE));
247 
248             // Release resources.
249             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
250             TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
251         }
252     }
253     TEST_ASSERT_EQUAL_INT (steps, step);
254     test_context_socket_close (sockets[CLIENT]);
255     test_context_socket_close (sockets[SERVER]);
256 }
257 
main(int,char **)258 int main (int, char **)
259 {
260     setup_test_environment ();
261 
262     UNITY_BEGIN ();
263     RUN_TEST (test_stream_disconnect);
264     return UNITY_END ();
265 }
266