1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 
33 #include "xpub.hpp"
34 #include "pipe.hpp"
35 #include "err.hpp"
36 #include "msg.hpp"
37 #include "macros.hpp"
38 #include "generic_mtrie_impl.hpp"
39 
xpub_t(class ctx_t * parent_,uint32_t tid_,int sid_)40 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
41     socket_base_t (parent_, tid_, sid_),
42     _verbose_subs (false),
43     _verbose_unsubs (false),
44     _more_send (false),
45     _more_recv (false),
46     _process_subscribe (false),
47     _only_first_subscribe (false),
48     _lossy (true),
49     _manual (false),
50     _send_last_pipe (false),
51     _pending_pipes (),
52     _welcome_msg ()
53 {
54     _last_pipe = NULL;
55     options.type = ZMQ_XPUB;
56     _welcome_msg.init ();
57 }
58 
~xpub_t()59 zmq::xpub_t::~xpub_t ()
60 {
61     _welcome_msg.close ();
62     for (std::deque<metadata_t *>::iterator it = _pending_metadata.begin (),
63                                             end = _pending_metadata.end ();
64          it != end; ++it)
65         if (*it && (*it)->drop_ref ())
66             LIBZMQ_DELETE (*it);
67 }
68 
xattach_pipe(pipe_t * pipe_,bool subscribe_to_all_,bool locally_initiated_)69 void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
70                                 bool subscribe_to_all_,
71                                 bool locally_initiated_)
72 {
73     LIBZMQ_UNUSED (locally_initiated_);
74 
75     zmq_assert (pipe_);
76     _dist.attach (pipe_);
77 
78     //  If subscribe_to_all_ is specified, the caller would like to subscribe
79     //  to all data on this pipe, implicitly.
80     if (subscribe_to_all_)
81         _subscriptions.add (NULL, 0, pipe_);
82 
83     // if welcome message exists, send a copy of it
84     if (_welcome_msg.size () > 0) {
85         msg_t copy;
86         copy.init ();
87         const int rc = copy.copy (_welcome_msg);
88         errno_assert (rc == 0);
89         const bool ok = pipe_->write (&copy);
90         zmq_assert (ok);
91         pipe_->flush ();
92     }
93 
94     //  The pipe is active when attached. Let's read the subscriptions from
95     //  it, if any.
96     xread_activated (pipe_);
97 }
98 
xread_activated(pipe_t * pipe_)99 void zmq::xpub_t::xread_activated (pipe_t *pipe_)
100 {
101     //  There are some subscriptions waiting. Let's process them.
102     msg_t msg;
103     while (pipe_->read (&msg)) {
104         metadata_t *metadata = msg.metadata ();
105         unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()),
106                       *data = NULL;
107         size_t size = 0;
108         bool subscribe = false;
109         bool is_subscribe_or_cancel = false;
110         bool notify = false;
111 
112         const bool first_part = !_more_recv;
113         _more_recv = (msg.flags () & msg_t::more) != 0;
114 
115         if (first_part || _process_subscribe) {
116             //  Apply the subscription to the trie
117             if (msg.is_subscribe () || msg.is_cancel ()) {
118                 data = static_cast<unsigned char *> (msg.command_body ());
119                 size = msg.command_body_size ();
120                 subscribe = msg.is_subscribe ();
121                 is_subscribe_or_cancel = true;
122             } else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
123                 data = msg_data + 1;
124                 size = msg.size () - 1;
125                 subscribe = *msg_data == 1;
126                 is_subscribe_or_cancel = true;
127             }
128         }
129 
130         if (first_part)
131             _process_subscribe =
132               !_only_first_subscribe || is_subscribe_or_cancel;
133 
134         if (is_subscribe_or_cancel) {
135             if (_manual) {
136                 // Store manual subscription to use on termination
137                 if (!subscribe)
138                     _manual_subscriptions.rm (data, size, pipe_);
139                 else
140                     _manual_subscriptions.add (data, size, pipe_);
141 
142                 _pending_pipes.push_back (pipe_);
143             } else {
144                 if (!subscribe) {
145                     const mtrie_t::rm_result rm_result =
146                       _subscriptions.rm (data, size, pipe_);
147                     //  TODO reconsider what to do if rm_result == mtrie_t::not_found
148                     notify =
149                       rm_result != mtrie_t::values_remain || _verbose_unsubs;
150                 } else {
151                     const bool first_added =
152                       _subscriptions.add (data, size, pipe_);
153                     notify = first_added || _verbose_subs;
154                 }
155             }
156 
157             //  If the request was a new subscription, or the subscription
158             //  was removed, or verbose mode or manual mode are enabled, store it
159             //  so that it can be passed to the user on next recv call.
160             if (_manual || (options.type == ZMQ_XPUB && notify)) {
161                 //  ZMTP 3.1 hack: we need to support sub/cancel commands, but
162                 //  we can't give them back to userspace as it would be an API
163                 //  breakage since the payload of the message is completely
164                 //  different. Manually craft an old-style message instead.
165                 //  Although with other transports it would be possible to simply
166                 //  reuse the same buffer and prefix a 0/1 byte to the topic, with
167                 //  inproc the subscribe/cancel command string is not present in
168                 //  the message, so this optimization is not possible.
169                 //  The pushback makes a copy of the data array anyway, so the
170                 //  number of buffer copies does not change.
171                 blob_t notification (size + 1);
172                 if (subscribe)
173                     *notification.data () = 1;
174                 else
175                     *notification.data () = 0;
176                 memcpy (notification.data () + 1, data, size);
177 
178                 _pending_data.push_back (ZMQ_MOVE (notification));
179                 if (metadata)
180                     metadata->add_ref ();
181                 _pending_metadata.push_back (metadata);
182                 _pending_flags.push_back (0);
183             }
184         } else if (options.type != ZMQ_PUB) {
185             //  Process user message coming upstream from xsub socket,
186             //  but not if the type is PUB, which never processes user
187             //  messages
188             _pending_data.push_back (blob_t (msg_data, msg.size ()));
189             if (metadata)
190                 metadata->add_ref ();
191             _pending_metadata.push_back (metadata);
192             _pending_flags.push_back (msg.flags ());
193         }
194 
195         msg.close ();
196     }
197 }
198 
xwrite_activated(pipe_t * pipe_)199 void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
200 {
201     _dist.activated (pipe_);
202 }
203 
xsetsockopt(int option_,const void * optval_,size_t optvallen_)204 int zmq::xpub_t::xsetsockopt (int option_,
205                               const void *optval_,
206                               size_t optvallen_)
207 {
208     if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
209         || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP
210         || option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
211         if (optvallen_ != sizeof (int)
212             || *static_cast<const int *> (optval_) < 0) {
213             errno = EINVAL;
214             return -1;
215         }
216         if (option_ == ZMQ_XPUB_VERBOSE) {
217             _verbose_subs = (*static_cast<const int *> (optval_) != 0);
218             _verbose_unsubs = false;
219         } else if (option_ == ZMQ_XPUB_VERBOSER) {
220             _verbose_subs = (*static_cast<const int *> (optval_) != 0);
221             _verbose_unsubs = _verbose_subs;
222         } else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) {
223             _manual = (*static_cast<const int *> (optval_) != 0);
224             _send_last_pipe = _manual;
225         } else if (option_ == ZMQ_XPUB_NODROP)
226             _lossy = (*static_cast<const int *> (optval_) == 0);
227         else if (option_ == ZMQ_XPUB_MANUAL)
228             _manual = (*static_cast<const int *> (optval_) != 0);
229         else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE)
230             _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
231     } else if (option_ == ZMQ_SUBSCRIBE && _manual) {
232         if (_last_pipe != NULL)
233             _subscriptions.add ((unsigned char *) optval_, optvallen_,
234                                 _last_pipe);
235     } else if (option_ == ZMQ_UNSUBSCRIBE && _manual) {
236         if (_last_pipe != NULL)
237             _subscriptions.rm ((unsigned char *) optval_, optvallen_,
238                                _last_pipe);
239     } else if (option_ == ZMQ_XPUB_WELCOME_MSG) {
240         _welcome_msg.close ();
241 
242         if (optvallen_ > 0) {
243             const int rc = _welcome_msg.init_size (optvallen_);
244             errno_assert (rc == 0);
245 
246             unsigned char *data =
247               static_cast<unsigned char *> (_welcome_msg.data ());
248             memcpy (data, optval_, optvallen_);
249         } else
250             _welcome_msg.init ();
251     } else {
252         errno = EINVAL;
253         return -1;
254     }
255     return 0;
256 }
257 
stub(zmq::mtrie_t::prefix_t data_,size_t size_,void * arg_)258 static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
259 {
260     LIBZMQ_UNUSED (data_);
261     LIBZMQ_UNUSED (size_);
262     LIBZMQ_UNUSED (arg_);
263 }
264 
xpipe_terminated(pipe_t * pipe_)265 void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
266 {
267     if (_manual) {
268         //  Remove the pipe from the trie and send corresponding manual
269         //  unsubscriptions upstream.
270         _manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
271         //  Remove pipe without actually sending the message as it was taken
272         //  care of by the manual call above. subscriptions is the real mtrie,
273         //  so the pipe must be removed from there or it will be left over.
274         _subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false);
275     } else {
276         //  Remove the pipe from the trie. If there are topics that nobody
277         //  is interested in anymore, send corresponding unsubscriptions
278         //  upstream.
279         _subscriptions.rm (pipe_, send_unsubscription, this, !_verbose_unsubs);
280     }
281 
282     _dist.pipe_terminated (pipe_);
283 }
284 
mark_as_matching(pipe_t * pipe_,xpub_t * self_)285 void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
286 {
287     self_->_dist.match (pipe_);
288 }
289 
mark_last_pipe_as_matching(pipe_t * pipe_,xpub_t * self_)290 void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_)
291 {
292     if (self_->_last_pipe == pipe_)
293         self_->_dist.match (pipe_);
294 }
295 
xsend(msg_t * msg_)296 int zmq::xpub_t::xsend (msg_t *msg_)
297 {
298     const bool msg_more = (msg_->flags () & msg_t::more) != 0;
299 
300     //  For the first part of multi-part message, find the matching pipes.
301     if (!_more_send) {
302         // Ensure nothing from previous failed attempt to send is left matched
303         _dist.unmatch ();
304 
305         if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
306             _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
307                                   msg_->size (), mark_last_pipe_as_matching,
308                                   this);
309             _last_pipe = NULL;
310         } else
311             _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
312                                   msg_->size (), mark_as_matching, this);
313         // If inverted matching is used, reverse the selection now
314         if (options.invert_matching) {
315             _dist.reverse_match ();
316         }
317     }
318 
319     int rc = -1; //  Assume we fail
320     if (_lossy || _dist.check_hwm ()) {
321         if (_dist.send_to_matching (msg_) == 0) {
322             //  If we are at the end of multi-part message we can mark
323             //  all the pipes as non-matching.
324             if (!msg_more)
325                 _dist.unmatch ();
326             _more_send = msg_more;
327             rc = 0; //  Yay, sent successfully
328         }
329     } else
330         errno = EAGAIN;
331     return rc;
332 }
333 
xhas_out()334 bool zmq::xpub_t::xhas_out ()
335 {
336     return _dist.has_out ();
337 }
338 
xrecv(msg_t * msg_)339 int zmq::xpub_t::xrecv (msg_t *msg_)
340 {
341     //  If there is at least one
342     if (_pending_data.empty ()) {
343         errno = EAGAIN;
344         return -1;
345     }
346 
347     // User is reading a message, set last_pipe and remove it from the deque
348     if (_manual && !_pending_pipes.empty ()) {
349         _last_pipe = _pending_pipes.front ();
350         _pending_pipes.pop_front ();
351     }
352 
353     int rc = msg_->close ();
354     errno_assert (rc == 0);
355     rc = msg_->init_size (_pending_data.front ().size ());
356     errno_assert (rc == 0);
357     memcpy (msg_->data (), _pending_data.front ().data (),
358             _pending_data.front ().size ());
359 
360     // set metadata only if there is some
361     if (metadata_t *metadata = _pending_metadata.front ()) {
362         msg_->set_metadata (metadata);
363         // Remove ref corresponding to vector placement
364         metadata->drop_ref ();
365     }
366 
367     msg_->set_flags (_pending_flags.front ());
368     _pending_data.pop_front ();
369     _pending_metadata.pop_front ();
370     _pending_flags.pop_front ();
371     return 0;
372 }
373 
xhas_in()374 bool zmq::xpub_t::xhas_in ()
375 {
376     return !_pending_data.empty ();
377 }
378 
send_unsubscription(zmq::mtrie_t::prefix_t data_,size_t size_,xpub_t * self_)379 void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_,
380                                        size_t size_,
381                                        xpub_t *self_)
382 {
383     if (self_->options.type != ZMQ_PUB) {
384         //  Place the unsubscription to the queue of pending (un)subscriptions
385         //  to be retrieved by the user later on.
386         blob_t unsub (size_ + 1);
387         *unsub.data () = 0;
388         if (size_ > 0)
389             memcpy (unsub.data () + 1, data_, size_);
390         self_->_pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub));
391         self_->_pending_metadata.push_back (NULL);
392         self_->_pending_flags.push_back (0);
393 
394         if (self_->_manual) {
395             self_->_last_pipe = NULL;
396             self_->_pending_pipes.push_back (NULL);
397         }
398     }
399 }
400