1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <string.h>
32
33 #include "xpub.hpp"
34 #include "pipe.hpp"
35 #include "err.hpp"
36 #include "msg.hpp"
37 #include "macros.hpp"
38 #include "generic_mtrie_impl.hpp"
39
xpub_t(class ctx_t * parent_,uint32_t tid_,int sid_)40 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
41 socket_base_t (parent_, tid_, sid_),
42 _verbose_subs (false),
43 _verbose_unsubs (false),
44 _more_send (false),
45 _more_recv (false),
46 _process_subscribe (false),
47 _only_first_subscribe (false),
48 _lossy (true),
49 _manual (false),
50 _send_last_pipe (false),
51 _pending_pipes (),
52 _welcome_msg ()
53 {
54 _last_pipe = NULL;
55 options.type = ZMQ_XPUB;
56 _welcome_msg.init ();
57 }
58
~xpub_t()59 zmq::xpub_t::~xpub_t ()
60 {
61 _welcome_msg.close ();
62 for (std::deque<metadata_t *>::iterator it = _pending_metadata.begin (),
63 end = _pending_metadata.end ();
64 it != end; ++it)
65 if (*it && (*it)->drop_ref ())
66 LIBZMQ_DELETE (*it);
67 }
68
xattach_pipe(pipe_t * pipe_,bool subscribe_to_all_,bool locally_initiated_)69 void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
70 bool subscribe_to_all_,
71 bool locally_initiated_)
72 {
73 LIBZMQ_UNUSED (locally_initiated_);
74
75 zmq_assert (pipe_);
76 _dist.attach (pipe_);
77
78 // If subscribe_to_all_ is specified, the caller would like to subscribe
79 // to all data on this pipe, implicitly.
80 if (subscribe_to_all_)
81 _subscriptions.add (NULL, 0, pipe_);
82
83 // if welcome message exists, send a copy of it
84 if (_welcome_msg.size () > 0) {
85 msg_t copy;
86 copy.init ();
87 const int rc = copy.copy (_welcome_msg);
88 errno_assert (rc == 0);
89 const bool ok = pipe_->write (©);
90 zmq_assert (ok);
91 pipe_->flush ();
92 }
93
94 // The pipe is active when attached. Let's read the subscriptions from
95 // it, if any.
96 xread_activated (pipe_);
97 }
98
xread_activated(pipe_t * pipe_)99 void zmq::xpub_t::xread_activated (pipe_t *pipe_)
100 {
101 // There are some subscriptions waiting. Let's process them.
102 msg_t msg;
103 while (pipe_->read (&msg)) {
104 metadata_t *metadata = msg.metadata ();
105 unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()),
106 *data = NULL;
107 size_t size = 0;
108 bool subscribe = false;
109 bool is_subscribe_or_cancel = false;
110 bool notify = false;
111
112 const bool first_part = !_more_recv;
113 _more_recv = (msg.flags () & msg_t::more) != 0;
114
115 if (first_part || _process_subscribe) {
116 // Apply the subscription to the trie
117 if (msg.is_subscribe () || msg.is_cancel ()) {
118 data = static_cast<unsigned char *> (msg.command_body ());
119 size = msg.command_body_size ();
120 subscribe = msg.is_subscribe ();
121 is_subscribe_or_cancel = true;
122 } else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
123 data = msg_data + 1;
124 size = msg.size () - 1;
125 subscribe = *msg_data == 1;
126 is_subscribe_or_cancel = true;
127 }
128 }
129
130 if (first_part)
131 _process_subscribe =
132 !_only_first_subscribe || is_subscribe_or_cancel;
133
134 if (is_subscribe_or_cancel) {
135 if (_manual) {
136 // Store manual subscription to use on termination
137 if (!subscribe)
138 _manual_subscriptions.rm (data, size, pipe_);
139 else
140 _manual_subscriptions.add (data, size, pipe_);
141
142 _pending_pipes.push_back (pipe_);
143 } else {
144 if (!subscribe) {
145 const mtrie_t::rm_result rm_result =
146 _subscriptions.rm (data, size, pipe_);
147 // TODO reconsider what to do if rm_result == mtrie_t::not_found
148 notify =
149 rm_result != mtrie_t::values_remain || _verbose_unsubs;
150 } else {
151 const bool first_added =
152 _subscriptions.add (data, size, pipe_);
153 notify = first_added || _verbose_subs;
154 }
155 }
156
157 // If the request was a new subscription, or the subscription
158 // was removed, or verbose mode or manual mode are enabled, store it
159 // so that it can be passed to the user on next recv call.
160 if (_manual || (options.type == ZMQ_XPUB && notify)) {
161 // ZMTP 3.1 hack: we need to support sub/cancel commands, but
162 // we can't give them back to userspace as it would be an API
163 // breakage since the payload of the message is completely
164 // different. Manually craft an old-style message instead.
165 // Although with other transports it would be possible to simply
166 // reuse the same buffer and prefix a 0/1 byte to the topic, with
167 // inproc the subscribe/cancel command string is not present in
168 // the message, so this optimization is not possible.
169 // The pushback makes a copy of the data array anyway, so the
170 // number of buffer copies does not change.
171 blob_t notification (size + 1);
172 if (subscribe)
173 *notification.data () = 1;
174 else
175 *notification.data () = 0;
176 memcpy (notification.data () + 1, data, size);
177
178 _pending_data.push_back (ZMQ_MOVE (notification));
179 if (metadata)
180 metadata->add_ref ();
181 _pending_metadata.push_back (metadata);
182 _pending_flags.push_back (0);
183 }
184 } else if (options.type != ZMQ_PUB) {
185 // Process user message coming upstream from xsub socket,
186 // but not if the type is PUB, which never processes user
187 // messages
188 _pending_data.push_back (blob_t (msg_data, msg.size ()));
189 if (metadata)
190 metadata->add_ref ();
191 _pending_metadata.push_back (metadata);
192 _pending_flags.push_back (msg.flags ());
193 }
194
195 msg.close ();
196 }
197 }
198
xwrite_activated(pipe_t * pipe_)199 void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
200 {
201 _dist.activated (pipe_);
202 }
203
xsetsockopt(int option_,const void * optval_,size_t optvallen_)204 int zmq::xpub_t::xsetsockopt (int option_,
205 const void *optval_,
206 size_t optvallen_)
207 {
208 if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
209 || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP
210 || option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
211 if (optvallen_ != sizeof (int)
212 || *static_cast<const int *> (optval_) < 0) {
213 errno = EINVAL;
214 return -1;
215 }
216 if (option_ == ZMQ_XPUB_VERBOSE) {
217 _verbose_subs = (*static_cast<const int *> (optval_) != 0);
218 _verbose_unsubs = false;
219 } else if (option_ == ZMQ_XPUB_VERBOSER) {
220 _verbose_subs = (*static_cast<const int *> (optval_) != 0);
221 _verbose_unsubs = _verbose_subs;
222 } else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) {
223 _manual = (*static_cast<const int *> (optval_) != 0);
224 _send_last_pipe = _manual;
225 } else if (option_ == ZMQ_XPUB_NODROP)
226 _lossy = (*static_cast<const int *> (optval_) == 0);
227 else if (option_ == ZMQ_XPUB_MANUAL)
228 _manual = (*static_cast<const int *> (optval_) != 0);
229 else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE)
230 _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
231 } else if (option_ == ZMQ_SUBSCRIBE && _manual) {
232 if (_last_pipe != NULL)
233 _subscriptions.add ((unsigned char *) optval_, optvallen_,
234 _last_pipe);
235 } else if (option_ == ZMQ_UNSUBSCRIBE && _manual) {
236 if (_last_pipe != NULL)
237 _subscriptions.rm ((unsigned char *) optval_, optvallen_,
238 _last_pipe);
239 } else if (option_ == ZMQ_XPUB_WELCOME_MSG) {
240 _welcome_msg.close ();
241
242 if (optvallen_ > 0) {
243 const int rc = _welcome_msg.init_size (optvallen_);
244 errno_assert (rc == 0);
245
246 unsigned char *data =
247 static_cast<unsigned char *> (_welcome_msg.data ());
248 memcpy (data, optval_, optvallen_);
249 } else
250 _welcome_msg.init ();
251 } else {
252 errno = EINVAL;
253 return -1;
254 }
255 return 0;
256 }
257
stub(zmq::mtrie_t::prefix_t data_,size_t size_,void * arg_)258 static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
259 {
260 LIBZMQ_UNUSED (data_);
261 LIBZMQ_UNUSED (size_);
262 LIBZMQ_UNUSED (arg_);
263 }
264
xpipe_terminated(pipe_t * pipe_)265 void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
266 {
267 if (_manual) {
268 // Remove the pipe from the trie and send corresponding manual
269 // unsubscriptions upstream.
270 _manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
271 // Remove pipe without actually sending the message as it was taken
272 // care of by the manual call above. subscriptions is the real mtrie,
273 // so the pipe must be removed from there or it will be left over.
274 _subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false);
275 } else {
276 // Remove the pipe from the trie. If there are topics that nobody
277 // is interested in anymore, send corresponding unsubscriptions
278 // upstream.
279 _subscriptions.rm (pipe_, send_unsubscription, this, !_verbose_unsubs);
280 }
281
282 _dist.pipe_terminated (pipe_);
283 }
284
mark_as_matching(pipe_t * pipe_,xpub_t * self_)285 void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
286 {
287 self_->_dist.match (pipe_);
288 }
289
mark_last_pipe_as_matching(pipe_t * pipe_,xpub_t * self_)290 void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_)
291 {
292 if (self_->_last_pipe == pipe_)
293 self_->_dist.match (pipe_);
294 }
295
xsend(msg_t * msg_)296 int zmq::xpub_t::xsend (msg_t *msg_)
297 {
298 const bool msg_more = (msg_->flags () & msg_t::more) != 0;
299
300 // For the first part of multi-part message, find the matching pipes.
301 if (!_more_send) {
302 // Ensure nothing from previous failed attempt to send is left matched
303 _dist.unmatch ();
304
305 if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
306 _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
307 msg_->size (), mark_last_pipe_as_matching,
308 this);
309 _last_pipe = NULL;
310 } else
311 _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
312 msg_->size (), mark_as_matching, this);
313 // If inverted matching is used, reverse the selection now
314 if (options.invert_matching) {
315 _dist.reverse_match ();
316 }
317 }
318
319 int rc = -1; // Assume we fail
320 if (_lossy || _dist.check_hwm ()) {
321 if (_dist.send_to_matching (msg_) == 0) {
322 // If we are at the end of multi-part message we can mark
323 // all the pipes as non-matching.
324 if (!msg_more)
325 _dist.unmatch ();
326 _more_send = msg_more;
327 rc = 0; // Yay, sent successfully
328 }
329 } else
330 errno = EAGAIN;
331 return rc;
332 }
333
xhas_out()334 bool zmq::xpub_t::xhas_out ()
335 {
336 return _dist.has_out ();
337 }
338
xrecv(msg_t * msg_)339 int zmq::xpub_t::xrecv (msg_t *msg_)
340 {
341 // If there is at least one
342 if (_pending_data.empty ()) {
343 errno = EAGAIN;
344 return -1;
345 }
346
347 // User is reading a message, set last_pipe and remove it from the deque
348 if (_manual && !_pending_pipes.empty ()) {
349 _last_pipe = _pending_pipes.front ();
350 _pending_pipes.pop_front ();
351 }
352
353 int rc = msg_->close ();
354 errno_assert (rc == 0);
355 rc = msg_->init_size (_pending_data.front ().size ());
356 errno_assert (rc == 0);
357 memcpy (msg_->data (), _pending_data.front ().data (),
358 _pending_data.front ().size ());
359
360 // set metadata only if there is some
361 if (metadata_t *metadata = _pending_metadata.front ()) {
362 msg_->set_metadata (metadata);
363 // Remove ref corresponding to vector placement
364 metadata->drop_ref ();
365 }
366
367 msg_->set_flags (_pending_flags.front ());
368 _pending_data.pop_front ();
369 _pending_metadata.pop_front ();
370 _pending_flags.pop_front ();
371 return 0;
372 }
373
xhas_in()374 bool zmq::xpub_t::xhas_in ()
375 {
376 return !_pending_data.empty ();
377 }
378
send_unsubscription(zmq::mtrie_t::prefix_t data_,size_t size_,xpub_t * self_)379 void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_,
380 size_t size_,
381 xpub_t *self_)
382 {
383 if (self_->options.type != ZMQ_PUB) {
384 // Place the unsubscription to the queue of pending (un)subscriptions
385 // to be retrieved by the user later on.
386 blob_t unsub (size_ + 1);
387 *unsub.data () = 0;
388 if (size_ > 0)
389 memcpy (unsub.data () + 1, data_, size_);
390 self_->_pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub));
391 self_->_pending_metadata.push_back (NULL);
392 self_->_pending_flags.push_back (0);
393
394 if (self_->_manual) {
395 self_->_last_pipe = NULL;
396 self_->_pending_pipes.push_back (NULL);
397 }
398 }
399 }
400