1 /* =========================================================================
2 zframe - working with single message frames
3
4 Copyright (c) the Contributors as noted in the AUTHORS file.
5 This file is part of CZMQ, the high-level C binding for 0MQ:
6 http://czmq.zeromq.org.
7
8 This Source Code Form is subject to the terms of the Mozilla Public
9 License, v. 2.0. If a copy of the MPL was not distributed with this
10 file, You can obtain one at http://mozilla.org/MPL/2.0/.
11 =========================================================================
12 */
13
14 /*
15 @header
16 The zframe class provides methods to send and receive single message
17 frames across 0MQ sockets. A 'frame' corresponds to one zmq_msg_t. When
18 you read a frame from a socket, the zframe_more() method indicates if the
19 frame is part of an unfinished multipart message. The zframe_send method
20 normally destroys the frame, but with the ZFRAME_REUSE flag, you can send
21 the same frame many times. Frames are binary, and this class has no
22 special support for text data.
23 @discuss
24 @end
25 */
26
27 #include "czmq_classes.h"
28
29 // zframe_t instances always have this tag as the first 4 octets of
30 // their data, which lets us do runtime object typing & validation.
31 #define ZFRAME_TAG 0xcafe0002
32
33 // Structure of our class
34
35 struct _zframe_t {
36 uint32_t tag; // Object tag for runtime detection
37 zmq_msg_t zmsg; // zmq_msg_t blob for frame
38 int more; // More flag, from last read
39 uint32_t routing_id; // Routing ID back to sender, if any
40 #ifdef ZMQ_GROUP_MAX_LENGTH
41 char group[ZMQ_GROUP_MAX_LENGTH + 1];
42 #endif
43 zframe_destructor_fn *destructor; // Destructor for memory
44 void * hint; // Hint for destroying the memory
45 };
46
47 // --------------------------------------------------------------------------
48 // Constructor; if size is >0, allocates frame with that size, and if data
49 // is not null, copies data into frame.
50
51 zframe_t *
zframe_new(const void * data,size_t size)52 zframe_new (const void *data, size_t size)
53 {
54 zframe_t *self = (zframe_t *) zmalloc (sizeof (zframe_t));
55 assert (self);
56 self->tag = ZFRAME_TAG;
57 if (size) {
58 // Catch heap exhaustion in this specific case
59 if (zmq_msg_init_size (&self->zmsg, size)) {
60 zframe_destroy (&self);
61 return NULL;
62 }
63 if (data)
64 memcpy (zmq_msg_data (&self->zmsg), data, size);
65 }
66 else
67 zmq_msg_init (&self->zmsg);
68
69 return self;
70 }
71
72
73 // --------------------------------------------------------------------------
74 // Create an empty (zero-sized) frame. The caller is responsible for
75 // destroying the return value when finished with it.
76
77 zframe_t *
zframe_new_empty(void)78 zframe_new_empty (void)
79 {
80 zframe_t *self = (zframe_t *) zmalloc (sizeof (zframe_t));
81 assert (self);
82 self->tag = ZFRAME_TAG;
83 zmq_msg_init (&self->zmsg);
84 return self;
85 }
86
87
88 // --------------------------------------------------------------------------
89 // Destructor
90
91 void
zframe_destroy(zframe_t ** self_p)92 zframe_destroy (zframe_t **self_p)
93 {
94 assert (self_p);
95 if (*self_p) {
96 zframe_t *self = *self_p;
97 assert (zframe_is (self));
98 zmq_msg_close (&self->zmsg);
99 self->tag = 0xDeadBeef;
100 freen (self);
101
102 *self_p = NULL;
103 }
104 }
105
106
107 // --------------------------------------------------------------------------
108 // Create a frame with a specified string content.
109 // The caller is responsible for destroying the return value when finished with it.
110
111 zframe_t *
zframe_from(const char * string)112 zframe_from (const char *string)
113 {
114 return zframe_new (string, strlen (string));
115 }
116
117 // --------------------------------------------------------------------------
118 // Create a new frame from memory. Taking ownership of the memory and calling the destructor
119 // on destroy.
120
121 struct zmq_msg_free_arg {
122 zframe_destructor_fn *destructor;
123 void *hint;
124 };
125
126 static void
zmq_msg_free(void * data,void * hint)127 zmq_msg_free (void *data, void *hint) {
128 struct zmq_msg_free_arg *arg = (struct zmq_msg_free_arg *)hint;
129 arg->destructor (&arg->hint);
130
131 freen (arg);
132 }
133
134 zframe_t *
zframe_frommem(void * data,size_t size,zframe_destructor_fn destructor,void * hint)135 zframe_frommem (void *data, size_t size, zframe_destructor_fn destructor, void *hint) {
136 assert (data);
137
138 zframe_t *self = (zframe_t *) zmalloc (sizeof (zframe_t));
139 assert (self);
140 self->tag = ZFRAME_TAG;
141 self->destructor = destructor;
142 self->hint = hint;
143
144 struct zmq_msg_free_arg *arg = (struct zmq_msg_free_arg*) zmalloc (sizeof (struct zmq_msg_free_arg));
145 arg->destructor = destructor;
146 arg->hint = hint;
147
148 // Catch heap exhaustion in this specific case
149 if (zmq_msg_init_data (&self->zmsg, data, size, zmq_msg_free, arg)) {
150 freen (arg);
151 zframe_destroy (&self);
152 return NULL;
153 }
154
155 return self;
156 }
157
158
159 // --------------------------------------------------------------------------
160 // Receive frame from socket, returns zframe_t object or NULL if the recv
161 // was interrupted. Does a blocking recv, if you want to not block then use
162 // zpoller or zloop.
163
164 zframe_t *
zframe_recv(void * source)165 zframe_recv (void *source)
166 {
167 assert (source);
168 void *handle = zsock_resolve (source);
169 zframe_t *self = zframe_new (NULL, 0);
170 assert (self);
171
172 if (zmq_recvmsg (handle, &self->zmsg, 0) < 0) {
173 zframe_destroy (&self);
174 return NULL; // Interrupted or terminated
175 }
176 self->more = zsock_rcvmore (source);
177 #if defined (ZMQ_SERVER)
178 // Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
179 if (zsock_type (source) == ZMQ_SERVER)
180 self->routing_id = zmq_msg_routing_id (&self->zmsg);
181 #endif
182 #if defined (ZMQ_DISH)
183 // Grab group if we're reading from a DISH Socket (ZMQ 4.2 and later)
184 if (zsock_type (source) == ZMQ_DISH)
185 strcpy (self->group, zmq_msg_group (&self->zmsg));
186 #endif
187 return self;
188 }
189
190
191 // --------------------------------------------------------------------------
192 // Send frame to socket, destroy after sending unless ZFRAME_REUSE is
193 // set or the attempt to send the message errors out.
194
195 int
zframe_send(zframe_t ** self_p,void * dest,int flags)196 zframe_send (zframe_t **self_p, void *dest, int flags)
197 {
198 assert (dest);
199 assert (self_p);
200
201 void *handle = zsock_resolve (dest);
202 if (*self_p) {
203 zframe_t *self = *self_p;
204 assert (zframe_is (self));
205
206 int send_flags = (flags & ZFRAME_MORE)? ZMQ_SNDMORE: 0;
207 send_flags |= (flags & ZFRAME_DONTWAIT)? ZMQ_DONTWAIT: 0;
208 if (flags & ZFRAME_REUSE) {
209 zmq_msg_t copy;
210 zmq_msg_init (©);
211 if (zmq_msg_copy (©, &self->zmsg))
212 return -1;
213 #if defined (ZMQ_SERVER)
214 if (zsock_type (dest) == ZMQ_SERVER)
215 zmq_msg_set_routing_id (©, self->routing_id);
216 #endif
217 #if defined (ZMQ_RADIO)
218 if (zsock_type (dest) == ZMQ_RADIO)
219 zmq_msg_set_group (©, self->group);
220 #endif
221 if (zmq_sendmsg (handle, ©, send_flags) == -1) {
222 zmq_msg_close (©);
223 return -1;
224 }
225 }
226 else {
227 #if defined (ZMQ_SERVER)
228 if (zsock_type (dest) == ZMQ_SERVER)
229 zmq_msg_set_routing_id (&self->zmsg, self->routing_id);
230 #endif
231 #if defined (ZMQ_RADIO)
232 if (zsock_type (dest) == ZMQ_RADIO)
233 zmq_msg_set_group (&self->zmsg, self->group);
234 #endif
235 if (zmq_sendmsg (handle, &self->zmsg, send_flags) >= 0)
236 zframe_destroy (self_p);
237 else
238 return -1;
239 }
240 }
241 return 0;
242 }
243
244
245 // --------------------------------------------------------------------------
246 // Return size of frame.
247
248 size_t
zframe_size(zframe_t * self)249 zframe_size (zframe_t *self)
250 {
251 assert (self);
252 assert (zframe_is (self));
253
254 return zmq_msg_size (&self->zmsg);
255 }
256
257
258 // --------------------------------------------------------------------------
259 // Return pointer to frame data.
260
261 byte *
zframe_data(zframe_t * self)262 zframe_data (zframe_t *self)
263 {
264 assert (self);
265 assert (zframe_is (self));
266
267 return (byte *) zmq_msg_data (&self->zmsg);
268 }
269
270
271 // --------------------------------------------------------------------------
272 // Return meta data property for frame.
273 // The caller shall not modify or free the returned value, which shall be
274 // owned by the message.
275
276 const char *
zframe_meta(zframe_t * self,const char * property)277 zframe_meta (zframe_t *self, const char *property)
278 {
279 #if (ZMQ_VERSION >= ZMQ_MAKE_VERSION (4, 1, 0))
280 assert (self);
281 assert (zframe_is (self));
282
283 return zmq_msg_gets (&self->zmsg, property);
284 #else
285 return NULL;
286 #endif
287 }
288
289
290 // --------------------------------------------------------------------------
291 // Create a new frame that duplicates an existing frame. If frame is null,
292 // or memory was exhausted, returns null.
293
294 zframe_t *
zframe_dup(zframe_t * self)295 zframe_dup (zframe_t *self)
296 {
297 if (self) {
298 assert (zframe_is (self));
299 return zframe_new (zframe_data (self), zframe_size (self));
300 }
301 else
302 return NULL;
303 }
304
305
306 // --------------------------------------------------------------------------
307 // Return frame data encoded as printable hex string, useful for 0MQ UUIDs.
308 // Caller must free string when finished with it.
309
310 char *
zframe_strhex(zframe_t * self)311 zframe_strhex (zframe_t *self)
312 {
313 assert (self);
314 assert (zframe_is (self));
315
316 static const char
317 hex_char [] = "0123456789ABCDEF";
318
319 size_t size = zframe_size (self);
320 byte *data = zframe_data (self);
321 char *hex_str = (char *) malloc (size * 2 + 1);
322 if (!hex_str)
323 return NULL;
324
325 uint byte_nbr;
326 for (byte_nbr = 0; byte_nbr < size; byte_nbr++) {
327 hex_str [byte_nbr * 2 + 0] = hex_char [data [byte_nbr] >> 4];
328 hex_str [byte_nbr * 2 + 1] = hex_char [data [byte_nbr] & 15];
329 }
330 hex_str [size * 2] = 0;
331 return hex_str;
332 }
333
334
335 // --------------------------------------------------------------------------
336 // Return frame data copied into freshly allocated string
337 // Caller must free string when finished with it.
338
339 char *
zframe_strdup(zframe_t * self)340 zframe_strdup (zframe_t *self)
341 {
342 assert (self);
343 assert (zframe_is (self));
344
345 size_t size = zframe_size (self);
346 char *string = (char *) malloc (size + 1);
347 assert (string);
348 memcpy (string, zframe_data (self), size);
349 string [size] = 0;
350 return string;
351 }
352
353
354 // --------------------------------------------------------------------------
355 // Return true if frame body is equal to string, excluding terminator
356
357 bool
zframe_streq(zframe_t * self,const char * string)358 zframe_streq (zframe_t *self, const char *string)
359 {
360 assert (self);
361 assert (zframe_is (self));
362
363 if (zframe_size (self) == strlen (string)
364 && memcmp (zframe_data (self), string, strlen (string)) == 0)
365 return true;
366 else
367 return false;
368 }
369
370
371 // --------------------------------------------------------------------------
372 // Return frame MORE indicator (1 or 0), set when reading frame from socket
373 // or by the zframe_set_more() method.
374
375 int
zframe_more(zframe_t * self)376 zframe_more (zframe_t *self)
377 {
378 assert (self);
379 assert (zframe_is (self));
380
381 return self->more;
382 }
383
384
385 // --------------------------------------------------------------------------
386 // Set frame MORE indicator (1 or 0). Note this is NOT used when sending
387 // frame to socket, you have to specify flag explicitly.
388
389 void
zframe_set_more(zframe_t * self,int more)390 zframe_set_more (zframe_t *self, int more)
391 {
392 assert (self);
393 assert (zframe_is (self));
394 assert (more == 0 || more == 1);
395
396 self->more = more;
397 }
398
399
400 // --------------------------------------------------------------------------
401 // Return frame routing ID, if the frame came from a ZMQ_SERVER socket.
402 // Else returns zero.
403
404 uint32_t
zframe_routing_id(zframe_t * self)405 zframe_routing_id (zframe_t *self)
406 {
407 assert (self);
408 assert (zframe_is (self));
409 return self->routing_id;
410 }
411
412
413 // --------------------------------------------------------------------------
414 // Set routing ID on frame. This is used if/when the frame is sent to a
415 // ZMQ_SERVER socket.
416
417 void
zframe_set_routing_id(zframe_t * self,uint32_t routing_id)418 zframe_set_routing_id (zframe_t *self, uint32_t routing_id)
419 {
420 assert (self);
421 assert (zframe_is (self));
422 self->routing_id = routing_id;
423 }
424
425
426 // --------------------------------------------------------------------------
427 // Return frame group of radio-dish pattern.
428 const char *
zframe_group(zframe_t * self)429 zframe_group (zframe_t *self)
430 {
431 assert (self);
432 #ifdef ZMQ_DISH
433 return self->group;
434 #else
435 return NULL;
436 #endif
437 }
438
439
440 // --------------------------------------------------------------------------
441 // Set group on frame. This is used if/when the frame is sent to a
442 // ZMQ_RADIO socket.
443 // Return -1 on error, 0 on success.
444 int
zframe_set_group(zframe_t * self,const char * group)445 zframe_set_group (zframe_t *self, const char *group)
446 {
447 #ifdef ZMQ_RADIO
448 if (strlen(group) > ZMQ_GROUP_MAX_LENGTH) {
449 errno = EINVAL;
450 return -1;
451 }
452
453 strcpy (self->group, group);
454 return 0;
455 #else
456 errno = ENOTSUP;
457 return -1;
458 #endif
459 }
460
461
462 // --------------------------------------------------------------------------
463 // Return true if two frames have identical size and data
464
465 bool
zframe_eq(zframe_t * self,zframe_t * other)466 zframe_eq (zframe_t *self, zframe_t *other)
467 {
468 if (!self || !other)
469 return false; // Tolerate null references here
470 else {
471 assert (zframe_is (self));
472 assert (zframe_is (other));
473
474 if (zframe_size (self) == zframe_size (other)
475 && memcmp (zframe_data (self),
476 zframe_data (other),
477 zframe_size (self)) == 0)
478 return true;
479 else
480 return false;
481 }
482 }
483
484
485 // --------------------------------------------------------------------------
486 // Set new contents for frame
487
488 void
zframe_reset(zframe_t * self,const void * data,size_t size)489 zframe_reset (zframe_t *self, const void *data, size_t size)
490 {
491 assert (self);
492 assert (zframe_is (self));
493 assert (data);
494
495 zmq_msg_close (&self->zmsg);
496 zmq_msg_init_size (&self->zmsg, size);
497 memcpy (zmq_msg_data (&self->zmsg), data, size);
498 }
499
500
501 // --------------------------------------------------------------------------
502 // Send message to zsys log sink (may be stdout, or system facility as
503 // configured by zsys_set_logstream). Prefix shows before frame, if not null.
504
505 // Send the specified number of chars.
506 // If len is 0, then log 35 hex or 70 chars with ellipsis.
507 // If 0 < len < frame size,
508 // then log in blocks of 35 or 70 up to the specified length.
509 //
510 void
zframe_print_n(zframe_t * self,const char * prefix,size_t length)511 zframe_print_n (zframe_t *self, const char *prefix, size_t length)
512 {
513 assert (self);
514 assert (zframe_is (self));
515
516 byte *data = zframe_data (self);
517 size_t size = zframe_size (self);
518
519 // Probe data to check if it looks like unprintable binary
520 int is_bin = 0;
521 uint char_nbr;
522 for (char_nbr = 0; char_nbr < size; char_nbr++)
523 if (data [char_nbr] < 32 || data [char_nbr] > 127)
524 is_bin = 1;
525
526 char buffer [256] = "";
527 size_t max_size = is_bin? 35: 70;
528 const char *ellipsis = "";
529
530 // backwards compatibility
531 if (!length) {
532 if (size > max_size) {
533 size = max_size;
534 ellipsis = "...";
535 }
536
537 length = size;
538 }
539 if (length > size)
540 length = size;
541
542 for (char_nbr = 0; char_nbr < length; char_nbr++) {
543 if (!(char_nbr % max_size)) {
544 if (char_nbr) zsys_debug (buffer);
545 snprintf (buffer, 30, "%s[%03d] ", prefix? prefix: "", (int) size);
546 }
547 if (is_bin)
548 sprintf (buffer + strlen (buffer), "%02X", (unsigned char) data [char_nbr]);
549 else
550 sprintf (buffer + strlen (buffer), "%c", data [char_nbr]);
551 }
552 strcat (buffer, ellipsis);
553 zsys_debug (buffer);
554 }
555
556 void
zframe_print(zframe_t * self,const char * prefix)557 zframe_print (zframe_t *self, const char *prefix)
558 {
559 zframe_print_n (self, prefix, 0);
560 }
561
562
563 // --------------------------------------------------------------------------
564 // Probe the supplied object, and report if it looks like a zframe_t.
565
566 bool
zframe_is(void * self)567 zframe_is (void *self)
568 {
569 assert (self);
570 return ((zframe_t *) self)->tag == ZFRAME_TAG;
571 }
572
573
574 // --------------------------------------------------------------------------
575 // DEPRECATED as poor style -- callers should use zloop or zpoller
576 // Receive a new frame off the socket. Returns newly allocated frame, or
577 // NULL if there was no input waiting, or if the read was interrupted.
578
579 zframe_t *
zframe_recv_nowait(void * source)580 zframe_recv_nowait (void *source)
581 {
582 assert (source);
583 void *handle = zsock_resolve (source);
584
585 zframe_t *self = zframe_new (NULL, 0);
586 assert (self);
587 if (zmq_recvmsg (handle, &self->zmsg, ZMQ_DONTWAIT) < 0) {
588 zframe_destroy (&self);
589 return NULL; // Interrupted or terminated
590 }
591 self->more = zsock_rcvmore (source);
592 #if defined (ZMQ_SERVER)
593 // Grab routing ID if we're reading from a SERVER socket (ZMQ 4.2 and later)
594 if (zsock_type (source) == ZMQ_SERVER)
595 self->routing_id = zmq_msg_routing_id (&self->zmsg);
596 #endif
597 #if defined (ZMQ_DISH)
598 // Grab group if we're reading from a DISH Socket (ZMQ 4.2 and later)
599 if (zsock_type (source) == ZMQ_DISH)
600 strcpy (self->group, zmq_msg_group (&self->zmsg));
601 #endif
602 return self;
603 }
604
605
606 // --------------------------------------------------------------------------
607 // DEPRECATED as inconsistent; breaks principle that logging should all go
608 // to a single destination.
609 // Print contents of frame to FILE stream, prefix is ignored if null.
610
611 void
zframe_fprint(zframe_t * self,const char * prefix,FILE * file)612 zframe_fprint (zframe_t *self, const char *prefix, FILE *file)
613 {
614 assert (self);
615 assert (zframe_is (self));
616
617 if (prefix)
618 fprintf (file, "%s", prefix);
619 byte *data = zframe_data (self);
620 size_t size = zframe_size (self);
621
622 int is_bin = 0;
623 uint char_nbr;
624 for (char_nbr = 0; char_nbr < size; char_nbr++)
625 if (data [char_nbr] < 9 || data [char_nbr] > 127)
626 is_bin = 1;
627
628 fprintf (file, "[%03d] ", (int) size);
629 size_t max_size = is_bin? 35: 70;
630 const char *ellipsis = "";
631 if (size > max_size) {
632 size = max_size;
633 ellipsis = "...";
634 }
635 for (char_nbr = 0; char_nbr < size; char_nbr++) {
636 if (is_bin)
637 fprintf (file, "%02X", (unsigned char) data [char_nbr]);
638 else
639 fprintf (file, "%c", data [char_nbr]);
640 }
641 fprintf (file, "%s\n", ellipsis);
642 }
643
644
645 // --------------------------------------------------------------------------
646 // Selftest
647
648 static void
mem_destructor(void ** hint)649 mem_destructor (void **hint) {
650 strcpy ((char*)*hint, "world");
651 }
652
653
654 void
zframe_test(bool verbose)655 zframe_test (bool verbose)
656 {
657 printf (" * zframe: ");
658 int rc;
659 zframe_t* frame;
660
661 // @selftest
662 // Create two PAIR sockets and connect over TCP
663 zsock_t *output = zsock_new (ZMQ_PAIR);
664 assert (output);
665 int port = zsock_bind (output, "tcp://127.0.0.1:*");
666 assert (port != -1);
667 zsock_t *input = zsock_new (ZMQ_PAIR);
668 assert (input);
669 rc = zsock_connect (input, "tcp://127.0.0.1:%d", port);
670 assert (rc != -1);
671
672 // Send five different frames, test ZFRAME_MORE
673 int frame_nbr;
674 for (frame_nbr = 0; frame_nbr < 5; frame_nbr++) {
675 frame = zframe_new ("Hello", 5);
676 assert (frame);
677 rc = zframe_send (&frame, output, ZFRAME_MORE);
678 assert (rc == 0);
679 }
680 // Send same frame five times, test ZFRAME_REUSE
681 frame = zframe_new ("Hello", 5);
682 assert (frame);
683 for (frame_nbr = 0; frame_nbr < 5; frame_nbr++) {
684 rc = zframe_send (&frame, output, ZFRAME_MORE + ZFRAME_REUSE);
685 assert (rc == 0);
686 }
687 assert (frame);
688 zframe_t *copy = zframe_dup (frame);
689 assert (zframe_eq (frame, copy));
690 zframe_destroy (&frame);
691 assert (!zframe_eq (frame, copy));
692 assert (zframe_size (copy) == 5);
693 zframe_destroy (©);
694 assert (!zframe_eq (frame, copy));
695
696 // Test zframe_new_empty
697 frame = zframe_new_empty ();
698 assert (frame);
699 assert (zframe_size (frame) == 0);
700 zframe_destroy (&frame);
701
702 // Send END frame
703 frame = zframe_new ("NOT", 3);
704 assert (frame);
705 zframe_reset (frame, "END", 3);
706 char *string = zframe_strhex (frame);
707 assert (streq (string, "454E44"));
708 freen (string);
709 string = zframe_strdup (frame);
710 assert (streq (string, "END"));
711 freen (string);
712 rc = zframe_send (&frame, output, 0);
713 assert (rc == 0);
714
715 // Read and count until we receive END
716 frame_nbr = 0;
717 for (frame_nbr = 0;; frame_nbr++) {
718 zframe_t *frame = zframe_recv (input);
719 if (zframe_streq (frame, "END")) {
720 zframe_destroy (&frame);
721 break;
722 }
723 assert (zframe_more (frame));
724 zframe_set_more (frame, 0);
725 assert (zframe_more (frame) == 0);
726 zframe_destroy (&frame);
727 }
728 assert (frame_nbr == 10);
729
730 #if (ZMQ_VERSION >= ZMQ_MAKE_VERSION (4, 1, 0))
731 // Test zframe_meta
732 frame = zframe_new ("Hello", 5);
733 assert (frame);
734 rc = zframe_send (&frame, output, 0);
735 assert (rc == 0);
736 frame = zframe_recv (input);
737 const char *meta = zframe_meta (frame, "Socket-Type");
738 assert (meta != NULL);
739 assert (streq (meta, "PAIR"));
740 assert (zframe_meta (frame, "nonexistent") == NULL);
741 zframe_destroy (&frame);
742 #endif
743
744 zsock_destroy (&input);
745 zsock_destroy (&output);
746
747 #if defined (ZMQ_SERVER)
748 // Create server and client sockets and connect over inproc
749 zsock_t *server = zsock_new_server ("inproc://zframe-test-routing");
750 assert (server);
751 zsock_t *client = zsock_new_client ("inproc://zframe-test-routing");
752 assert (client);
753
754 // Send request from client to server
755 zframe_t *request = zframe_new ("Hello", 5);
756 assert (request);
757 rc = zframe_send (&request, client, 0);
758 assert (rc == 0);
759 assert (!request);
760
761 // Read request and send reply
762 request = zframe_recv (server);
763 assert (request);
764 assert (zframe_streq (request, "Hello"));
765 assert (zframe_routing_id (request));
766
767 zframe_t *reply = zframe_new ("World", 5);
768 assert (reply);
769 zframe_set_routing_id (reply, zframe_routing_id (request));
770 rc = zframe_send (&reply, server, 0);
771 assert (rc == 0);
772 zframe_destroy (&request);
773
774 // Read reply
775 reply = zframe_recv (client);
776 assert (zframe_streq (reply, "World"));
777 assert (zframe_routing_id (reply) == 0);
778 zframe_destroy (&reply);
779
780 // Client and server disallow multipart
781 frame = zframe_new ("Hello", 5);
782 rc = zframe_send (&frame, client, ZFRAME_MORE);
783 assert (rc == -1);
784 rc = zframe_send (&frame, server, ZFRAME_MORE);
785 assert (rc == -1);
786 zframe_destroy (&frame);
787
788 zsock_destroy (&client);
789 zsock_destroy (&server);
790 #endif
791
792 #ifdef ZMQ_RADIO
793 // Create radio and dish sockets and connect over inproc
794 zsock_t *radio = zsock_new_radio ("inproc://zframe-test-radio");
795 assert (radio);
796 zsock_t *dish = zsock_new_dish ("inproc://zframe-test-radio");
797 assert (dish);
798
799 // Join the group
800 rc = zsock_join (dish, "World");
801 assert (rc == 0);
802
803 // Publish message from radio
804 zframe_t *message = zframe_new ("Hello", 5);
805 assert (message);
806 rc = zframe_set_group (message, "World");
807 assert (rc == 0);
808 rc = zframe_send (&message, radio, 0);
809 assert (rc == 0);
810 assert (!message);
811
812 // Receive the message from dish
813 message = zframe_recv (dish);
814 assert (message);
815 assert (zframe_streq (message, "Hello"));
816 assert (strcmp("World", zframe_group (message)) == 0);
817 zframe_destroy (&message);
818
819 zsock_destroy (&dish);
820 zsock_destroy (&radio);
821 #else
822 frame = zframe_new ("Hello", 5);
823 rc = zframe_set_group (frame, "World");
824 assert(rc == -1);
825 assert(errno == ENOTSUP);
826 zframe_destroy (&frame);
827 #endif
828
829 char str[] = "hello";
830 frame = zframe_frommem (str, 5, mem_destructor, str);
831 assert (frame);
832 zframe_destroy (&frame);
833
834 // The destructor doesn't free the memory, only changing the strid,
835 // so we can check if the destructor was invoked
836 assert (streq (str, "world"));
837
838
839 // zframe_print tests
840
841 zsys_set_logstream(verbose ? stdout : NULL);
842
843 // == No data ==
844 frame = zframe_new ("", 0);
845
846 // no prefix, backwards compatible
847 // - emits nothing but the timestamp
848 zframe_print (frame, "");
849 zframe_print_n (frame, "", 0);
850
851 // prefix, backwards compatible
852 // - emits nothing but the timestamp
853 zframe_print (frame, "Prefix");
854 zframe_print_n (frame, "Prefix", 0);
855
856 // len > data
857 // - emits nothing but the timestamp
858 zframe_print_n (frame, "", 15);
859 zframe_print_n (frame, "Prefix", 15);
860
861 // max len
862 // - emits nothing but the timestamp
863 zframe_print_n (frame, "", -1);
864 zframe_print_n (frame, "Prefix", -1);
865
866 zframe_destroy (&frame);
867
868
869 // == Short data ==
870 frame = zframe_new ("Hello there!", 12);
871
872 // no prefix, backwards compatible: ellipsis
873 // - "[012] Hello there!"
874 zframe_print (frame, "");
875 zframe_print_n (frame, "", 0);
876
877 // prefix, backwards compatible: ellipsis
878 // - "Prefix[012] Hello there!"
879 zframe_print (frame, "Prefix");
880 zframe_print_n (frame, "Prefix", 0);
881
882 // len < data
883 // - "[012] Hello"
884 // - "Prefix[012] Hello"
885 zframe_print_n (frame, "", 5);
886 zframe_print_n (frame, "Prefix", 5);
887
888 // len > data
889 // - "[012] Hello there!"
890 // - "Prefix[012] Hello there!"
891 zframe_print_n (frame, "", 15);
892 zframe_print_n (frame, "Prefix", 15);
893
894 // max len
895 // - "[012] Hello there!"
896 // - "Prefix[012] Hello there!"
897 zframe_print_n (frame, "", -1);
898 zframe_print_n (frame, "Prefix", -1);
899
900 zframe_destroy (&frame);
901
902
903 // == Long data ==
904 frame = zframe_new ("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus ligula et aliquam tristique. Phasellus consequat, enim et blandit varius, sapien diam faucibus lorem, non ultricies lacus turpis sed lectus. Vivamus id elit urna. In sit amet lacinia mauris. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Integer ut cursus diam. Vestibulum semper vel leo eu finibus. Ut urna magna, commodo vel auctor sed, eleifend quis lacus. Aenean quis ipsum et velit auctor ultrices.", 519);
905
906 // no prefix, backwards compatible: ellipsis
907 // - "[070] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus..."
908 zframe_print (frame, "");
909 zframe_print_n (frame, "", 0);
910
911 // prefix, backwards compatible: ellipsis
912 // - "Prefix[070] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus..."
913 zframe_print (frame, "Prefix");
914 zframe_print_n (frame, "Prefix", 0);
915
916 // len < data
917 // - "[519] Lorem"
918 // - "Prefix[519] Lorem"
919 zframe_print_n (frame, "", 5);
920 zframe_print_n (frame, "Prefix", 5);
921
922 // small len
923 // - "[519] Lorem ipsum dolor sit amet"
924 // - "Prefix[519] Lorem ipsum dolor sit amet"
925 zframe_print_n (frame, "", 26);
926 zframe_print_n (frame, "Prefix", 26);
927
928 // mid len
929 // - "[519] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus"
930 // "[519] ligula et aliquam tristique. Phasellus consequat, enim et blandit var"
931 // "[519] ius, sapie"
932 // - "Prefix[519] Lorem ipsum dolor sit amet, consectetur adipiscing elit. Proin finibus"
933 // "Prefix[519] ligula et aliquam tristique. Phasellus consequat, enim et blandit var"
934 // "Prefix[519] ius, sapie"
935 zframe_print_n (frame, "", 150);
936 zframe_print_n (frame, "Prefix", 150);
937
938 // len > data
939 // - emits the whole paragraph
940 zframe_print_n (frame, "", 1500);
941 zframe_print_n (frame, "Prefix", 1500);
942
943 // max len
944 // - emits the whole paragraph
945 zframe_print_n (frame, "", -1);
946 zframe_print_n (frame, "Prefix", -1);
947
948 zframe_destroy (&frame);
949
950 #if defined (__WINDOWS__)
951 zsys_shutdown();
952 #endif
953
954 // @end
955 printf ("OK\n");
956 }
957