1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_MSG_HPP_INCLUDE__
31 #define __ZMQ_MSG_HPP_INCLUDE__
32 
33 #include <stddef.h>
34 #include <stdio.h>
35 
36 #include "config.hpp"
37 #include "err.hpp"
38 #include "fd.hpp"
39 #include "atomic_counter.hpp"
40 #include "metadata.hpp"
41 
42 //  bits 2-5
43 #define CMD_TYPE_MASK 0x1c
44 
45 //  Signature for free function to deallocate the message content.
46 //  Note that it has to be declared as "C" so that it is the same as
47 //  zmq_free_fn defined in zmq.h.
48 extern "C" {
49 typedef void(msg_free_fn) (void *data_, void *hint_);
50 }
51 
52 namespace zmq
53 {
54 //  Note that this structure needs to be explicitly constructed
55 //  (init functions) and destructed (close function).
56 
57 static const char cancel_cmd_name[] = "\6CANCEL";
58 static const char sub_cmd_name[] = "\x9SUBSCRIBE";
59 
60 class msg_t
61 {
62   public:
63     //  Shared message buffer. Message data are either allocated in one
64     //  continuous block along with this structure - thus avoiding one
65     //  malloc/free pair or they are stored in user-supplied memory.
66     //  In the latter case, ffn member stores pointer to the function to be
67     //  used to deallocate the data. If the buffer is actually shared (there
68     //  are at least 2 references to it) refcount member contains number of
69     //  references.
70     struct content_t
71     {
72         void *data;
73         size_t size;
74         msg_free_fn *ffn;
75         void *hint;
76         zmq::atomic_counter_t refcnt;
77     };
78 
79     //  Message flags.
80     enum
81     {
82         more = 1,    //  Followed by more parts
83         command = 2, //  Command frame (see ZMTP spec)
84         //  Command types, use only bits 2-5 and compare with ==, not bitwise,
85         //  a command can never be of more that one type at the same time
86         ping = 4,
87         pong = 8,
88         subscribe = 12,
89         cancel = 16,
90         close_cmd = 20,
91         credential = 32,
92         routing_id = 64,
93         shared = 128
94     };
95 
96     bool check () const;
97     int init ();
98 
99     int init (void *data_,
100               size_t size_,
101               msg_free_fn *ffn_,
102               void *hint_,
103               content_t *content_ = NULL);
104 
105     int init_size (size_t size_);
106     int init_buffer (const void *buf_, size_t size_);
107     int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_);
108     int init_external_storage (content_t *content_,
109                                void *data_,
110                                size_t size_,
111                                msg_free_fn *ffn_,
112                                void *hint_);
113     int init_delimiter ();
114     int init_join ();
115     int init_leave ();
116     int init_subscribe (const size_t size_, const unsigned char *topic);
117     int init_cancel (const size_t size_, const unsigned char *topic);
118     int close ();
119     int move (msg_t &src_);
120     int copy (msg_t &src_);
121     void *data ();
122     size_t size () const;
123     unsigned char flags () const;
124     void set_flags (unsigned char flags_);
125     void reset_flags (unsigned char flags_);
126     metadata_t *metadata () const;
127     void set_metadata (metadata_t *metadata_);
128     void reset_metadata ();
129     bool is_routing_id () const;
130     bool is_credential () const;
131     bool is_delimiter () const;
132     bool is_join () const;
133     bool is_leave () const;
134     bool is_ping () const;
135     bool is_pong () const;
136     bool is_close_cmd () const;
137 
138     //  These are called on each message received by the session_base class,
139     //  so get them inlined to avoid the overhead of 2 function calls per msg
is_subscribe() const140     bool is_subscribe () const
141     {
142         return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
143     }
144 
is_cancel() const145     bool is_cancel () const
146     {
147         return (_u.base.flags & CMD_TYPE_MASK) == cancel;
148     }
149 
150     size_t command_body_size () const;
151     void *command_body ();
152     bool is_vsm () const;
153     bool is_cmsg () const;
154     bool is_lmsg () const;
155     bool is_zcmsg () const;
156     uint32_t get_routing_id () const;
157     int set_routing_id (uint32_t routing_id_);
158     int reset_routing_id ();
159     const char *group () const;
160     int set_group (const char *group_);
161     int set_group (const char *, size_t length_);
162 
163     //  After calling this function you can copy the message in POD-style
164     //  refs_ times. No need to call copy.
165     void add_refs (int refs_);
166 
167     //  Removes references previously added by add_refs. If the number of
168     //  references drops to 0, the message is closed and false is returned.
169     bool rm_refs (int refs_);
170 
171     void shrink (size_t new_size_);
172 
173     //  Size in bytes of the largest message that is still copied around
174     //  rather than being reference-counted.
175     enum
176     {
177         msg_t_size = 64
178     };
179     enum
180     {
181         max_vsm_size =
182           msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t))
183     };
184     enum
185     {
186         ping_cmd_name_size = 5,   // 4PING
187         cancel_cmd_name_size = 7, // 6CANCEL
188         sub_cmd_name_size = 10    // 9SUBSCRIBE
189     };
190 
191   private:
192     zmq::atomic_counter_t *refcnt ();
193 
194     //  Different message types.
195     enum type_t
196     {
197         type_min = 101,
198         //  VSM messages store the content in the message itself
199         type_vsm = 101,
200         //  LMSG messages store the content in malloc-ed memory
201         type_lmsg = 102,
202         //  Delimiter messages are used in envelopes
203         type_delimiter = 103,
204         //  CMSG messages point to constant data
205         type_cmsg = 104,
206 
207         // zero-copy LMSG message for v2_decoder
208         type_zclmsg = 105,
209 
210         //  Join message for radio_dish
211         type_join = 106,
212 
213         //  Leave message for radio_dish
214         type_leave = 107,
215 
216         type_max = 107
217     };
218 
219     enum group_type_t
220     {
221         group_type_short,
222         group_type_long
223     };
224 
225     struct long_group_t
226     {
227         char group[ZMQ_GROUP_MAX_LENGTH + 1];
228         atomic_counter_t refcnt;
229     };
230 
231     union group_t
232     {
233         unsigned char type;
234         struct
235         {
236             unsigned char type;
237             char group[15];
238         } sgroup;
239         struct
240         {
241             unsigned char type;
242             long_group_t *content;
243         } lgroup;
244     };
245 
246     //  Note that fields shared between different message types are not
247     //  moved to the parent class (msg_t). This way we get tighter packing
248     //  of the data. Shared fields can be accessed via 'base' member of
249     //  the union.
250     union
251     {
252         struct
253         {
254             metadata_t *metadata;
255             unsigned char unused[msg_t_size
256                                  - (sizeof (metadata_t *) + 2
257                                     + sizeof (uint32_t) + sizeof (group_t))];
258             unsigned char type;
259             unsigned char flags;
260             uint32_t routing_id;
261             group_t group;
262         } base;
263         struct
264         {
265             metadata_t *metadata;
266             unsigned char data[max_vsm_size];
267             unsigned char size;
268             unsigned char type;
269             unsigned char flags;
270             uint32_t routing_id;
271             group_t group;
272         } vsm;
273         struct
274         {
275             metadata_t *metadata;
276             content_t *content;
277             unsigned char
278               unused[msg_t_size
279                      - (sizeof (metadata_t *) + sizeof (content_t *) + 2
280                         + sizeof (uint32_t) + sizeof (group_t))];
281             unsigned char type;
282             unsigned char flags;
283             uint32_t routing_id;
284             group_t group;
285         } lmsg;
286         struct
287         {
288             metadata_t *metadata;
289             content_t *content;
290             unsigned char
291               unused[msg_t_size
292                      - (sizeof (metadata_t *) + sizeof (content_t *) + 2
293                         + sizeof (uint32_t) + sizeof (group_t))];
294             unsigned char type;
295             unsigned char flags;
296             uint32_t routing_id;
297             group_t group;
298         } zclmsg;
299         struct
300         {
301             metadata_t *metadata;
302             void *data;
303             size_t size;
304             unsigned char unused[msg_t_size
305                                  - (sizeof (metadata_t *) + sizeof (void *)
306                                     + sizeof (size_t) + 2 + sizeof (uint32_t)
307                                     + sizeof (group_t))];
308             unsigned char type;
309             unsigned char flags;
310             uint32_t routing_id;
311             group_t group;
312         } cmsg;
313         struct
314         {
315             metadata_t *metadata;
316             unsigned char unused[msg_t_size
317                                  - (sizeof (metadata_t *) + 2
318                                     + sizeof (uint32_t) + sizeof (group_t))];
319             unsigned char type;
320             unsigned char flags;
321             uint32_t routing_id;
322             group_t group;
323         } delimiter;
324     } _u;
325 };
326 
close_and_return(zmq::msg_t * msg_,int echo_)327 inline int close_and_return (zmq::msg_t *msg_, int echo_)
328 {
329     // Since we abort on close failure we preserve errno for success case.
330     const int err = errno;
331     const int rc = msg_->close ();
332     errno_assert (rc == 0);
333     errno = err;
334     return echo_;
335 }
336 
close_and_return(zmq::msg_t msg_[],int count_,int echo_)337 inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_)
338 {
339     for (int i = 0; i < count_; i++)
340         close_and_return (&msg_[i], 0);
341     return echo_;
342 }
343 }
344 
345 #endif
346