1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #ifndef __ZMQ_MSG_HPP_INCLUDE__
31 #define __ZMQ_MSG_HPP_INCLUDE__
32
33 #include <stddef.h>
34 #include <stdio.h>
35
36 #include "config.hpp"
37 #include "err.hpp"
38 #include "fd.hpp"
39 #include "atomic_counter.hpp"
40 #include "metadata.hpp"
41
// Mask selecting the command-type field of the message flags byte.
// 0x1c == 4 | 8 | 16, i.e. bits 2-4 (counting from bit 0).
#define CMD_TYPE_MASK 0x1c
44
// Signature for free function to deallocate the message content.
// msg_free_fn names the function type itself (not a pointer to it), so
// it is stored and passed around as 'msg_free_fn *'.
// Note that it has to be declared as "C" so that it is the same as
// zmq_free_fn defined in zmq.h.
extern "C" {
typedef void(msg_free_fn) (void *data_, void *hint_);
}
51
52 namespace zmq
53 {
// Note that msg_t needs to be explicitly constructed
// (init functions) and destructed (close function).

// Command names stored as length-prefixed strings: the leading escape
// (octal \6, hex \x9) is the number of characters in the name that
// follows ("CANCEL" has 6, "SUBSCRIBE" has 9).
static const char cancel_cmd_name[] = "\6CANCEL";
static const char sub_cmd_name[] = "\x9SUBSCRIBE";
59
60 class msg_t
61 {
62 public:
63 // Shared message buffer. Message data are either allocated in one
64 // continuous block along with this structure - thus avoiding one
65 // malloc/free pair or they are stored in user-supplied memory.
66 // In the latter case, ffn member stores pointer to the function to be
67 // used to deallocate the data. If the buffer is actually shared (there
68 // are at least 2 references to it) refcount member contains number of
69 // references.
70 struct content_t
71 {
72 void *data;
73 size_t size;
74 msg_free_fn *ffn;
75 void *hint;
76 zmq::atomic_counter_t refcnt;
77 };
78
79 // Message flags.
80 enum
81 {
82 more = 1, // Followed by more parts
83 command = 2, // Command frame (see ZMTP spec)
84 // Command types, use only bits 2-5 and compare with ==, not bitwise,
85 // a command can never be of more that one type at the same time
86 ping = 4,
87 pong = 8,
88 subscribe = 12,
89 cancel = 16,
90 close_cmd = 20,
91 credential = 32,
92 routing_id = 64,
93 shared = 128
94 };
95
96 bool check () const;
97 int init ();
98
99 int init (void *data_,
100 size_t size_,
101 msg_free_fn *ffn_,
102 void *hint_,
103 content_t *content_ = NULL);
104
105 int init_size (size_t size_);
106 int init_buffer (const void *buf_, size_t size_);
107 int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_);
108 int init_external_storage (content_t *content_,
109 void *data_,
110 size_t size_,
111 msg_free_fn *ffn_,
112 void *hint_);
113 int init_delimiter ();
114 int init_join ();
115 int init_leave ();
116 int init_subscribe (const size_t size_, const unsigned char *topic);
117 int init_cancel (const size_t size_, const unsigned char *topic);
118 int close ();
119 int move (msg_t &src_);
120 int copy (msg_t &src_);
121 void *data ();
122 size_t size () const;
123 unsigned char flags () const;
124 void set_flags (unsigned char flags_);
125 void reset_flags (unsigned char flags_);
126 metadata_t *metadata () const;
127 void set_metadata (metadata_t *metadata_);
128 void reset_metadata ();
129 bool is_routing_id () const;
130 bool is_credential () const;
131 bool is_delimiter () const;
132 bool is_join () const;
133 bool is_leave () const;
134 bool is_ping () const;
135 bool is_pong () const;
136 bool is_close_cmd () const;
137
138 // These are called on each message received by the session_base class,
139 // so get them inlined to avoid the overhead of 2 function calls per msg
is_subscribe() const140 bool is_subscribe () const
141 {
142 return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
143 }
144
is_cancel() const145 bool is_cancel () const
146 {
147 return (_u.base.flags & CMD_TYPE_MASK) == cancel;
148 }
149
150 size_t command_body_size () const;
151 void *command_body ();
152 bool is_vsm () const;
153 bool is_cmsg () const;
154 bool is_lmsg () const;
155 bool is_zcmsg () const;
156 uint32_t get_routing_id () const;
157 int set_routing_id (uint32_t routing_id_);
158 int reset_routing_id ();
159 const char *group () const;
160 int set_group (const char *group_);
161 int set_group (const char *, size_t length_);
162
163 // After calling this function you can copy the message in POD-style
164 // refs_ times. No need to call copy.
165 void add_refs (int refs_);
166
167 // Removes references previously added by add_refs. If the number of
168 // references drops to 0, the message is closed and false is returned.
169 bool rm_refs (int refs_);
170
171 void shrink (size_t new_size_);
172
173 // Size in bytes of the largest message that is still copied around
174 // rather than being reference-counted.
175 enum
176 {
177 msg_t_size = 64
178 };
179 enum
180 {
181 max_vsm_size =
182 msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t))
183 };
184 enum
185 {
186 ping_cmd_name_size = 5, // 4PING
187 cancel_cmd_name_size = 7, // 6CANCEL
188 sub_cmd_name_size = 10 // 9SUBSCRIBE
189 };
190
191 private:
192 zmq::atomic_counter_t *refcnt ();
193
194 // Different message types.
195 enum type_t
196 {
197 type_min = 101,
198 // VSM messages store the content in the message itself
199 type_vsm = 101,
200 // LMSG messages store the content in malloc-ed memory
201 type_lmsg = 102,
202 // Delimiter messages are used in envelopes
203 type_delimiter = 103,
204 // CMSG messages point to constant data
205 type_cmsg = 104,
206
207 // zero-copy LMSG message for v2_decoder
208 type_zclmsg = 105,
209
210 // Join message for radio_dish
211 type_join = 106,
212
213 // Leave message for radio_dish
214 type_leave = 107,
215
216 type_max = 107
217 };
218
219 enum group_type_t
220 {
221 group_type_short,
222 group_type_long
223 };
224
225 struct long_group_t
226 {
227 char group[ZMQ_GROUP_MAX_LENGTH + 1];
228 atomic_counter_t refcnt;
229 };
230
231 union group_t
232 {
233 unsigned char type;
234 struct
235 {
236 unsigned char type;
237 char group[15];
238 } sgroup;
239 struct
240 {
241 unsigned char type;
242 long_group_t *content;
243 } lgroup;
244 };
245
246 // Note that fields shared between different message types are not
247 // moved to the parent class (msg_t). This way we get tighter packing
248 // of the data. Shared fields can be accessed via 'base' member of
249 // the union.
250 union
251 {
252 struct
253 {
254 metadata_t *metadata;
255 unsigned char unused[msg_t_size
256 - (sizeof (metadata_t *) + 2
257 + sizeof (uint32_t) + sizeof (group_t))];
258 unsigned char type;
259 unsigned char flags;
260 uint32_t routing_id;
261 group_t group;
262 } base;
263 struct
264 {
265 metadata_t *metadata;
266 unsigned char data[max_vsm_size];
267 unsigned char size;
268 unsigned char type;
269 unsigned char flags;
270 uint32_t routing_id;
271 group_t group;
272 } vsm;
273 struct
274 {
275 metadata_t *metadata;
276 content_t *content;
277 unsigned char
278 unused[msg_t_size
279 - (sizeof (metadata_t *) + sizeof (content_t *) + 2
280 + sizeof (uint32_t) + sizeof (group_t))];
281 unsigned char type;
282 unsigned char flags;
283 uint32_t routing_id;
284 group_t group;
285 } lmsg;
286 struct
287 {
288 metadata_t *metadata;
289 content_t *content;
290 unsigned char
291 unused[msg_t_size
292 - (sizeof (metadata_t *) + sizeof (content_t *) + 2
293 + sizeof (uint32_t) + sizeof (group_t))];
294 unsigned char type;
295 unsigned char flags;
296 uint32_t routing_id;
297 group_t group;
298 } zclmsg;
299 struct
300 {
301 metadata_t *metadata;
302 void *data;
303 size_t size;
304 unsigned char unused[msg_t_size
305 - (sizeof (metadata_t *) + sizeof (void *)
306 + sizeof (size_t) + 2 + sizeof (uint32_t)
307 + sizeof (group_t))];
308 unsigned char type;
309 unsigned char flags;
310 uint32_t routing_id;
311 group_t group;
312 } cmsg;
313 struct
314 {
315 metadata_t *metadata;
316 unsigned char unused[msg_t_size
317 - (sizeof (metadata_t *) + 2
318 + sizeof (uint32_t) + sizeof (group_t))];
319 unsigned char type;
320 unsigned char flags;
321 uint32_t routing_id;
322 group_t group;
323 } delimiter;
324 } _u;
325 };
326
close_and_return(zmq::msg_t * msg_,int echo_)327 inline int close_and_return (zmq::msg_t *msg_, int echo_)
328 {
329 // Since we abort on close failure we preserve errno for success case.
330 const int err = errno;
331 const int rc = msg_->close ();
332 errno_assert (rc == 0);
333 errno = err;
334 return echo_;
335 }
336
close_and_return(zmq::msg_t msg_[],int count_,int echo_)337 inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_)
338 {
339 for (int i = 0; i < count_; i++)
340 close_and_return (&msg_[i], 0);
341 return echo_;
342 }
343 }
344
345 #endif
346