1 /*
2 * ZeroMQ event test
3 * Copyright (C) 2017 David Lamparter, for NetDEF, Inc.
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the Free
7 * Software Foundation; either version 2 of the License, or (at your option)
8 * any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; see the file COPYING; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20 #include <zebra.h>
21 #include "memory.h"
22 #include "sigevent.h"
23 #include "frr_zmq.h"
24
25 DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
26 DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message")
27
28 static struct thread_master *master;
29
msg_buf_free(void * data,void * hint)30 static void msg_buf_free(void *data, void *hint)
31 {
32 XFREE(MTYPE_TESTBUF, data);
33 }
34
recv_delim(void * zmqsock)35 static int recv_delim(void *zmqsock)
36 {
37 /* receive delim */
38 zmq_msg_t zdelim;
39 int more;
40 zmq_msg_init(&zdelim);
41 zmq_msg_recv(&zdelim, zmqsock, 0);
42 more = zmq_msg_more(&zdelim);
43 zmq_msg_close(&zdelim);
44 return more;
45 }
send_delim(void * zmqsock)46 static void send_delim(void *zmqsock)
47 {
48 /* Send delim */
49 zmq_msg_t zdelim;
50 zmq_msg_init(&zdelim);
51 zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
52 zmq_msg_close(&zdelim);
53 }
run_client(int syncfd)54 static void run_client(int syncfd)
55 {
56 int i, j;
57 char buf[32];
58 char dummy;
59 void *zmqctx = NULL;
60 void *zmqsock;
61 int more;
62
63 read(syncfd, &dummy, 1);
64
65 zmqctx = zmq_ctx_new();
66 zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
67
68 zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
69 if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
70 perror("zmq_connect");
71 exit(1);
72 }
73
74 /* single-part */
75 for (i = 0; i < 8; i++) {
76 snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
77 'b' + i, 'c' + i);
78 printf("client send: %s\n", buf);
79 fflush(stdout);
80 send_delim(zmqsock);
81 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
82 more = recv_delim(zmqsock);
83 while (more) {
84 zmq_recv(zmqsock, buf, sizeof(buf), 0);
85 printf("client recv: %s\n", buf);
86 size_t len = sizeof(more);
87 if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
88 break;
89 }
90 }
91
92 /* multipart */
93 for (i = 2; i < 5; i++) {
94 printf("---\n");
95 send_delim(zmqsock);
96 zmq_msg_t part;
97 for (j = 1; j <= i; j++) {
98 char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
99
100 snprintf(dyn, 32, "part %d/%d", j, i);
101 printf("client send: %s\n", dyn);
102 fflush(stdout);
103
104 zmq_msg_init_data(&part, dyn, strlen(dyn) + 1,
105 msg_buf_free, NULL);
106 zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
107 }
108
109 recv_delim(zmqsock);
110 do {
111 char *data;
112
113 zmq_msg_recv(&part, zmqsock, 0);
114 data = zmq_msg_data(&part);
115 more = zmq_msg_more(&part);
116 printf("client recv (more: %d): %s\n", more, data);
117 } while (more);
118 zmq_msg_close(&part);
119 }
120
121 /* write callback */
122 printf("---\n");
123 snprintf(buf, sizeof(buf), "Done receiving");
124 printf("client send: %s\n", buf);
125 fflush(stdout);
126 send_delim(zmqsock);
127 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
128 /* wait for message from server */
129 more = recv_delim(zmqsock);
130 while (more) {
131 zmq_recv(zmqsock, buf, sizeof(buf), 0);
132 printf("client recv: %s\n", buf);
133 size_t len = sizeof(more);
134 if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
135 break;
136 }
137
138 zmq_close(zmqsock);
139 zmq_ctx_term(zmqctx);
140 }
141
/* Callback handle shared by the server-side functions: filled in by the
 * frrzmq_thread_add_*() calls and passed to frrzmq_thread_cancel().
 */
static struct frrzmq_cb *cb;
143
/*
 * Receive the first two frames of an incoming message: the id frame, which is
 * stored in *msg_id, and the delimiter frame, which is discarded.
 *
 * msg_id is initialised here; the caller owns it afterwards and is
 * responsible for sending or closing it.
 */
static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
{
	/* id frame */
	zmq_msg_init(msg_id);
	zmq_msg_recv(msg_id, zmqsock, 0);

	/* delimiter frame; its "more" flag is not needed here */
	(void)recv_delim(zmqsock);
}
/*
 * Start an outgoing message: send *msg_id as the first frame, then the empty
 * delimiter frame.  Both go out with ZMQ_SNDMORE, so the caller must append
 * at least one payload frame.
 */
static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
{
	/* id frame first ... */
	zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);

	/* ... followed by the delimiter */
	send_delim(zmqsock);
}
serverwritefn(void * arg,void * zmqsock)158 static void serverwritefn(void *arg, void *zmqsock)
159 {
160 zmq_msg_t *msg_id = (zmq_msg_t *)arg;
161 char buf[32] = "Test write callback";
162 size_t i;
163
164 for (i = 0; i < strlen(buf); i++)
165 buf[i] = toupper(buf[i]);
166 printf("server send: %s\n", buf);
167 fflush(stdout);
168 send_id_and_delim(zmqsock, msg_id);
169 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
170
171 /* send just once */
172 frrzmq_thread_cancel(&cb, &cb->write);
173
174 zmq_msg_close(msg_id);
175 XFREE(MTYPE_ZMQMSG, msg_id);
176 }
/*
 * Per-frame read callback, registered from serverfn() through
 * frrzmq_thread_add_read_part().
 *
 * Frame 0 (the id) is sent straight back to start the reply, frame 1 (the
 * delimiter) is skipped.  Every later frame is printed and echoed back in
 * upper case; after the last frame of a message an extra "msg# was <partnum>"
 * frame closes the reply.
 *
 * Once seven complete messages have been answered, the next message is read
 * synchronously right here and serverwritefn() is registered as write
 * callback to answer it.
 *
 * arg:     unused
 * zmqsock: socket the frame was received on; replies go out on it as well
 * msg:     the received frame
 * partnum: zero-based index of the frame within its message
 */
static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
			 unsigned partnum)
{
	static int num = 0;
	int more = zmq_msg_more(msg);
	const char *in = zmq_msg_data(msg);
	const char *nul;
	size_t inlen, i;
	zmq_msg_t reply;
	zmq_msg_t *msg_id;
	char buf[32];
	char *out;
	int nbytes;

	/* Id */
	if (partnum == 0) {
		send_id_and_delim(zmqsock, msg);
		return;
	}
	/* Delim */
	if (partnum == 1)
		return;

	/* The payload normally carries its own NUL, but it comes from the
	 * network: never look beyond the size of the frame.
	 */
	inlen = zmq_msg_size(msg);
	nul = inlen ? memchr(in, '\0', inlen) : NULL;
	if (nul)
		inlen = (size_t)(nul - in);

	printf("server recv part %u (more: %d): ", partnum, more);
	fwrite(in, 1, inlen, stdout);
	putchar('\n');
	fflush(stdout);

	out = XMALLOC(MTYPE_TESTBUF, inlen + 1);
	for (i = 0; i < inlen; i++)
		/* <ctype.h> functions need an unsigned char value */
		out[i] = toupper((unsigned char)in[i]);
	out[inlen] = '\0';
	zmq_msg_init_data(&reply, out, inlen + 1, msg_buf_free, NULL);
	zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
	/* harmless after a successful send, releases out after a failed one */
	zmq_msg_close(&reply);

	if (more)
		return;

	out = XMALLOC(MTYPE_TESTBUF, 32);
	snprintf(out, 32, "msg# was %u", partnum);
	zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
	zmq_msg_send(&reply, zmqsock, 0);

	zmq_msg_close(&reply);

	/* num counts completely answered messages */
	if (++num < 7)
		return;

	/* write callback test */
	msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(*msg_id));
	recv_id_and_delim(zmqsock, msg_id);
	nbytes = zmq_recv(zmqsock, buf, sizeof(buf) - 1, 0);
	if (nbytes < 0) {
		perror("zmq_recv");
		zmq_msg_close(msg_id);
		XFREE(MTYPE_ZMQMSG, msg_id);
		return;
	}
	/* zmq_recv() returns the untruncated frame size; clamp and terminate
	 * so that buf is always a valid string
	 */
	if ((size_t)nbytes > sizeof(buf) - 1)
		nbytes = sizeof(buf) - 1;
	buf[nbytes] = '\0';
	printf("server recv: %s\n", buf);
	fflush(stdout);

	/* msg_id is handed over to serverwritefn(), which frees it */
	frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
				    zmqsock, &cb);
}
231
serverfn(void * arg,void * zmqsock)232 static void serverfn(void *arg, void *zmqsock)
233 {
234 static int num = 0;
235
236 zmq_msg_t msg_id;
237 char buf[32];
238 size_t i;
239
240 recv_id_and_delim(zmqsock, &msg_id);
241 zmq_recv(zmqsock, buf, sizeof(buf), 0);
242
243 printf("server recv: %s\n", buf);
244 fflush(stdout);
245 for (i = 0; i < strlen(buf); i++)
246 buf[i] = toupper(buf[i]);
247 send_id_and_delim(zmqsock, &msg_id);
248 zmq_msg_close(&msg_id);
249 zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
250
251 if (++num < 4)
252 return;
253
254 /* change to multipart callback */
255 frrzmq_thread_cancel(&cb, &cb->read);
256 frrzmq_thread_cancel(&cb, &cb->write);
257
258 frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
259 &cb);
260 }
261
sigchld(void)262 static void sigchld(void)
263 {
264 printf("child exited.\n");
265 frrzmq_thread_cancel(&cb, &cb->read);
266 frrzmq_thread_cancel(&cb, &cb->write);
267 }
268
269 static struct quagga_signal_t sigs[] = {
270 {
271 .signal = SIGCHLD,
272 .handler = sigchld,
273 },
274 };
275
run_server(int syncfd)276 static void run_server(int syncfd)
277 {
278 void *zmqsock;
279 char dummy = 0;
280 struct thread t;
281
282 master = thread_master_create(NULL);
283 signal_init(master, array_size(sigs), sigs);
284 frrzmq_init();
285
286 zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
287 if (zmq_bind(zmqsock, "tcp://*:17171")) {
288 perror("zmq_bind");
289 exit(1);
290 }
291
292 frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
293
294 write(syncfd, &dummy, sizeof(dummy));
295 while (thread_fetch(master, &t))
296 thread_call(&t);
297
298 zmq_close(zmqsock);
299 frrzmq_finish();
300 thread_master_free(master);
301 log_memstats_stderr("test");
302 }
303
/*
 * Entry point: creates the synchronisation pipe and forks.  The child runs
 * the client with the read end of the pipe, the parent runs the server with
 * the write end.  Both processes leave through exit().
 */
int main(void)
{
	int syncpipe[2];
	pid_t child;

	if (pipe(syncpipe)) {
		perror("pipe");
		exit(1);
	}

	child = fork();
	if (child < 0) {
		perror("fork");
		exit(1);
	} else if (child == 0) {
		/* Drop the inherited write end: as long as the client holds
		 * it open itself, a read on the pipe can never report EOF
		 * when the server goes away.
		 */
		close(syncpipe[1]);
		run_client(syncpipe[0]);
		exit(0);
	}

	/* the server only ever writes to the pipe */
	close(syncpipe[0]);
	run_server(syncpipe[1]);
	exit(0);
}
326