1 /*  =========================================================================
2     zstr - sending and receiving strings
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================
12 */
13 
14 /*
15 @header
16     The zstr class provides utility functions for sending and receiving C
17     strings across 0MQ sockets. It sends strings without a terminating null,
18     and appends a null byte on received strings. This class is for simple
19     message sending.
20 @discuss
21            Memory                       Wire
22            +-------------+---+          +---+-------------+
23     Send   | S t r i n g | 0 |  ---->   | 6 | S t r i n g |
24            +-------------+---+          +---+-------------+
25 
26            Wire                         Heap
27            +---+-------------+          +-------------+---+
28     Recv   | 6 | S t r i n g |  ---->   | S t r i n g | 0 |
29            +---+-------------+          +-------------+---+
30 @end
31 */
32 
33 #include "czmq_classes.h"
34 #ifdef HAVE_LIBLZ4
35 #include <lz4.h>
36 
37 static void
s_free_compress(void * data,void * hint)38 s_free_compress (void *data, void *hint)
39 {
40     free (data);
41 }
42 #endif
43 
44 static int
s_send_string(void * dest,bool more,char * string,bool compress)45 s_send_string (void *dest, bool more, char *string, bool compress)
46 {
47     assert (dest);
48     void *handle = zsock_resolve (dest);
49 
50     size_t len = strlen (string);
51     zmq_msg_t message;
52 
53     if (compress) {
54 #ifdef HAVE_LIBLZ4
55         size_t compressed_len = LZ4_compressBound (len);
56         if (compressed_len == 0)
57             return -1;
58 
59         char *buffer = (char *)malloc (compressed_len );
60         if (!buffer)
61             return -1;
62 
63         //  LZ4_compress has been deprecated in newer versions, but
64         //  the new APIs are not available for older distros
65 #if LZ4_VERSION_MAJOR >= 1 && LZ4_VERSION_MINOR >= 7
66         int rc = LZ4_compress_default (string, buffer, len, compressed_len);
67 #else
68         int rc = LZ4_compress (string, buffer, len);
69 #endif
70         if (rc == 0) {
71             free (buffer);
72             return -1;
73         }
74 
75         zmq_msg_t size_frame;
76         zmq_msg_init_size (&size_frame, sizeof (size_t));
77         memcpy (zmq_msg_data (&size_frame), &len, sizeof (size_t));
78 
79 #if defined (ZMQ_SERVER)
80         //  Set routing ID if we're sending to a SERVER socket (ZMQ 4.2 and later)
81         if (zsock_is (dest) && zsock_type (dest) == ZMQ_SERVER)
82             zmq_msg_set_routing_id (&size_frame, zsock_routing_id ((zsock_t *) dest));
83 #endif
84         if (zmq_sendmsg (handle, &size_frame, ZMQ_SNDMORE) == -1) {
85             free (buffer);
86             zmq_msg_close (&size_frame);
87             return -1;
88         }
89 
90         //  Optimisation: use zero-copy send. The common use case for
91         //  compressed data is large buffers, so avoiding an extra malloc + copy
92         //  is worth the extra few lines of code
93         rc = zmq_msg_init_data (&message, buffer, rc, s_free_compress, NULL);
94         //  Assert on OOM
95         assert (rc != -1);
96 #else
97         return -1;
98 #endif
99     } else {
100         int rc = zmq_msg_init_size (&message, len);
101         assert (rc == 0);
102         memcpy (zmq_msg_data (&message), string, len);
103     }
104 
105 #if defined (ZMQ_SERVER)
106     //  Set routing ID if we're sending to a SERVER socket (ZMQ 4.2 and later)
107     if (zsock_is (dest) && zsock_type (dest) == ZMQ_SERVER)
108         zmq_msg_set_routing_id (&message, zsock_routing_id ((zsock_t *) dest));
109 #endif
110     if (zmq_sendmsg (handle, &message, more? ZMQ_SNDMORE: 0) == -1) {
111         zmq_msg_close (&message);
112         return -1;
113     }
114     else
115         return 0;
116 }
117 
118 
119 //  --------------------------------------------------------------------------
120 //  Receive C string from socket. Caller must free returned string using
121 //  zstr_free(). Returns NULL if the context is being terminated or the
122 //  process was interrupted.
123 
124 char *
zstr_recv(void * source)125 zstr_recv (void *source)
126 {
127     assert (source);
128     void *handle = zsock_resolve (source);
129 
130     zmq_msg_t message;
131     zmq_msg_init (&message);
132     if (zmq_recvmsg (handle, &message, 0) < 0)
133         return NULL;
134 
135 #if defined (ZMQ_SERVER)
136     //  Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
137     if (zsock_is (source) && zsock_type (source) == ZMQ_SERVER)
138         zsock_set_routing_id ((zsock_t *) source, zmq_msg_routing_id (&message));
139 #endif
140     size_t size = zmq_msg_size (&message);
141     char *string = (char *) malloc (size + 1);
142     if (string) {
143         memcpy (string, zmq_msg_data (&message), size);
144         string [size] = 0;
145     }
146     zmq_msg_close (&message);
147     return string;
148 }
149 
150 
151 //  --------------------------------------------------------------------------
152 //  Receive a series of strings (until NULL) from multipart data.
153 //  Each string is allocated and filled with string data; if there
154 //  are not enough frames, unallocated strings are set to NULL.
155 //  Returns -1 if the message could not be read, else returns the
156 //  number of strings filled, zero or more. Free each returned string
157 //  using zstr_free(). If not enough strings are provided, remaining
158 //  multipart frames in the message are dropped.
159 
160 int
zstr_recvx(void * source,char ** string_p,...)161 zstr_recvx (void *source, char **string_p, ...)
162 {
163     assert (source);
164     void *handle = zsock_resolve (source);
165 
166     zmsg_t *msg = zmsg_recv (handle);
167     if (!msg)
168         return -1;
169 
170 #if defined (ZMQ_SERVER)
171     //  Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
172     if (zsock_is (source) && zsock_type (source) == ZMQ_SERVER)
173         zsock_set_routing_id ((zsock_t *) source, zmsg_routing_id (msg));
174 #endif
175     //  Filter a signal that may come from a dying actor
176     if (zmsg_signal (msg) >= 0) {
177         zmsg_destroy (&msg);
178         return -1;
179     }
180     int count = 0;
181     va_list args;
182     va_start (args, string_p);
183     while (string_p) {
184         *string_p = zmsg_popstr (msg);
185         string_p = va_arg (args, char **);
186         count++;
187     }
188     va_end (args);
189     zmsg_destroy (&msg);
190     return count;
191 }
192 
193 
194 //  --------------------------------------------------------------------------
195 //  De-compress and receive C string from socket, received as a message
196 //  with two frames: size of the uncompressed string, and the string itself.
197 //  Caller must free returned string using zstr_free(). Returns NULL if the
198 //  context is being terminated or the process was interrupted.
199 //  Caller owns return value and must destroy it when done.
200 
201 char *
zstr_recv_compress(void * source)202 zstr_recv_compress (void *source)
203 {
204     assert (source);
205 #ifndef HAVE_LIBLZ4
206     return NULL;
207 #else
208 
209     void *handle = zsock_resolve (source);
210 
211     zmsg_t *msg = zmsg_recv (handle);
212     if (!msg)
213         return NULL;
214 
215 #if defined (ZMQ_SERVER)
216     //  Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
217     if (zsock_is (source) && zsock_type (source) == ZMQ_SERVER)
218         zsock_set_routing_id ((zsock_t *) source, zmsg_routing_id (msg));
219 #endif
220     //  Filter a signal that may come from a dying actor
221     if (zmsg_signal (msg) >= 0) {
222         zmsg_destroy (&msg);
223         return NULL;
224     }
225 
226     //  Size and data
227     if (zmsg_size (msg) != 2) {
228         zmsg_destroy (&msg);
229         return NULL;
230     }
231 
232     size_t size = *((size_t *)zframe_data (zmsg_first (msg)));
233     char *string = (char *) malloc (size + 1);
234     if (string) {
235         zframe_t *data_frame = zmsg_next (msg);
236         int rc = LZ4_decompress_safe ((char *)zframe_data (data_frame),
237             string, zframe_size (data_frame), size);
238         string [size] = 0;
239         if (rc < 0) {
240             zstr_free (&string);
241         }
242     }
243     zmsg_destroy (&msg);
244     return string;
245 #endif
246 }
247 
248 
249 //  --------------------------------------------------------------------------
250 //  Send a C string to a socket, as a frame. The string is sent without
251 //  trailing null byte; to read this you can use zstr_recv, or a similar
252 //  method that adds a null terminator on the received string. String
253 //  may be NULL, which is sent as "".
254 
255 int
zstr_send(void * dest,const char * string)256 zstr_send (void *dest, const char *string)
257 {
258     assert (dest);
259     return s_send_string (dest, false, string? (char *) string: "", false);
260 }
261 
262 
263 //  --------------------------------------------------------------------------
264 //  Send a C string to a socket, as zstr_send(), with a MORE flag, so that
265 //  you can send further strings in the same multi-part message. String
266 //  may be NULL, which is sent as "".
267 
268 int
zstr_sendm(void * dest,const char * string)269 zstr_sendm (void *dest, const char *string)
270 {
271     assert (dest);
272     assert (string);
273     return s_send_string (dest, true, (char *) string, false);
274 }
275 
276 
277 //  --------------------------------------------------------------------------
278 //  Send a formatted string to a socket. Note that you should NOT use
279 //  user-supplied strings in the format (they may contain '%' which
280 //  will create security holes).
281 
282 int
zstr_sendf(void * dest,const char * format,...)283 zstr_sendf (void *dest, const char *format, ...)
284 {
285     assert (dest);
286     assert (format);
287 
288     va_list argptr;
289     va_start (argptr, format);
290     char *string = zsys_vprintf (format, argptr);
291     if (!string)
292         return -1;
293 
294     va_end (argptr);
295 
296     int rc = s_send_string (dest, false, string, false);
297     zstr_free (&string);
298     return rc;
299 }
300 
301 
302 //  --------------------------------------------------------------------------
303 //  Send a formatted string to a socket, as for zstr_sendf(), with a
304 //  MORE flag, so that you can send further strings in the same multi-part
305 //  message.
306 
307 int
zstr_sendfm(void * dest,const char * format,...)308 zstr_sendfm (void *dest, const char *format, ...)
309 {
310     assert (dest);
311     assert (format);
312 
313     va_list argptr;
314     va_start (argptr, format);
315     char *string = zsys_vprintf (format, argptr);
316     if (!string)
317         return -1;
318 
319     va_end (argptr);
320 
321     int rc = s_send_string (dest, true, string, false);
322     zstr_free (&string);
323     return rc;
324 }
325 
326 
327 //  --------------------------------------------------------------------------
328 //  Send a series of strings (until NULL) as multipart data
329 //  Returns 0 if the strings could be sent OK, or -1 on error.
330 
331 int
zstr_sendx(void * dest,const char * string,...)332 zstr_sendx (void *dest, const char *string, ...)
333 {
334     zmsg_t *msg = zmsg_new ();
335     if (!msg)
336         return -1;
337     va_list args;
338     va_start (args, string);
339     while (string) {
340         zmsg_addstr (msg, string);
341         string = va_arg (args, char *);
342     }
343     va_end (args);
344 
345 #if defined (ZMQ_SERVER)
346     //  Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
347     if (zsock_is (dest) && zsock_type (dest) == ZMQ_SERVER)
348         zmsg_set_routing_id (msg, zsock_routing_id ((zsock_t *) dest));
349 #endif
350     if (zmsg_send (&msg, dest) < 0) {
351         zmsg_destroy(&msg);
352         return -1;
353     }
354     else
355         return 0;
356 }
357 
358 //  Compress and send a C string to a socket, as a message with two frames:
359 //  size of the uncompressed string, and the string itself. The string is
360 //  sent without trailing null byte; to read this you can use
361 //  zstr_recv_compress, or a similar method that de-compresses and adds a
362 //  null terminator on the received string.
363 int
zstr_send_compress(void * dest,const char * string)364 zstr_send_compress (void *dest, const char *string)
365 {
366     assert (dest);
367     return s_send_string (dest, false, (char *) string, true);
368 }
369 
370 //  Compress and send a C string to a socket, as zstr_send_compress(),
371 //  with a MORE flag, so that you can send further strings in the same
372 //  multi-part message.
373 int
zstr_sendm_compress(void * dest,const char * string)374 zstr_sendm_compress (void *dest, const char *string)
375 {
376     assert (dest);
377     assert (string);
378     return s_send_string (dest, true, (char *) string, true);
379 }
380 
381 //  --------------------------------------------------------------------------
382 //  Accepts a void pointer and returns a fresh character string. If source is
383 //  null, returns an empty string.
384 
385 char *
zstr_str(void * source)386 zstr_str (void *source)
387 {
388     if (source)
389         return strdup ((char *) source);
390     else
391         return strdup ("");
392 }
393 
394 
395 //  --------------------------------------------------------------------------
396 //  Free a provided string, and nullify the parent pointer. Safe to call on
397 //  a null pointer.
398 
399 void
zstr_free(char ** string_p)400 zstr_free (char **string_p)
401 {
402     assert (string_p);
403     free (*string_p);
404     *string_p = NULL;
405 }
406 
407 
408 //  --------------------------------------------------------------------------
409 //  DEPRECATED as poor style -- callers should use zloop or zpoller
410 //  Receive C string from socket, if socket had input ready. Caller must
411 //  free returned string using zstr_free. Returns NULL if there was no input
412 //  waiting, or if the context was terminated. Use zsys_interrupted to exit
413 //  any loop that relies on this method.
414 
415 char *
zstr_recv_nowait(void * dest)416 zstr_recv_nowait (void *dest)
417 {
418     assert (dest);
419     void *handle = zsock_resolve (dest);
420 
421     zmq_msg_t message;
422     zmq_msg_init (&message);
423     if (zmq_recvmsg (handle, &message, ZMQ_DONTWAIT) < 0)
424         return NULL;
425 
426     size_t size = zmq_msg_size (&message);
427     char *string = (char *) malloc (size + 1);
428     if (string) {
429         memcpy (string, zmq_msg_data (&message), size);
430         string [size] = 0;
431     }
432     zmq_msg_close (&message);
433     return string;
434 }
435 
436 
437 //  --------------------------------------------------------------------------
438 //  Selftest
439 
440 void
zstr_test(bool verbose)441 zstr_test (bool verbose)
442 {
443     printf (" * zstr: ");
444 
445     //  @selftest
446     //  Create two PAIR sockets and connect over inproc
447     zsock_t *output = zsock_new_pair ("@inproc://zstr.test");
448     assert (output);
449     zsock_t *input = zsock_new_pair (">inproc://zstr.test");
450     assert (input);
451 
452     //  Send ten strings, five strings with MORE flag and then END
453     int string_nbr;
454     for (string_nbr = 0; string_nbr < 10; string_nbr++)
455         zstr_sendf (output, "this is string %d", string_nbr);
456     zstr_sendx (output, "This", "is", "almost", "the", "very", "END", NULL);
457 
458     //  Read and count until we receive END
459     string_nbr = 0;
460     for (string_nbr = 0;; string_nbr++) {
461         char *string = zstr_recv (input);
462         assert (string);
463         if (streq (string, "END")) {
464             zstr_free (&string);
465             break;
466         }
467         zstr_free (&string);
468     }
469     assert (string_nbr == 15);
470 
471 #ifdef HAVE_LIBLZ4
472     int ret = zstr_send_compress (output, "loooong");
473     assert (ret == 0);
474     char *string = zstr_recv_compress (input);
475     assert (string);
476     assert (streq (string, "loooong"));
477     zstr_free (&string);
478 
479     zstr_send_compress (output, "loooong");
480     assert (ret == 0);
481     zmsg_t *msg = zmsg_recv (input);
482     assert (msg);
483     assert (*((size_t *)zframe_data (zmsg_first (msg))) == strlen ("loooong"));
484     zmsg_destroy (&msg);
485 #endif
486 
487     zsock_destroy (&input);
488     zsock_destroy (&output);
489 
490 #if defined (ZMQ_SERVER)
491     //  Test SERVER/CLIENT over zstr
492     zsock_t *server = zsock_new_server ("inproc://zstr-test-routing");
493     zsock_t *client = zsock_new_client ("inproc://zstr-test-routing");;
494     assert (server);
495     assert (client);
496 
497     //  Try normal ping-pong to check reply routing ID
498     int rc = zstr_send (client, "Hello");
499     assert (rc == 0);
500     char *request = zstr_recv (server);
501     assert (streq (request, "Hello"));
502     assert (zsock_routing_id (server));
503     freen (request);
504 
505     rc = zstr_send (server, "World");
506     assert (rc == 0);
507     char *reply = zstr_recv (client);
508     assert (streq (reply, "World"));
509     freen (reply);
510 
511     rc = zstr_sendf (server, "%s", "World");
512     assert (rc == 0);
513     reply = zstr_recv (client);
514     assert (streq (reply, "World"));
515     freen (reply);
516 
517     //  Try ping-pong using sendx and recx
518     rc = zstr_sendx (client, "Hello", NULL);
519     assert (rc == 0);
520     rc = zstr_recvx (server, &request, NULL);
521     assert (rc >= 0);
522     assert (streq (request, "Hello"));
523     freen (request);
524 
525     rc = zstr_sendx (server, "World", NULL);
526     assert (rc == 0);
527     rc = zstr_recvx (client, &reply, NULL);
528     assert (rc >= 0);
529     assert (streq (reply, "World"));
530     freen (reply);
531 
532     //  Client and server disallow multipart
533     rc = zstr_sendm (client, "Hello");
534     assert (rc == -1);
535     rc = zstr_sendm (server, "World");
536     assert (rc == -1);
537 
538     zsock_destroy (&client);
539     zsock_destroy (&server);
540 #endif
541 
542 #if defined (__WINDOWS__)
543     zsys_shutdown();
544 #endif
545     //  @end
546 
547     printf ("OK\n");
548 }
549