1 /*  =========================================================================
2     zframe - working with single message frames
3 
4     Copyright (c) the Contributors as noted in the AUTHORS file.
5     This file is part of CZMQ, the high-level C binding for 0MQ:
6     http://czmq.zeromq.org.
7 
8     This Source Code Form is subject to the terms of the Mozilla Public
9     License, v. 2.0. If a copy of the MPL was not distributed with this
10     file, You can obtain one at http://mozilla.org/MPL/2.0/.
11     =========================================================================
12 */
13 
14 /*
15 @header
16     The zframe class provides methods to send and receive single message
17     frames across 0MQ sockets. A 'frame' corresponds to one zmq_msg_t. When
18     you read a frame from a socket, the zframe_more() method indicates if the
19     frame is part of an unfinished multipart message. The zframe_send method
20     normally destroys the frame, but with the ZFRAME_REUSE flag, you can send
21     the same frame many times. Frames are binary, and this class has no
22     special support for text data.
23 @discuss
24 @end
25 */
26 
27 #include "czmq_classes.h"
28 
//  zframe_t instances always have this tag as the first 4 octets of
//  their data, which lets us do runtime object typing & validation.
//  zframe_is () compares an object's tag against this value, and
//  zframe_destroy () overwrites the tag before freeing the object so a
//  destroyed frame no longer matches.
#define ZFRAME_TAG              0xcafe0002
32 
//  Structure of our class
//  The tag must remain the first member: zframe_is () reads it through a
//  plain cast of the object pointer.

struct _zframe_t {
    uint32_t tag;               //  Object tag for runtime detection
    zmq_msg_t zmsg;             //  zmq_msg_t blob for frame
    int more;                   //  More flag, from last read
    uint32_t routing_id;        //  Routing ID back to sender, if any
#ifdef ZMQ_GROUP_MAX_LENGTH
    //  Radio-dish group name, stored as a null-terminated string
    char group[ZMQ_GROUP_MAX_LENGTH + 1];
#endif
    //  The next two fields are recorded by zframe_frommem ()
    zframe_destructor_fn *destructor;   //  Destructor for memory
    void * hint;                        //  Hint for destroying the memory
};
46 
47 //  --------------------------------------------------------------------------
48 //  Constructor; if size is >0, allocates frame with that size, and if data
49 //  is not null, copies data into frame.
50 
51 zframe_t *
zframe_new(const void * data,size_t size)52 zframe_new (const void *data, size_t size)
53 {
54     zframe_t *self = (zframe_t *) zmalloc (sizeof (zframe_t));
55     assert (self);
56     self->tag = ZFRAME_TAG;
57     if (size) {
58         //  Catch heap exhaustion in this specific case
59         if (zmq_msg_init_size (&self->zmsg, size)) {
60             zframe_destroy (&self);
61             return NULL;
62         }
63         if (data)
64             memcpy (zmq_msg_data (&self->zmsg), data, size);
65     }
66     else
67         zmq_msg_init (&self->zmsg);
68 
69     return self;
70 }
71 
72 
73 //  --------------------------------------------------------------------------
74 //  Create an empty (zero-sized) frame. The caller is responsible for
75 //  destroying the return value when finished with it.
76 
77 zframe_t *
zframe_new_empty(void)78 zframe_new_empty (void)
79 {
80     zframe_t *self = (zframe_t *) zmalloc (sizeof (zframe_t));
81     assert (self);
82     self->tag = ZFRAME_TAG;
83     zmq_msg_init (&self->zmsg);
84     return self;
85 }
86 
87 
88 //  --------------------------------------------------------------------------
89 //  Destructor
90 
91 void
zframe_destroy(zframe_t ** self_p)92 zframe_destroy (zframe_t **self_p)
93 {
94     assert (self_p);
95     if (*self_p) {
96         zframe_t *self = *self_p;
97         assert (zframe_is (self));
98         zmq_msg_close (&self->zmsg);
99         self->tag = 0xDeadBeef;
100         freen (self);
101 
102         *self_p = NULL;
103     }
104 }
105 
106 
107 //  --------------------------------------------------------------------------
108 //  Create a frame with a specified string content.
109 //  The caller is responsible for destroying the return value when finished with it.
110 
111 zframe_t *
zframe_from(const char * string)112 zframe_from (const char *string)
113 {
114     return zframe_new (string, strlen (string));
115 }
116 
117 //  --------------------------------------------------------------------------
//  Create a new frame from memory. The frame takes ownership of the memory
//  and invokes the destructor (with the hint) when the underlying message is
//  released.
120 
//  Context handed to zmq_msg_free () through the zmq message hint. It is
//  allocated in zframe_frommem () and freed by zmq_msg_free ().
struct zmq_msg_free_arg {
    zframe_destructor_fn *destructor;   //  User callback that releases the memory
    void *hint;                         //  User hint; its address is passed to destructor
};
125 
126 static void
zmq_msg_free(void * data,void * hint)127 zmq_msg_free (void *data, void *hint) {
128     struct zmq_msg_free_arg *arg = (struct zmq_msg_free_arg *)hint;
129     arg->destructor (&arg->hint);
130 
131     freen (arg);
132 }
133 
134 zframe_t *
zframe_frommem(void * data,size_t size,zframe_destructor_fn destructor,void * hint)135 zframe_frommem (void *data, size_t size, zframe_destructor_fn destructor, void *hint) {
136     assert (data);
137 
138     zframe_t *self = (zframe_t *) zmalloc (sizeof (zframe_t));
139     assert (self);
140     self->tag = ZFRAME_TAG;
141     self->destructor = destructor;
142     self->hint = hint;
143 
144     struct zmq_msg_free_arg *arg = (struct zmq_msg_free_arg*) zmalloc (sizeof (struct zmq_msg_free_arg));
145     arg->destructor = destructor;
146     arg->hint = hint;
147 
148     //  Catch heap exhaustion in this specific case
149     if (zmq_msg_init_data (&self->zmsg, data, size, zmq_msg_free, arg)) {
150         freen (arg);
151         zframe_destroy (&self);
152         return NULL;
153     }
154 
155     return self;
156 }
157 
158 
159 //  --------------------------------------------------------------------------
160 //  Receive frame from socket, returns zframe_t object or NULL if the recv
161 //  was interrupted. Does a blocking recv, if you want to not block then use
162 //  zpoller or zloop.
163 
164 zframe_t *
zframe_recv(void * source)165 zframe_recv (void *source)
166 {
167     assert (source);
168     void *handle = zsock_resolve (source);
169     zframe_t *self = zframe_new (NULL, 0);
170     assert (self);
171 
172     if (zmq_recvmsg (handle, &self->zmsg, 0) < 0) {
173         zframe_destroy (&self);
174         return NULL;            //  Interrupted or terminated
175     }
176     self->more = zsock_rcvmore (source);
177 #if defined (ZMQ_SERVER)
178     //  Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
179     if (zsock_type (source) == ZMQ_SERVER)
180         self->routing_id = zmq_msg_routing_id (&self->zmsg);
181 #endif
182 #if defined (ZMQ_DISH)
183     //  Grab group if we're reading from a DISH Socket (ZMQ 4.2 and later)
184     if (zsock_type (source) == ZMQ_DISH)
185         strcpy (self->group, zmq_msg_group (&self->zmsg));
186 #endif
187     return self;
188 }
189 
190 
191 //  --------------------------------------------------------------------------
192 //  Send frame to socket, destroy after sending unless ZFRAME_REUSE is
193 //  set or the attempt to send the message errors out.
194 
195 int
zframe_send(zframe_t ** self_p,void * dest,int flags)196 zframe_send (zframe_t **self_p, void *dest, int flags)
197 {
198     assert (dest);
199     assert (self_p);
200 
201     void *handle = zsock_resolve (dest);
202     if (*self_p) {
203         zframe_t *self = *self_p;
204         assert (zframe_is (self));
205 
206         int send_flags = (flags & ZFRAME_MORE)? ZMQ_SNDMORE: 0;
207         send_flags |= (flags & ZFRAME_DONTWAIT)? ZMQ_DONTWAIT: 0;
208         if (flags & ZFRAME_REUSE) {
209             zmq_msg_t copy;
210             zmq_msg_init (&copy);
211             if (zmq_msg_copy (&copy, &self->zmsg))
212                 return -1;
213 #if defined (ZMQ_SERVER)
214             if (zsock_type (dest) == ZMQ_SERVER)
215                 zmq_msg_set_routing_id (&copy, self->routing_id);
216 #endif
217 #if defined (ZMQ_RADIO)
218             if (zsock_type (dest) == ZMQ_RADIO)
219                 zmq_msg_set_group (&copy, self->group);
220 #endif
221             if (zmq_sendmsg (handle, &copy, send_flags) == -1) {
222                 zmq_msg_close (&copy);
223                 return -1;
224             }
225         }
226         else {
227 #if defined (ZMQ_SERVER)
228             if (zsock_type (dest) == ZMQ_SERVER)
229                 zmq_msg_set_routing_id (&self->zmsg, self->routing_id);
230 #endif
231 #if defined (ZMQ_RADIO)
232             if (zsock_type (dest) == ZMQ_RADIO)
233                 zmq_msg_set_group (&self->zmsg, self->group);
234 #endif
235             if (zmq_sendmsg (handle, &self->zmsg, send_flags) >= 0)
236                 zframe_destroy (self_p);
237             else
238                 return -1;
239         }
240     }
241     return 0;
242 }
243 
244 
245 //  --------------------------------------------------------------------------
246 //  Return size of frame.
247 
248 size_t
zframe_size(zframe_t * self)249 zframe_size (zframe_t *self)
250 {
251     assert (self);
252     assert (zframe_is (self));
253 
254     return zmq_msg_size (&self->zmsg);
255 }
256 
257 
258 //  --------------------------------------------------------------------------
259 //  Return pointer to frame data.
260 
261 byte *
zframe_data(zframe_t * self)262 zframe_data (zframe_t *self)
263 {
264     assert (self);
265     assert (zframe_is (self));
266 
267     return (byte *) zmq_msg_data (&self->zmsg);
268 }
269 
270 
271 //  --------------------------------------------------------------------------
272 //  Return meta data property for frame.
273 //  The caller shall not modify or free the returned value, which shall be
274 //  owned by the message.
275 
276 const char *
zframe_meta(zframe_t * self,const char * property)277 zframe_meta (zframe_t *self, const char *property)
278 {
279 #if (ZMQ_VERSION >= ZMQ_MAKE_VERSION (4, 1, 0))
280     assert (self);
281     assert (zframe_is (self));
282 
283     return zmq_msg_gets (&self->zmsg, property);
284 #else
285     return NULL;
286 #endif
287 }
288 
289 
290 //  --------------------------------------------------------------------------
291 //  Create a new frame that duplicates an existing frame. If frame is null,
292 //  or memory was exhausted, returns null.
293 
294 zframe_t *
zframe_dup(zframe_t * self)295 zframe_dup (zframe_t *self)
296 {
297     if (self) {
298         assert (zframe_is (self));
299         return zframe_new (zframe_data (self), zframe_size (self));
300     }
301     else
302         return NULL;
303 }
304 
305 
306 //  --------------------------------------------------------------------------
307 //  Return frame data encoded as printable hex string, useful for 0MQ UUIDs.
308 //  Caller must free string when finished with it.
309 
310 char *
zframe_strhex(zframe_t * self)311 zframe_strhex (zframe_t *self)
312 {
313     assert (self);
314     assert (zframe_is (self));
315 
316     static const char
317         hex_char [] = "0123456789ABCDEF";
318 
319     size_t size = zframe_size (self);
320     byte *data = zframe_data (self);
321     char *hex_str = (char *) malloc (size * 2 + 1);
322     if (!hex_str)
323         return NULL;
324 
325     uint byte_nbr;
326     for (byte_nbr = 0; byte_nbr < size; byte_nbr++) {
327         hex_str [byte_nbr * 2 + 0] = hex_char [data [byte_nbr] >> 4];
328         hex_str [byte_nbr * 2 + 1] = hex_char [data [byte_nbr] & 15];
329     }
330     hex_str [size * 2] = 0;
331     return hex_str;
332 }
333 
334 
335 //  --------------------------------------------------------------------------
336 //  Return frame data copied into freshly allocated string
337 //  Caller must free string when finished with it.
338 
339 char *
zframe_strdup(zframe_t * self)340 zframe_strdup (zframe_t *self)
341 {
342     assert (self);
343     assert (zframe_is (self));
344 
345     size_t size = zframe_size (self);
346     char *string = (char *) malloc (size + 1);
347     assert (string);
348     memcpy (string, zframe_data (self), size);
349     string [size] = 0;
350     return string;
351 }
352 
353 
354 //  --------------------------------------------------------------------------
355 //  Return true if frame body is equal to string, excluding terminator
356 
357 bool
zframe_streq(zframe_t * self,const char * string)358 zframe_streq (zframe_t *self, const char *string)
359 {
360     assert (self);
361     assert (zframe_is (self));
362 
363     if (zframe_size (self) == strlen (string)
364     && memcmp (zframe_data (self), string, strlen (string)) == 0)
365         return true;
366     else
367         return false;
368 }
369 
370 
371 //  --------------------------------------------------------------------------
372 //  Return frame MORE indicator (1 or 0), set when reading frame from socket
373 //  or by the zframe_set_more() method.
374 
375 int
zframe_more(zframe_t * self)376 zframe_more (zframe_t *self)
377 {
378     assert (self);
379     assert (zframe_is (self));
380 
381     return self->more;
382 }
383 
384 
385 //  --------------------------------------------------------------------------
386 //  Set frame MORE indicator (1 or 0). Note this is NOT used when sending
387 //  frame to socket, you have to specify flag explicitly.
388 
389 void
zframe_set_more(zframe_t * self,int more)390 zframe_set_more (zframe_t *self, int more)
391 {
392     assert (self);
393     assert (zframe_is (self));
394     assert (more == 0 || more == 1);
395 
396     self->more = more;
397 }
398 
399 
400 //  --------------------------------------------------------------------------
401 //  Return frame routing ID, if the frame came from a ZMQ_SERVER socket.
402 //  Else returns zero.
403 
404 uint32_t
zframe_routing_id(zframe_t * self)405 zframe_routing_id (zframe_t *self)
406 {
407     assert (self);
408     assert (zframe_is (self));
409     return self->routing_id;
410 }
411 
412 
413 //  --------------------------------------------------------------------------
414 //  Set routing ID on frame. This is used if/when the frame is sent to a
415 //  ZMQ_SERVER socket.
416 
417 void
zframe_set_routing_id(zframe_t * self,uint32_t routing_id)418 zframe_set_routing_id (zframe_t *self, uint32_t routing_id)
419 {
420     assert (self);
421     assert (zframe_is (self));
422     self->routing_id = routing_id;
423 }
424 
425 
426 //  --------------------------------------------------------------------------
427 //  Return frame group of radio-dish pattern.
428 const char *
zframe_group(zframe_t * self)429 zframe_group (zframe_t *self)
430 {
431     assert (self);
432 #ifdef ZMQ_DISH
433     return self->group;
434 #else
435     return NULL;
436 #endif
437 }
438 
439 
440 //  --------------------------------------------------------------------------
441 //  Set group on frame. This is used if/when the frame is sent to a
442 //  ZMQ_RADIO socket.
443 //  Return -1 on error, 0 on success.
444 int
zframe_set_group(zframe_t * self,const char * group)445 zframe_set_group (zframe_t *self, const char *group)
446 {
447 #ifdef ZMQ_RADIO
448     if (strlen(group) > ZMQ_GROUP_MAX_LENGTH) {
449         errno = EINVAL;
450         return -1;
451     }
452 
453     strcpy (self->group, group);
454     return 0;
455 #else
456     errno = ENOTSUP;
457     return -1;
458 #endif
459 }
460 
461 
462 //  --------------------------------------------------------------------------
463 //  Return true if two frames have identical size and data
464 
465 bool
zframe_eq(zframe_t * self,zframe_t * other)466 zframe_eq (zframe_t *self, zframe_t *other)
467 {
468     if (!self || !other)
469         return false;           //  Tolerate null references here
470     else {
471         assert (zframe_is (self));
472         assert (zframe_is (other));
473 
474         if (zframe_size (self) == zframe_size (other)
475         &&  memcmp (zframe_data (self),
476                     zframe_data (other),
477                     zframe_size (self)) == 0)
478             return true;
479         else
480             return false;
481     }
482 }
483 
484 
485 //  --------------------------------------------------------------------------
486 //  Set new contents for frame
487 
488 void
zframe_reset(zframe_t * self,const void * data,size_t size)489 zframe_reset (zframe_t *self, const void *data, size_t size)
490 {
491     assert (self);
492     assert (zframe_is (self));
493     assert (data);
494 
495     zmq_msg_close (&self->zmsg);
496     zmq_msg_init_size (&self->zmsg, size);
497     memcpy (zmq_msg_data (&self->zmsg), data, size);
498 }
499 
500 
501 //  --------------------------------------------------------------------------
502 //  Send message to zsys log sink (may be stdout, or system facility as
503 //  configured by zsys_set_logstream). Prefix shows before frame, if not null.
504 
505 //  Send the specified number of chars.
506 //  If len is 0, then log 35 hex or 70 chars with ellipsis.
507 //  If 0 < len < frame size,
508 //  then log in blocks of 35 or 70 up to the specified length.
509 //
510 void
zframe_print_n(zframe_t * self,const char * prefix,size_t length)511 zframe_print_n (zframe_t *self, const char *prefix, size_t length)
512 {
513     assert (self);
514     assert (zframe_is (self));
515 
516     byte *data = zframe_data (self);
517     size_t size = zframe_size (self);
518 
519     //  Probe data to check if it looks like unprintable binary
520     int is_bin = 0;
521     uint char_nbr;
522     for (char_nbr = 0; char_nbr < size; char_nbr++)
523         if (data [char_nbr] < 32 || data [char_nbr] > 127)
524             is_bin = 1;
525 
526     char buffer [256] = "";
527     size_t max_size = is_bin? 35: 70;
528     const char *ellipsis = "";
529 
530     // backwards compatibility
531     if (!length) {
532         if (size > max_size) {
533             size = max_size;
534             ellipsis = "...";
535         }
536 
537 	length = size;
538     }
539     if (length > size)
540         length = size;
541 
542     for (char_nbr = 0; char_nbr < length; char_nbr++) {
543         if (!(char_nbr % max_size)) {
544             if (char_nbr) zsys_debug (buffer);
545             snprintf (buffer, 30, "%s[%03d] ", prefix? prefix: "", (int) size);
546         }
547         if (is_bin)
548             sprintf (buffer + strlen (buffer), "%02X", (unsigned char) data [char_nbr]);
549         else
550             sprintf (buffer + strlen (buffer), "%c", data [char_nbr]);
551     }
552     strcat (buffer, ellipsis);
553     zsys_debug (buffer);
554 }
555 
//  Log the frame through zframe_print_n () with a length of 0, which
//  selects its single-line format. Prefix shows before frame, if not null.
void
zframe_print (zframe_t *self, const char *prefix)
{
    zframe_print_n (self, prefix, 0);
}
561 
562 
563 //  --------------------------------------------------------------------------
564 //  Probe the supplied object, and report if it looks like a zframe_t.
565 
566 bool
zframe_is(void * self)567 zframe_is (void *self)
568 {
569     assert (self);
570     return ((zframe_t *) self)->tag == ZFRAME_TAG;
571 }
572 
573 
574 //  --------------------------------------------------------------------------
575 //  DEPRECATED as poor style -- callers should use zloop or zpoller
576 //  Receive a new frame off the socket. Returns newly allocated frame, or
577 //  NULL if there was no input waiting, or if the read was interrupted.
578 
579 zframe_t *
zframe_recv_nowait(void * source)580 zframe_recv_nowait (void *source)
581 {
582     assert (source);
583     void *handle = zsock_resolve (source);
584 
585     zframe_t *self = zframe_new (NULL, 0);
586     assert (self);
587     if (zmq_recvmsg (handle, &self->zmsg, ZMQ_DONTWAIT) < 0) {
588         zframe_destroy (&self);
589         return NULL;            //  Interrupted or terminated
590     }
591     self->more = zsock_rcvmore (source);
592 #if defined (ZMQ_SERVER)
593     //  Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
594     if (zsock_type (source) == ZMQ_SERVER)
595         self->routing_id = zmq_msg_routing_id (&self->zmsg);
596 #endif
597 #if defined (ZMQ_DISH)
598     //  Grab group if we're reading from a DISH Socket (ZMQ 4.2 and later)
599     if (zsock_type (source) == ZMQ_DISH)
600         strcpy (self->group, zmq_msg_group (&self->zmsg));
601 #endif
602     return self;
603 }
604 
605 
606 //  --------------------------------------------------------------------------
607 //  DEPRECATED as inconsistent; breaks principle that logging should all go
608 //  to a single destination.
609 //  Print contents of frame to FILE stream, prefix is ignored if null.
610 
611 void
zframe_fprint(zframe_t * self,const char * prefix,FILE * file)612 zframe_fprint (zframe_t *self, const char *prefix, FILE *file)
613 {
614     assert (self);
615     assert (zframe_is (self));
616 
617     if (prefix)
618         fprintf (file, "%s", prefix);
619     byte *data = zframe_data (self);
620     size_t size = zframe_size (self);
621 
622     int is_bin = 0;
623     uint char_nbr;
624     for (char_nbr = 0; char_nbr < size; char_nbr++)
625         if (data [char_nbr] < 9 || data [char_nbr] > 127)
626             is_bin = 1;
627 
628     fprintf (file, "[%03d] ", (int) size);
629     size_t max_size = is_bin? 35: 70;
630     const char *ellipsis = "";
631     if (size > max_size) {
632         size = max_size;
633         ellipsis = "...";
634     }
635     for (char_nbr = 0; char_nbr < size; char_nbr++) {
636         if (is_bin)
637             fprintf (file, "%02X", (unsigned char) data [char_nbr]);
638         else
639             fprintf (file, "%c", data [char_nbr]);
640     }
641     fprintf (file, "%s\n", ellipsis);
642 }
643 
644 
645 //  --------------------------------------------------------------------------
646 //  Selftest
647 
//  Destructor used by the selftest: rather than freeing anything, it
//  overwrites the buffer that *hint points to with "world" so the test
//  can observe that it ran.
static void
mem_destructor (void **hint) {
    char *target = (char *) *hint;
    strcpy (target, "world");
}
652 
653 
//  Self test of this class. Exercises frame construction, send/receive over
//  PAIR sockets, and (where libzmq provides them) metadata, SERVER/CLIENT
//  routing IDs and RADIO/DISH groups, then zframe_frommem and the print
//  functions. Print output goes to stdout only when verbose is true.
void
zframe_test (bool verbose)
{
    printf (" * zframe: ");
    int rc;
    zframe_t* frame;

    //  @selftest
    //  Create two PAIR sockets and connect over TCP
    zsock_t *output = zsock_new (ZMQ_PAIR);
    assert (output);
    int port = zsock_bind (output, "tcp://127.0.0.1:*");
    assert (port != -1);
    zsock_t *input = zsock_new (ZMQ_PAIR);
    assert (input);
    rc = zsock_connect (input, "tcp://127.0.0.1:%d", port);
    assert (rc != -1);

    //  Send five different frames, test ZFRAME_MORE
    int frame_nbr;
    for (frame_nbr = 0; frame_nbr < 5; frame_nbr++) {
        frame = zframe_new ("Hello", 5);
        assert (frame);
        rc = zframe_send (&frame, output, ZFRAME_MORE);
        assert (rc == 0);
    }
    //  Send same frame five times, test ZFRAME_REUSE
    frame = zframe_new ("Hello", 5);
    assert (frame);
    for (frame_nbr = 0; frame_nbr < 5; frame_nbr++) {
        rc = zframe_send (&frame, output, ZFRAME_MORE + ZFRAME_REUSE);
        assert (rc == 0);
    }
    //  With ZFRAME_REUSE the frame must survive the sends
    assert (frame);
    zframe_t *copy = zframe_dup (frame);
    assert (zframe_eq (frame, copy));
    zframe_destroy (&frame);
    //  zframe_eq tolerates null frames and reports them as not equal
    assert (!zframe_eq (frame, copy));
    assert (zframe_size (copy) == 5);
    zframe_destroy (&copy);
    assert (!zframe_eq (frame, copy));

    //  Test zframe_new_empty
    frame = zframe_new_empty ();
    assert (frame);
    assert (zframe_size (frame) == 0);
    zframe_destroy (&frame);

    //  Send END frame
    frame = zframe_new ("NOT", 3);
    assert (frame);
    zframe_reset (frame, "END", 3);
    char *string = zframe_strhex (frame);
    assert (streq (string, "454E44"));
    freen (string);
    string = zframe_strdup (frame);
    assert (streq (string, "END"));
    freen (string);
    rc = zframe_send (&frame, output, 0);
    assert (rc == 0);

    //  Read and count until we receive END
    frame_nbr = 0;
    for (frame_nbr = 0;; frame_nbr++) {
        //  Note: this declaration shadows the outer frame variable
        zframe_t *frame = zframe_recv (input);
        if (zframe_streq (frame, "END")) {
            zframe_destroy (&frame);
            break;
        }
        assert (zframe_more (frame));
        zframe_set_more (frame, 0);
        assert (zframe_more (frame) == 0);
        zframe_destroy (&frame);
    }
    //  Five distinct frames plus five reused sends precede END
    assert (frame_nbr == 10);

#if (ZMQ_VERSION >= ZMQ_MAKE_VERSION (4, 1, 0))
    // Test zframe_meta
    frame = zframe_new ("Hello", 5);
    assert (frame);
    rc = zframe_send (&frame, output, 0);
    assert (rc == 0);
    frame = zframe_recv (input);
    const char *meta = zframe_meta (frame, "Socket-Type");
    assert (meta != NULL);
    assert (streq (meta, "PAIR"));
    assert (zframe_meta (frame, "nonexistent") == NULL);
    zframe_destroy (&frame);
#endif

    zsock_destroy (&input);
    zsock_destroy (&output);

#if defined (ZMQ_SERVER)
    //  Create server and client sockets and connect over inproc
    zsock_t *server = zsock_new_server ("inproc://zframe-test-routing");
    assert (server);
    zsock_t *client = zsock_new_client ("inproc://zframe-test-routing");
    assert (client);

    //  Send request from client to server
    zframe_t *request = zframe_new ("Hello", 5);
    assert (request);
    rc = zframe_send (&request, client, 0);
    assert (rc == 0);
    assert (!request);

    //  Read request and send reply
    request = zframe_recv (server);
    assert (request);
    assert (zframe_streq (request, "Hello"));
    assert (zframe_routing_id (request));

    zframe_t *reply = zframe_new ("World", 5);
    assert (reply);
    zframe_set_routing_id (reply, zframe_routing_id (request));
    rc = zframe_send (&reply, server, 0);
    assert (rc == 0);
    zframe_destroy (&request);

    //  Read reply
    reply = zframe_recv (client);
    assert (zframe_streq (reply, "World"));
    assert (zframe_routing_id (reply) == 0);
    zframe_destroy (&reply);

    //  Client and server disallow multipart
    frame = zframe_new ("Hello", 5);
    rc = zframe_send (&frame, client, ZFRAME_MORE);
    assert (rc == -1);
    rc = zframe_send (&frame, server, ZFRAME_MORE);
    assert (rc == -1);
    //  A failed send leaves the frame with us, so destroy it here
    zframe_destroy (&frame);

    zsock_destroy (&client);
    zsock_destroy (&server);
#endif

#ifdef ZMQ_RADIO
    //  Create radio and dish sockets and connect over inproc
    zsock_t *radio = zsock_new_radio ("inproc://zframe-test-radio");
    assert (radio);
    zsock_t *dish = zsock_new_dish ("inproc://zframe-test-radio");
    assert (dish);

    //  Join the group
    rc = zsock_join (dish, "World");
    assert (rc == 0);

    //  Publish message from radio
    zframe_t *message = zframe_new ("Hello", 5);
    assert (message);
    rc = zframe_set_group (message, "World");
    assert (rc == 0);
    rc = zframe_send (&message, radio, 0);
    assert (rc == 0);
    assert (!message);

    //  Receive the message from dish
    message = zframe_recv (dish);
    assert (message);
    assert (zframe_streq (message, "Hello"));
    assert (strcmp("World", zframe_group (message)) == 0);
    zframe_destroy (&message);

    zsock_destroy (&dish);
    zsock_destroy (&radio);
#else
    //  Without RADIO support, setting a group must fail with ENOTSUP
    frame = zframe_new ("Hello", 5);
    rc = zframe_set_group (frame, "World");
    assert(rc == -1);
    assert(errno == ENOTSUP);
    zframe_destroy (&frame);
#endif

    //  Test zframe_frommem with a stack buffer and a non-freeing destructor
    char str[] = "hello";
    frame = zframe_frommem (str, 5, mem_destructor, str);
    assert (frame);
    zframe_destroy (&frame);

    //  The destructor doesn't free the memory, it only changes the string,
    //  so we can check if the destructor was invoked
    assert (streq (str, "world"));


    // zframe_print tests

    zsys_set_logstream(verbose ? stdout : NULL);

    // == No data ==
    frame = zframe_new ("", 0);

    // no prefix, backwards compatible
    //  - emits nothing but the timestamp
    zframe_print (frame, "");
    zframe_print_n (frame, "", 0);

    // prefix, backwards compatible
    //  - emits nothing but the timestamp
    zframe_print (frame, "Prefix");
    zframe_print_n (frame, "Prefix", 0);

    // len > data
    //  - emits nothing but the timestamp
    zframe_print_n (frame, "", 15);
    zframe_print_n (frame, "Prefix", 15);

    // max len (-1 converts to the largest size_t value)
    //  - emits nothing but the timestamp
    zframe_print_n (frame, "", -1);
    zframe_print_n (frame, "Prefix", -1);

    zframe_destroy (&frame);


    // == Short data ==
    frame = zframe_new ("Hello there!", 12);

    // no prefix, backwards compatible: fits on one line, so no ellipsis
    //  - "[012] Hello there!"
    zframe_print (frame, "");
    zframe_print_n (frame, "", 0);

    // prefix, backwards compatible: fits on one line, so no ellipsis
    //  - "Prefix[012] Hello there!"
    zframe_print (frame, "Prefix");
    zframe_print_n (frame, "Prefix", 0);

    // len < data
    //  - "[012] Hello"
    //  - "Prefix[012] Hello"
    zframe_print_n (frame, "", 5);
    zframe_print_n (frame, "Prefix", 5);

    // len > data
    //  - "[012] Hello there!"
    //  - "Prefix[012] Hello there!"
    zframe_print_n (frame, "", 15);
    zframe_print_n (frame, "Prefix", 15);

    // max len (-1 converts to the largest size_t value)
    //  - "[012] Hello there!"
    //  - "Prefix[012] Hello there!"
    zframe_print_n (frame, "", -1);
    zframe_print_n (frame, "Prefix", -1);

    zframe_destroy (&frame);


    // == Long data ==
    frame = zframe_new ("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus ligula et aliquam tristique. Phasellus consequat, enim et blandit varius, sapien diam faucibus lorem, non ultricies lacus turpis sed lectus. Vivamus id elit urna. In sit amet lacinia mauris. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Integer ut cursus diam. Vestibulum semper vel leo eu finibus. Ut urna magna, commodo vel auctor sed, eleifend quis lacus. Aenean quis ipsum et velit auctor ultrices.", 519);

    // no prefix, backwards compatible: ellipsis
    //  - "[070] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus..."
    zframe_print (frame, "");
    zframe_print_n (frame, "", 0);

    // prefix, backwards compatible: ellipsis
    //  - "Prefix[070] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus..."
    zframe_print (frame, "Prefix");
    zframe_print_n (frame, "Prefix", 0);

    // len < data
    //  - "[519] Lorem"
    //  - "Prefix[519] Lorem"
    zframe_print_n (frame, "", 5);
    zframe_print_n (frame, "Prefix", 5);

    // small len
    //  - "[519] Lorem ipsum dolor sit amet"
    //  - "Prefix[519] Lorem ipsum dolor sit amet"
    zframe_print_n (frame, "", 26);
    zframe_print_n (frame, "Prefix", 26);

    // mid len: output is split into lines of 70 characters
    // - "[519] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus"
    //   "[519]  ligula et aliquam tristique. Phasellus consequat, enim et blandit var"
    //   "[519] ius, sapie"
    // - "Prefix[519] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus"
    //   "Prefix[519]  ligula et aliquam tristique. Phasellus consequat, enim et blandit var"
    //   "Prefix[519] ius, sapie"
    zframe_print_n (frame, "", 150);
    zframe_print_n (frame, "Prefix", 150);

    // len > data
    //  - emits the whole paragraph
    zframe_print_n (frame, "", 1500);
    zframe_print_n (frame, "Prefix", 1500);

    // max len (-1 converts to the largest size_t value)
    //  - emits the whole paragraph
    zframe_print_n (frame, "", -1);
    zframe_print_n (frame, "Prefix", -1);

    zframe_destroy (&frame);

#if defined (__WINDOWS__)
    zsys_shutdown();
#endif

    //  @end
    printf ("OK\n");
}
957