1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include "compat.hpp"
32 #include "macros.hpp"
33 #include "msg.hpp"
34
35 #include <string.h>
36 #include <stdlib.h>
37 #include <new>
38
39 #include "stdint.hpp"
40 #include "likely.hpp"
41 #include "metadata.hpp"
42 #include "err.hpp"
43
44 // Check whether the sizes of public representation of the message (zmq_msg_t)
45 // and private representation of the message (zmq::msg_t) match.
46
47 typedef char
48 zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0)
49 - 1];
50
check() const51 bool zmq::msg_t::check () const
52 {
53 return _u.base.type >= type_min && _u.base.type <= type_max;
54 }
55
init(void * data_,size_t size_,msg_free_fn * ffn_,void * hint_,content_t * content_)56 int zmq::msg_t::init (void *data_,
57 size_t size_,
58 msg_free_fn *ffn_,
59 void *hint_,
60 content_t *content_)
61 {
62 if (size_ < max_vsm_size) {
63 const int rc = init_size (size_);
64
65 if (rc != -1) {
66 memcpy (data (), data_, size_);
67 return 0;
68 }
69 return -1;
70 }
71 if (content_) {
72 return init_external_storage (content_, data_, size_, ffn_, hint_);
73 }
74 return init_data (data_, size_, ffn_, hint_);
75 }
76
init()77 int zmq::msg_t::init ()
78 {
79 _u.vsm.metadata = NULL;
80 _u.vsm.type = type_vsm;
81 _u.vsm.flags = 0;
82 _u.vsm.size = 0;
83 _u.vsm.group.sgroup.group[0] = '\0';
84 _u.vsm.group.type = group_type_short;
85 _u.vsm.routing_id = 0;
86 return 0;
87 }
88
init_size(size_t size_)89 int zmq::msg_t::init_size (size_t size_)
90 {
91 if (size_ <= max_vsm_size) {
92 _u.vsm.metadata = NULL;
93 _u.vsm.type = type_vsm;
94 _u.vsm.flags = 0;
95 _u.vsm.size = static_cast<unsigned char> (size_);
96 _u.vsm.group.sgroup.group[0] = '\0';
97 _u.vsm.group.type = group_type_short;
98 _u.vsm.routing_id = 0;
99 } else {
100 _u.lmsg.metadata = NULL;
101 _u.lmsg.type = type_lmsg;
102 _u.lmsg.flags = 0;
103 _u.lmsg.group.sgroup.group[0] = '\0';
104 _u.lmsg.group.type = group_type_short;
105 _u.lmsg.routing_id = 0;
106 _u.lmsg.content = NULL;
107 if (sizeof (content_t) + size_ > size_)
108 _u.lmsg.content =
109 static_cast<content_t *> (malloc (sizeof (content_t) + size_));
110 if (unlikely (!_u.lmsg.content)) {
111 errno = ENOMEM;
112 return -1;
113 }
114
115 _u.lmsg.content->data = _u.lmsg.content + 1;
116 _u.lmsg.content->size = size_;
117 _u.lmsg.content->ffn = NULL;
118 _u.lmsg.content->hint = NULL;
119 new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
120 }
121 return 0;
122 }
123
init_buffer(const void * buf_,size_t size_)124 int zmq::msg_t::init_buffer (const void *buf_, size_t size_)
125 {
126 const int rc = init_size (size_);
127 if (unlikely (rc < 0)) {
128 return -1;
129 }
130 if (size_) {
131 // NULL and zero size is allowed
132 assert (NULL != buf_);
133 memcpy (data (), buf_, size_);
134 }
135 return 0;
136 }
137
init_external_storage(content_t * content_,void * data_,size_t size_,msg_free_fn * ffn_,void * hint_)138 int zmq::msg_t::init_external_storage (content_t *content_,
139 void *data_,
140 size_t size_,
141 msg_free_fn *ffn_,
142 void *hint_)
143 {
144 zmq_assert (NULL != data_);
145 zmq_assert (NULL != content_);
146
147 _u.zclmsg.metadata = NULL;
148 _u.zclmsg.type = type_zclmsg;
149 _u.zclmsg.flags = 0;
150 _u.zclmsg.group.sgroup.group[0] = '\0';
151 _u.zclmsg.group.type = group_type_short;
152 _u.zclmsg.routing_id = 0;
153
154 _u.zclmsg.content = content_;
155 _u.zclmsg.content->data = data_;
156 _u.zclmsg.content->size = size_;
157 _u.zclmsg.content->ffn = ffn_;
158 _u.zclmsg.content->hint = hint_;
159 new (&_u.zclmsg.content->refcnt) zmq::atomic_counter_t ();
160
161 return 0;
162 }
163
init_data(void * data_,size_t size_,msg_free_fn * ffn_,void * hint_)164 int zmq::msg_t::init_data (void *data_,
165 size_t size_,
166 msg_free_fn *ffn_,
167 void *hint_)
168 {
169 // If data is NULL and size is not 0, a segfault
170 // would occur once the data is accessed
171 zmq_assert (data_ != NULL || size_ == 0);
172
173 // Initialize constant message if there's no need to deallocate
174 if (ffn_ == NULL) {
175 _u.cmsg.metadata = NULL;
176 _u.cmsg.type = type_cmsg;
177 _u.cmsg.flags = 0;
178 _u.cmsg.data = data_;
179 _u.cmsg.size = size_;
180 _u.cmsg.group.sgroup.group[0] = '\0';
181 _u.cmsg.group.type = group_type_short;
182 _u.cmsg.routing_id = 0;
183 } else {
184 _u.lmsg.metadata = NULL;
185 _u.lmsg.type = type_lmsg;
186 _u.lmsg.flags = 0;
187 _u.lmsg.group.sgroup.group[0] = '\0';
188 _u.lmsg.group.type = group_type_short;
189 _u.lmsg.routing_id = 0;
190 _u.lmsg.content =
191 static_cast<content_t *> (malloc (sizeof (content_t)));
192 if (!_u.lmsg.content) {
193 errno = ENOMEM;
194 return -1;
195 }
196
197 _u.lmsg.content->data = data_;
198 _u.lmsg.content->size = size_;
199 _u.lmsg.content->ffn = ffn_;
200 _u.lmsg.content->hint = hint_;
201 new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
202 }
203 return 0;
204 }
205
init_delimiter()206 int zmq::msg_t::init_delimiter ()
207 {
208 _u.delimiter.metadata = NULL;
209 _u.delimiter.type = type_delimiter;
210 _u.delimiter.flags = 0;
211 _u.delimiter.group.sgroup.group[0] = '\0';
212 _u.delimiter.group.type = group_type_short;
213 _u.delimiter.routing_id = 0;
214 return 0;
215 }
216
init_join()217 int zmq::msg_t::init_join ()
218 {
219 _u.base.metadata = NULL;
220 _u.base.type = type_join;
221 _u.base.flags = 0;
222 _u.base.group.sgroup.group[0] = '\0';
223 _u.base.group.type = group_type_short;
224 _u.base.routing_id = 0;
225 return 0;
226 }
227
init_leave()228 int zmq::msg_t::init_leave ()
229 {
230 _u.base.metadata = NULL;
231 _u.base.type = type_leave;
232 _u.base.flags = 0;
233 _u.base.group.sgroup.group[0] = '\0';
234 _u.base.group.type = group_type_short;
235 _u.base.routing_id = 0;
236 return 0;
237 }
238
init_subscribe(const size_t size_,const unsigned char * topic_)239 int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic_)
240 {
241 int rc = init_size (size_);
242 if (rc == 0) {
243 set_flags (zmq::msg_t::subscribe);
244
245 // We explicitly allow a NULL subscription with size zero
246 if (size_) {
247 assert (topic_);
248 memcpy (data (), topic_, size_);
249 }
250 }
251 return rc;
252 }
253
init_cancel(const size_t size_,const unsigned char * topic_)254 int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic_)
255 {
256 int rc = init_size (size_);
257 if (rc == 0) {
258 set_flags (zmq::msg_t::cancel);
259
260 // We explicitly allow a NULL subscription with size zero
261 if (size_) {
262 assert (topic_);
263 memcpy (data (), topic_, size_);
264 }
265 }
266 return rc;
267 }
268
close()269 int zmq::msg_t::close ()
270 {
271 // Check the validity of the message.
272 if (unlikely (!check ())) {
273 errno = EFAULT;
274 return -1;
275 }
276
277 if (_u.base.type == type_lmsg) {
278 // If the content is not shared, or if it is shared and the reference
279 // count has dropped to zero, deallocate it.
280 if (!(_u.lmsg.flags & msg_t::shared)
281 || !_u.lmsg.content->refcnt.sub (1)) {
282 // We used "placement new" operator to initialize the reference
283 // counter so we call the destructor explicitly now.
284 _u.lmsg.content->refcnt.~atomic_counter_t ();
285
286 if (_u.lmsg.content->ffn)
287 _u.lmsg.content->ffn (_u.lmsg.content->data,
288 _u.lmsg.content->hint);
289 free (_u.lmsg.content);
290 }
291 }
292
293 if (is_zcmsg ()) {
294 zmq_assert (_u.zclmsg.content->ffn);
295
296 // If the content is not shared, or if it is shared and the reference
297 // count has dropped to zero, deallocate it.
298 if (!(_u.zclmsg.flags & msg_t::shared)
299 || !_u.zclmsg.content->refcnt.sub (1)) {
300 // We used "placement new" operator to initialize the reference
301 // counter so we call the destructor explicitly now.
302 _u.zclmsg.content->refcnt.~atomic_counter_t ();
303
304 _u.zclmsg.content->ffn (_u.zclmsg.content->data,
305 _u.zclmsg.content->hint);
306 }
307 }
308
309 if (_u.base.metadata != NULL) {
310 if (_u.base.metadata->drop_ref ()) {
311 LIBZMQ_DELETE (_u.base.metadata);
312 }
313 _u.base.metadata = NULL;
314 }
315
316 if (_u.base.group.type == group_type_long) {
317 if (!_u.base.group.lgroup.content->refcnt.sub (1)) {
318 // We used "placement new" operator to initialize the reference
319 // counter so we call the destructor explicitly now.
320 _u.base.group.lgroup.content->refcnt.~atomic_counter_t ();
321
322 free (_u.base.group.lgroup.content);
323 }
324 }
325
326 // Make the message invalid.
327 _u.base.type = 0;
328
329 return 0;
330 }
331
move(msg_t & src_)332 int zmq::msg_t::move (msg_t &src_)
333 {
334 // Check the validity of the source.
335 if (unlikely (!src_.check ())) {
336 errno = EFAULT;
337 return -1;
338 }
339
340 int rc = close ();
341 if (unlikely (rc < 0))
342 return rc;
343
344 *this = src_;
345
346 rc = src_.init ();
347 if (unlikely (rc < 0))
348 return rc;
349
350 return 0;
351 }
352
copy(msg_t & src_)353 int zmq::msg_t::copy (msg_t &src_)
354 {
355 // Check the validity of the source.
356 if (unlikely (!src_.check ())) {
357 errno = EFAULT;
358 return -1;
359 }
360
361 const int rc = close ();
362 if (unlikely (rc < 0))
363 return rc;
364
365 // The initial reference count, when a non-shared message is initially
366 // shared (between the original and the copy we create here).
367 const atomic_counter_t::integer_t initial_shared_refcnt = 2;
368
369 if (src_.is_lmsg () || src_.is_zcmsg ()) {
370 // One reference is added to shared messages. Non-shared messages
371 // are turned into shared messages.
372 if (src_.flags () & msg_t::shared)
373 src_.refcnt ()->add (1);
374 else {
375 src_.set_flags (msg_t::shared);
376 src_.refcnt ()->set (initial_shared_refcnt);
377 }
378 }
379
380 if (src_._u.base.metadata != NULL)
381 src_._u.base.metadata->add_ref ();
382
383 if (src_._u.base.group.type == group_type_long)
384 src_._u.base.group.lgroup.content->refcnt.add (1);
385
386 *this = src_;
387
388 return 0;
389 }
390
data()391 void *zmq::msg_t::data ()
392 {
393 // Check the validity of the message.
394 zmq_assert (check ());
395
396 switch (_u.base.type) {
397 case type_vsm:
398 return _u.vsm.data;
399 case type_lmsg:
400 return _u.lmsg.content->data;
401 case type_cmsg:
402 return _u.cmsg.data;
403 case type_zclmsg:
404 return _u.zclmsg.content->data;
405 default:
406 zmq_assert (false);
407 return NULL;
408 }
409 }
410
size() const411 size_t zmq::msg_t::size () const
412 {
413 // Check the validity of the message.
414 zmq_assert (check ());
415
416 switch (_u.base.type) {
417 case type_vsm:
418 return _u.vsm.size;
419 case type_lmsg:
420 return _u.lmsg.content->size;
421 case type_zclmsg:
422 return _u.zclmsg.content->size;
423 case type_cmsg:
424 return _u.cmsg.size;
425 default:
426 zmq_assert (false);
427 return 0;
428 }
429 }
430
shrink(size_t new_size_)431 void zmq::msg_t::shrink (size_t new_size_)
432 {
433 // Check the validity of the message.
434 zmq_assert (check ());
435 zmq_assert (new_size_ <= size ());
436
437 switch (_u.base.type) {
438 case type_vsm:
439 _u.vsm.size = static_cast<unsigned char> (new_size_);
440 break;
441 case type_lmsg:
442 _u.lmsg.content->size = new_size_;
443 break;
444 case type_zclmsg:
445 _u.zclmsg.content->size = new_size_;
446 break;
447 case type_cmsg:
448 _u.cmsg.size = new_size_;
449 break;
450 default:
451 zmq_assert (false);
452 }
453 }
454
flags() const455 unsigned char zmq::msg_t::flags () const
456 {
457 return _u.base.flags;
458 }
459
set_flags(unsigned char flags_)460 void zmq::msg_t::set_flags (unsigned char flags_)
461 {
462 _u.base.flags |= flags_;
463 }
464
reset_flags(unsigned char flags_)465 void zmq::msg_t::reset_flags (unsigned char flags_)
466 {
467 _u.base.flags &= ~flags_;
468 }
469
metadata() const470 zmq::metadata_t *zmq::msg_t::metadata () const
471 {
472 return _u.base.metadata;
473 }
474
set_metadata(zmq::metadata_t * metadata_)475 void zmq::msg_t::set_metadata (zmq::metadata_t *metadata_)
476 {
477 assert (metadata_ != NULL);
478 assert (_u.base.metadata == NULL);
479 metadata_->add_ref ();
480 _u.base.metadata = metadata_;
481 }
482
reset_metadata()483 void zmq::msg_t::reset_metadata ()
484 {
485 if (_u.base.metadata) {
486 if (_u.base.metadata->drop_ref ()) {
487 LIBZMQ_DELETE (_u.base.metadata);
488 }
489 _u.base.metadata = NULL;
490 }
491 }
492
is_routing_id() const493 bool zmq::msg_t::is_routing_id () const
494 {
495 return (_u.base.flags & routing_id) == routing_id;
496 }
497
is_credential() const498 bool zmq::msg_t::is_credential () const
499 {
500 return (_u.base.flags & credential) == credential;
501 }
502
is_delimiter() const503 bool zmq::msg_t::is_delimiter () const
504 {
505 return _u.base.type == type_delimiter;
506 }
507
is_vsm() const508 bool zmq::msg_t::is_vsm () const
509 {
510 return _u.base.type == type_vsm;
511 }
512
is_cmsg() const513 bool zmq::msg_t::is_cmsg () const
514 {
515 return _u.base.type == type_cmsg;
516 }
517
is_lmsg() const518 bool zmq::msg_t::is_lmsg () const
519 {
520 return _u.base.type == type_lmsg;
521 }
522
is_zcmsg() const523 bool zmq::msg_t::is_zcmsg () const
524 {
525 return _u.base.type == type_zclmsg;
526 }
527
is_join() const528 bool zmq::msg_t::is_join () const
529 {
530 return _u.base.type == type_join;
531 }
532
is_leave() const533 bool zmq::msg_t::is_leave () const
534 {
535 return _u.base.type == type_leave;
536 }
537
is_ping() const538 bool zmq::msg_t::is_ping () const
539 {
540 return (_u.base.flags & CMD_TYPE_MASK) == ping;
541 }
542
is_pong() const543 bool zmq::msg_t::is_pong () const
544 {
545 return (_u.base.flags & CMD_TYPE_MASK) == pong;
546 }
547
is_close_cmd() const548 bool zmq::msg_t::is_close_cmd () const
549 {
550 return (_u.base.flags & CMD_TYPE_MASK) == close_cmd;
551 }
552
command_body_size() const553 size_t zmq::msg_t::command_body_size () const
554 {
555 if (this->is_ping () || this->is_pong ())
556 return this->size () - ping_cmd_name_size;
557 else if (!(this->flags () & msg_t::command)
558 && (this->is_subscribe () || this->is_cancel ()))
559 return this->size ();
560 else if (this->is_subscribe ())
561 return this->size () - sub_cmd_name_size;
562 else if (this->is_cancel ())
563 return this->size () - cancel_cmd_name_size;
564
565 return 0;
566 }
567
command_body()568 void *zmq::msg_t::command_body ()
569 {
570 unsigned char *data = NULL;
571
572 if (this->is_ping () || this->is_pong ())
573 data =
574 static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size;
575 // With inproc, command flag is not set for sub/cancel
576 else if (!(this->flags () & msg_t::command)
577 && (this->is_subscribe () || this->is_cancel ()))
578 data = static_cast<unsigned char *> (this->data ());
579 else if (this->is_subscribe ())
580 data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size;
581 else if (this->is_cancel ())
582 data =
583 static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size;
584
585 return data;
586 }
587
add_refs(int refs_)588 void zmq::msg_t::add_refs (int refs_)
589 {
590 zmq_assert (refs_ >= 0);
591
592 // Operation not supported for messages with metadata.
593 zmq_assert (_u.base.metadata == NULL);
594
595 // No copies required.
596 if (!refs_)
597 return;
598
599 // VSMs, CMSGS and delimiters can be copied straight away. The only
600 // message type that needs special care are long messages.
601 if (_u.base.type == type_lmsg || is_zcmsg ()) {
602 if (_u.base.flags & msg_t::shared)
603 refcnt ()->add (refs_);
604 else {
605 refcnt ()->set (refs_ + 1);
606 _u.base.flags |= msg_t::shared;
607 }
608 }
609 }
610
rm_refs(int refs_)611 bool zmq::msg_t::rm_refs (int refs_)
612 {
613 zmq_assert (refs_ >= 0);
614
615 // Operation not supported for messages with metadata.
616 zmq_assert (_u.base.metadata == NULL);
617
618 // No copies required.
619 if (!refs_)
620 return true;
621
622 // If there's only one reference close the message.
623 if ((_u.base.type != type_zclmsg && _u.base.type != type_lmsg)
624 || !(_u.base.flags & msg_t::shared)) {
625 close ();
626 return false;
627 }
628
629 // The only message type that needs special care are long and zcopy messages.
630 if (_u.base.type == type_lmsg && !_u.lmsg.content->refcnt.sub (refs_)) {
631 // We used "placement new" operator to initialize the reference
632 // counter so we call the destructor explicitly now.
633 _u.lmsg.content->refcnt.~atomic_counter_t ();
634
635 if (_u.lmsg.content->ffn)
636 _u.lmsg.content->ffn (_u.lmsg.content->data, _u.lmsg.content->hint);
637 free (_u.lmsg.content);
638
639 return false;
640 }
641
642 if (is_zcmsg () && !_u.zclmsg.content->refcnt.sub (refs_)) {
643 // storage for rfcnt is provided externally
644 if (_u.zclmsg.content->ffn) {
645 _u.zclmsg.content->ffn (_u.zclmsg.content->data,
646 _u.zclmsg.content->hint);
647 }
648
649 return false;
650 }
651
652 return true;
653 }
654
get_routing_id() const655 uint32_t zmq::msg_t::get_routing_id () const
656 {
657 return _u.base.routing_id;
658 }
659
set_routing_id(uint32_t routing_id_)660 int zmq::msg_t::set_routing_id (uint32_t routing_id_)
661 {
662 if (routing_id_) {
663 _u.base.routing_id = routing_id_;
664 return 0;
665 }
666 errno = EINVAL;
667 return -1;
668 }
669
reset_routing_id()670 int zmq::msg_t::reset_routing_id ()
671 {
672 _u.base.routing_id = 0;
673 return 0;
674 }
675
group() const676 const char *zmq::msg_t::group () const
677 {
678 if (_u.base.group.type == group_type_long)
679 return _u.base.group.lgroup.content->group;
680 return _u.base.group.sgroup.group;
681 }
682
set_group(const char * group_)683 int zmq::msg_t::set_group (const char *group_)
684 {
685 size_t length = strnlen (group_, ZMQ_GROUP_MAX_LENGTH);
686
687 return set_group (group_, length);
688 }
689
set_group(const char * group_,size_t length_)690 int zmq::msg_t::set_group (const char *group_, size_t length_)
691 {
692 if (length_ > ZMQ_GROUP_MAX_LENGTH) {
693 errno = EINVAL;
694 return -1;
695 }
696
697 if (length_ > 14) {
698 _u.base.group.lgroup.type = group_type_long;
699 _u.base.group.lgroup.content =
700 (long_group_t *) malloc (sizeof (long_group_t));
701 assert (_u.base.group.lgroup.content);
702 new (&_u.base.group.lgroup.content->refcnt) zmq::atomic_counter_t ();
703 _u.base.group.lgroup.content->refcnt.set (1);
704 strncpy (_u.base.group.lgroup.content->group, group_, length_);
705 _u.base.group.lgroup.content->group[length_] = '\0';
706 } else {
707 strncpy (_u.base.group.sgroup.group, group_, length_);
708 _u.base.group.sgroup.group[length_] = '\0';
709 }
710
711 return 0;
712 }
713
refcnt()714 zmq::atomic_counter_t *zmq::msg_t::refcnt ()
715 {
716 switch (_u.base.type) {
717 case type_lmsg:
718 return &_u.lmsg.content->refcnt;
719 case type_zclmsg:
720 return &_u.zclmsg.content->refcnt;
721 default:
722 zmq_assert (false);
723 return NULL;
724 }
725 }
726