1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "compat.hpp"
32 #include "macros.hpp"
33 #include "msg.hpp"
34 
35 #include <string.h>
36 #include <stdlib.h>
37 #include <new>
38 
39 #include "stdint.hpp"
40 #include "likely.hpp"
41 #include "metadata.hpp"
42 #include "err.hpp"
43 
44 //  Check whether the sizes of public representation of the message (zmq_msg_t)
45 //  and private representation of the message (zmq::msg_t) match.
46 
47 typedef char
48   zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0)
49                      - 1];
50 
check() const51 bool zmq::msg_t::check () const
52 {
53     return _u.base.type >= type_min && _u.base.type <= type_max;
54 }
55 
init(void * data_,size_t size_,msg_free_fn * ffn_,void * hint_,content_t * content_)56 int zmq::msg_t::init (void *data_,
57                       size_t size_,
58                       msg_free_fn *ffn_,
59                       void *hint_,
60                       content_t *content_)
61 {
62     if (size_ < max_vsm_size) {
63         const int rc = init_size (size_);
64 
65         if (rc != -1) {
66             memcpy (data (), data_, size_);
67             return 0;
68         }
69         return -1;
70     }
71     if (content_) {
72         return init_external_storage (content_, data_, size_, ffn_, hint_);
73     }
74     return init_data (data_, size_, ffn_, hint_);
75 }
76 
init()77 int zmq::msg_t::init ()
78 {
79     _u.vsm.metadata = NULL;
80     _u.vsm.type = type_vsm;
81     _u.vsm.flags = 0;
82     _u.vsm.size = 0;
83     _u.vsm.group.sgroup.group[0] = '\0';
84     _u.vsm.group.type = group_type_short;
85     _u.vsm.routing_id = 0;
86     return 0;
87 }
88 
init_size(size_t size_)89 int zmq::msg_t::init_size (size_t size_)
90 {
91     if (size_ <= max_vsm_size) {
92         _u.vsm.metadata = NULL;
93         _u.vsm.type = type_vsm;
94         _u.vsm.flags = 0;
95         _u.vsm.size = static_cast<unsigned char> (size_);
96         _u.vsm.group.sgroup.group[0] = '\0';
97         _u.vsm.group.type = group_type_short;
98         _u.vsm.routing_id = 0;
99     } else {
100         _u.lmsg.metadata = NULL;
101         _u.lmsg.type = type_lmsg;
102         _u.lmsg.flags = 0;
103         _u.lmsg.group.sgroup.group[0] = '\0';
104         _u.lmsg.group.type = group_type_short;
105         _u.lmsg.routing_id = 0;
106         _u.lmsg.content = NULL;
107         if (sizeof (content_t) + size_ > size_)
108             _u.lmsg.content =
109               static_cast<content_t *> (malloc (sizeof (content_t) + size_));
110         if (unlikely (!_u.lmsg.content)) {
111             errno = ENOMEM;
112             return -1;
113         }
114 
115         _u.lmsg.content->data = _u.lmsg.content + 1;
116         _u.lmsg.content->size = size_;
117         _u.lmsg.content->ffn = NULL;
118         _u.lmsg.content->hint = NULL;
119         new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
120     }
121     return 0;
122 }
123 
init_buffer(const void * buf_,size_t size_)124 int zmq::msg_t::init_buffer (const void *buf_, size_t size_)
125 {
126     const int rc = init_size (size_);
127     if (unlikely (rc < 0)) {
128         return -1;
129     }
130     if (size_) {
131         // NULL and zero size is allowed
132         assert (NULL != buf_);
133         memcpy (data (), buf_, size_);
134     }
135     return 0;
136 }
137 
init_external_storage(content_t * content_,void * data_,size_t size_,msg_free_fn * ffn_,void * hint_)138 int zmq::msg_t::init_external_storage (content_t *content_,
139                                        void *data_,
140                                        size_t size_,
141                                        msg_free_fn *ffn_,
142                                        void *hint_)
143 {
144     zmq_assert (NULL != data_);
145     zmq_assert (NULL != content_);
146 
147     _u.zclmsg.metadata = NULL;
148     _u.zclmsg.type = type_zclmsg;
149     _u.zclmsg.flags = 0;
150     _u.zclmsg.group.sgroup.group[0] = '\0';
151     _u.zclmsg.group.type = group_type_short;
152     _u.zclmsg.routing_id = 0;
153 
154     _u.zclmsg.content = content_;
155     _u.zclmsg.content->data = data_;
156     _u.zclmsg.content->size = size_;
157     _u.zclmsg.content->ffn = ffn_;
158     _u.zclmsg.content->hint = hint_;
159     new (&_u.zclmsg.content->refcnt) zmq::atomic_counter_t ();
160 
161     return 0;
162 }
163 
init_data(void * data_,size_t size_,msg_free_fn * ffn_,void * hint_)164 int zmq::msg_t::init_data (void *data_,
165                            size_t size_,
166                            msg_free_fn *ffn_,
167                            void *hint_)
168 {
169     //  If data is NULL and size is not 0, a segfault
170     //  would occur once the data is accessed
171     zmq_assert (data_ != NULL || size_ == 0);
172 
173     //  Initialize constant message if there's no need to deallocate
174     if (ffn_ == NULL) {
175         _u.cmsg.metadata = NULL;
176         _u.cmsg.type = type_cmsg;
177         _u.cmsg.flags = 0;
178         _u.cmsg.data = data_;
179         _u.cmsg.size = size_;
180         _u.cmsg.group.sgroup.group[0] = '\0';
181         _u.cmsg.group.type = group_type_short;
182         _u.cmsg.routing_id = 0;
183     } else {
184         _u.lmsg.metadata = NULL;
185         _u.lmsg.type = type_lmsg;
186         _u.lmsg.flags = 0;
187         _u.lmsg.group.sgroup.group[0] = '\0';
188         _u.lmsg.group.type = group_type_short;
189         _u.lmsg.routing_id = 0;
190         _u.lmsg.content =
191           static_cast<content_t *> (malloc (sizeof (content_t)));
192         if (!_u.lmsg.content) {
193             errno = ENOMEM;
194             return -1;
195         }
196 
197         _u.lmsg.content->data = data_;
198         _u.lmsg.content->size = size_;
199         _u.lmsg.content->ffn = ffn_;
200         _u.lmsg.content->hint = hint_;
201         new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
202     }
203     return 0;
204 }
205 
init_delimiter()206 int zmq::msg_t::init_delimiter ()
207 {
208     _u.delimiter.metadata = NULL;
209     _u.delimiter.type = type_delimiter;
210     _u.delimiter.flags = 0;
211     _u.delimiter.group.sgroup.group[0] = '\0';
212     _u.delimiter.group.type = group_type_short;
213     _u.delimiter.routing_id = 0;
214     return 0;
215 }
216 
init_join()217 int zmq::msg_t::init_join ()
218 {
219     _u.base.metadata = NULL;
220     _u.base.type = type_join;
221     _u.base.flags = 0;
222     _u.base.group.sgroup.group[0] = '\0';
223     _u.base.group.type = group_type_short;
224     _u.base.routing_id = 0;
225     return 0;
226 }
227 
init_leave()228 int zmq::msg_t::init_leave ()
229 {
230     _u.base.metadata = NULL;
231     _u.base.type = type_leave;
232     _u.base.flags = 0;
233     _u.base.group.sgroup.group[0] = '\0';
234     _u.base.group.type = group_type_short;
235     _u.base.routing_id = 0;
236     return 0;
237 }
238 
init_subscribe(const size_t size_,const unsigned char * topic_)239 int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic_)
240 {
241     int rc = init_size (size_);
242     if (rc == 0) {
243         set_flags (zmq::msg_t::subscribe);
244 
245         //  We explicitly allow a NULL subscription with size zero
246         if (size_) {
247             assert (topic_);
248             memcpy (data (), topic_, size_);
249         }
250     }
251     return rc;
252 }
253 
init_cancel(const size_t size_,const unsigned char * topic_)254 int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic_)
255 {
256     int rc = init_size (size_);
257     if (rc == 0) {
258         set_flags (zmq::msg_t::cancel);
259 
260         //  We explicitly allow a NULL subscription with size zero
261         if (size_) {
262             assert (topic_);
263             memcpy (data (), topic_, size_);
264         }
265     }
266     return rc;
267 }
268 
close()269 int zmq::msg_t::close ()
270 {
271     //  Check the validity of the message.
272     if (unlikely (!check ())) {
273         errno = EFAULT;
274         return -1;
275     }
276 
277     if (_u.base.type == type_lmsg) {
278         //  If the content is not shared, or if it is shared and the reference
279         //  count has dropped to zero, deallocate it.
280         if (!(_u.lmsg.flags & msg_t::shared)
281             || !_u.lmsg.content->refcnt.sub (1)) {
282             //  We used "placement new" operator to initialize the reference
283             //  counter so we call the destructor explicitly now.
284             _u.lmsg.content->refcnt.~atomic_counter_t ();
285 
286             if (_u.lmsg.content->ffn)
287                 _u.lmsg.content->ffn (_u.lmsg.content->data,
288                                       _u.lmsg.content->hint);
289             free (_u.lmsg.content);
290         }
291     }
292 
293     if (is_zcmsg ()) {
294         zmq_assert (_u.zclmsg.content->ffn);
295 
296         //  If the content is not shared, or if it is shared and the reference
297         //  count has dropped to zero, deallocate it.
298         if (!(_u.zclmsg.flags & msg_t::shared)
299             || !_u.zclmsg.content->refcnt.sub (1)) {
300             //  We used "placement new" operator to initialize the reference
301             //  counter so we call the destructor explicitly now.
302             _u.zclmsg.content->refcnt.~atomic_counter_t ();
303 
304             _u.zclmsg.content->ffn (_u.zclmsg.content->data,
305                                     _u.zclmsg.content->hint);
306         }
307     }
308 
309     if (_u.base.metadata != NULL) {
310         if (_u.base.metadata->drop_ref ()) {
311             LIBZMQ_DELETE (_u.base.metadata);
312         }
313         _u.base.metadata = NULL;
314     }
315 
316     if (_u.base.group.type == group_type_long) {
317         if (!_u.base.group.lgroup.content->refcnt.sub (1)) {
318             //  We used "placement new" operator to initialize the reference
319             //  counter so we call the destructor explicitly now.
320             _u.base.group.lgroup.content->refcnt.~atomic_counter_t ();
321 
322             free (_u.base.group.lgroup.content);
323         }
324     }
325 
326     //  Make the message invalid.
327     _u.base.type = 0;
328 
329     return 0;
330 }
331 
move(msg_t & src_)332 int zmq::msg_t::move (msg_t &src_)
333 {
334     //  Check the validity of the source.
335     if (unlikely (!src_.check ())) {
336         errno = EFAULT;
337         return -1;
338     }
339 
340     int rc = close ();
341     if (unlikely (rc < 0))
342         return rc;
343 
344     *this = src_;
345 
346     rc = src_.init ();
347     if (unlikely (rc < 0))
348         return rc;
349 
350     return 0;
351 }
352 
copy(msg_t & src_)353 int zmq::msg_t::copy (msg_t &src_)
354 {
355     //  Check the validity of the source.
356     if (unlikely (!src_.check ())) {
357         errno = EFAULT;
358         return -1;
359     }
360 
361     const int rc = close ();
362     if (unlikely (rc < 0))
363         return rc;
364 
365     // The initial reference count, when a non-shared message is initially
366     // shared (between the original and the copy we create here).
367     const atomic_counter_t::integer_t initial_shared_refcnt = 2;
368 
369     if (src_.is_lmsg () || src_.is_zcmsg ()) {
370         //  One reference is added to shared messages. Non-shared messages
371         //  are turned into shared messages.
372         if (src_.flags () & msg_t::shared)
373             src_.refcnt ()->add (1);
374         else {
375             src_.set_flags (msg_t::shared);
376             src_.refcnt ()->set (initial_shared_refcnt);
377         }
378     }
379 
380     if (src_._u.base.metadata != NULL)
381         src_._u.base.metadata->add_ref ();
382 
383     if (src_._u.base.group.type == group_type_long)
384         src_._u.base.group.lgroup.content->refcnt.add (1);
385 
386     *this = src_;
387 
388     return 0;
389 }
390 
data()391 void *zmq::msg_t::data ()
392 {
393     //  Check the validity of the message.
394     zmq_assert (check ());
395 
396     switch (_u.base.type) {
397         case type_vsm:
398             return _u.vsm.data;
399         case type_lmsg:
400             return _u.lmsg.content->data;
401         case type_cmsg:
402             return _u.cmsg.data;
403         case type_zclmsg:
404             return _u.zclmsg.content->data;
405         default:
406             zmq_assert (false);
407             return NULL;
408     }
409 }
410 
size() const411 size_t zmq::msg_t::size () const
412 {
413     //  Check the validity of the message.
414     zmq_assert (check ());
415 
416     switch (_u.base.type) {
417         case type_vsm:
418             return _u.vsm.size;
419         case type_lmsg:
420             return _u.lmsg.content->size;
421         case type_zclmsg:
422             return _u.zclmsg.content->size;
423         case type_cmsg:
424             return _u.cmsg.size;
425         default:
426             zmq_assert (false);
427             return 0;
428     }
429 }
430 
shrink(size_t new_size_)431 void zmq::msg_t::shrink (size_t new_size_)
432 {
433     //  Check the validity of the message.
434     zmq_assert (check ());
435     zmq_assert (new_size_ <= size ());
436 
437     switch (_u.base.type) {
438         case type_vsm:
439             _u.vsm.size = static_cast<unsigned char> (new_size_);
440             break;
441         case type_lmsg:
442             _u.lmsg.content->size = new_size_;
443             break;
444         case type_zclmsg:
445             _u.zclmsg.content->size = new_size_;
446             break;
447         case type_cmsg:
448             _u.cmsg.size = new_size_;
449             break;
450         default:
451             zmq_assert (false);
452     }
453 }
454 
flags() const455 unsigned char zmq::msg_t::flags () const
456 {
457     return _u.base.flags;
458 }
459 
set_flags(unsigned char flags_)460 void zmq::msg_t::set_flags (unsigned char flags_)
461 {
462     _u.base.flags |= flags_;
463 }
464 
reset_flags(unsigned char flags_)465 void zmq::msg_t::reset_flags (unsigned char flags_)
466 {
467     _u.base.flags &= ~flags_;
468 }
469 
metadata() const470 zmq::metadata_t *zmq::msg_t::metadata () const
471 {
472     return _u.base.metadata;
473 }
474 
set_metadata(zmq::metadata_t * metadata_)475 void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
476 {
477     assert (metadata_ != NULL);
478     assert (_u.base.metadata == NULL);
479     metadata_->add_ref ();
480     _u.base.metadata = metadata_;
481 }
482 
reset_metadata()483 void zmq::msg_t::reset_metadata ()
484 {
485     if (_u.base.metadata) {
486         if (_u.base.metadata->drop_ref ()) {
487             LIBZMQ_DELETE (_u.base.metadata);
488         }
489         _u.base.metadata = NULL;
490     }
491 }
492 
is_routing_id() const493 bool zmq::msg_t::is_routing_id () const
494 {
495     return (_u.base.flags & routing_id) == routing_id;
496 }
497 
is_credential() const498 bool zmq::msg_t::is_credential () const
499 {
500     return (_u.base.flags & credential) == credential;
501 }
502 
is_delimiter() const503 bool zmq::msg_t::is_delimiter () const
504 {
505     return _u.base.type == type_delimiter;
506 }
507 
is_vsm() const508 bool zmq::msg_t::is_vsm () const
509 {
510     return _u.base.type == type_vsm;
511 }
512 
is_cmsg() const513 bool zmq::msg_t::is_cmsg () const
514 {
515     return _u.base.type == type_cmsg;
516 }
517 
is_lmsg() const518 bool zmq::msg_t::is_lmsg () const
519 {
520     return _u.base.type == type_lmsg;
521 }
522 
is_zcmsg() const523 bool zmq::msg_t::is_zcmsg () const
524 {
525     return _u.base.type == type_zclmsg;
526 }
527 
is_join() const528 bool zmq::msg_t::is_join () const
529 {
530     return _u.base.type == type_join;
531 }
532 
is_leave() const533 bool zmq::msg_t::is_leave () const
534 {
535     return _u.base.type == type_leave;
536 }
537 
is_ping() const538 bool zmq::msg_t::is_ping () const
539 {
540     return (_u.base.flags & CMD_TYPE_MASK) == ping;
541 }
542 
is_pong() const543 bool zmq::msg_t::is_pong () const
544 {
545     return (_u.base.flags & CMD_TYPE_MASK) == pong;
546 }
547 
is_close_cmd() const548 bool zmq::msg_t::is_close_cmd () const
549 {
550     return (_u.base.flags & CMD_TYPE_MASK) == close_cmd;
551 }
552 
command_body_size() const553 size_t zmq::msg_t::command_body_size () const
554 {
555     if (this->is_ping () || this->is_pong ())
556         return this->size () - ping_cmd_name_size;
557     else if (!(this->flags () & msg_t::command)
558              && (this->is_subscribe () || this->is_cancel ()))
559         return this->size ();
560     else if (this->is_subscribe ())
561         return this->size () - sub_cmd_name_size;
562     else if (this->is_cancel ())
563         return this->size () - cancel_cmd_name_size;
564 
565     return 0;
566 }
567 
command_body()568 void *zmq::msg_t::command_body ()
569 {
570     unsigned char *data = NULL;
571 
572     if (this->is_ping () || this->is_pong ())
573         data =
574           static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size;
575     //  With inproc, command flag is not set for sub/cancel
576     else if (!(this->flags () & msg_t::command)
577              && (this->is_subscribe () || this->is_cancel ()))
578         data = static_cast<unsigned char *> (this->data ());
579     else if (this->is_subscribe ())
580         data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size;
581     else if (this->is_cancel ())
582         data =
583           static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size;
584 
585     return data;
586 }
587 
add_refs(int refs_)588 void zmq::msg_t::add_refs (int refs_)
589 {
590     zmq_assert (refs_ >= 0);
591 
592     //  Operation not supported for messages with metadata.
593     zmq_assert (_u.base.metadata == NULL);
594 
595     //  No copies required.
596     if (!refs_)
597         return;
598 
599     //  VSMs, CMSGS and delimiters can be copied straight away. The only
600     //  message type that needs special care are long messages.
601     if (_u.base.type == type_lmsg || is_zcmsg ()) {
602         if (_u.base.flags & msg_t::shared)
603             refcnt ()->add (refs_);
604         else {
605             refcnt ()->set (refs_ + 1);
606             _u.base.flags |= msg_t::shared;
607         }
608     }
609 }
610 
rm_refs(int refs_)611 bool zmq::msg_t::rm_refs (int refs_)
612 {
613     zmq_assert (refs_ >= 0);
614 
615     //  Operation not supported for messages with metadata.
616     zmq_assert (_u.base.metadata == NULL);
617 
618     //  No copies required.
619     if (!refs_)
620         return true;
621 
622     //  If there's only one reference close the message.
623     if ((_u.base.type != type_zclmsg && _u.base.type != type_lmsg)
624         || !(_u.base.flags & msg_t::shared)) {
625         close ();
626         return false;
627     }
628 
629     //  The only message type that needs special care are long and zcopy messages.
630     if (_u.base.type == type_lmsg && !_u.lmsg.content->refcnt.sub (refs_)) {
631         //  We used "placement new" operator to initialize the reference
632         //  counter so we call the destructor explicitly now.
633         _u.lmsg.content->refcnt.~atomic_counter_t ();
634 
635         if (_u.lmsg.content->ffn)
636             _u.lmsg.content->ffn (_u.lmsg.content->data, _u.lmsg.content->hint);
637         free (_u.lmsg.content);
638 
639         return false;
640     }
641 
642     if (is_zcmsg () && !_u.zclmsg.content->refcnt.sub (refs_)) {
643         // storage for rfcnt is provided externally
644         if (_u.zclmsg.content->ffn) {
645             _u.zclmsg.content->ffn (_u.zclmsg.content->data,
646                                     _u.zclmsg.content->hint);
647         }
648 
649         return false;
650     }
651 
652     return true;
653 }
654 
get_routing_id() const655 uint32_t zmq::msg_t::get_routing_id () const
656 {
657     return _u.base.routing_id;
658 }
659 
set_routing_id(uint32_t routing_id_)660 int zmq::msg_t::set_routing_id (uint32_t routing_id_)
661 {
662     if (routing_id_) {
663         _u.base.routing_id = routing_id_;
664         return 0;
665     }
666     errno = EINVAL;
667     return -1;
668 }
669 
reset_routing_id()670 int zmq::msg_t::reset_routing_id ()
671 {
672     _u.base.routing_id = 0;
673     return 0;
674 }
675 
group() const676 const char *zmq::msg_t::group () const
677 {
678     if (_u.base.group.type == group_type_long)
679         return _u.base.group.lgroup.content->group;
680     return _u.base.group.sgroup.group;
681 }
682 
set_group(const char * group_)683 int zmq::msg_t::set_group (const char *group_)
684 {
685     size_t length = strnlen (group_, ZMQ_GROUP_MAX_LENGTH);
686 
687     return set_group (group_, length);
688 }
689 
set_group(const char * group_,size_t length_)690 int zmq::msg_t::set_group (const char *group_, size_t length_)
691 {
692     if (length_ > ZMQ_GROUP_MAX_LENGTH) {
693         errno = EINVAL;
694         return -1;
695     }
696 
697     if (length_ > 14) {
698         _u.base.group.lgroup.type = group_type_long;
699         _u.base.group.lgroup.content =
700           (long_group_t *) malloc (sizeof (long_group_t));
701         assert (_u.base.group.lgroup.content);
702         new (&_u.base.group.lgroup.content->refcnt) zmq::atomic_counter_t ();
703         _u.base.group.lgroup.content->refcnt.set (1);
704         strncpy (_u.base.group.lgroup.content->group, group_, length_);
705         _u.base.group.lgroup.content->group[length_] = '\0';
706     } else {
707         strncpy (_u.base.group.sgroup.group, group_, length_);
708         _u.base.group.sgroup.group[length_] = '\0';
709     }
710 
711     return 0;
712 }
713 
refcnt()714 zmq::atomic_counter_t *zmq::msg_t::refcnt ()
715 {
716     switch (_u.base.type) {
717         case type_lmsg:
718             return &_u.lmsg.content->refcnt;
719         case type_zclmsg:
720             return &_u.zclmsg.content->refcnt;
721         default:
722             zmq_assert (false);
723             return NULL;
724     }
725 }
726