1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <string.h>
32 
33 #include "macros.hpp"
34 #include "dish.hpp"
35 #include "err.hpp"
36 
dish_t(class ctx_t * parent_,uint32_t tid_,int sid_)37 zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38     socket_base_t (parent_, tid_, sid_, true),
39     _has_message (false)
40 {
41     options.type = ZMQ_DISH;
42 
43     //  When socket is being closed down we don't want to wait till pending
44     //  subscription commands are sent to the wire.
45     options.linger.store (0);
46 
47     const int rc = _message.init ();
48     errno_assert (rc == 0);
49 }
50 
~dish_t()51 zmq::dish_t::~dish_t ()
52 {
53     const int rc = _message.close ();
54     errno_assert (rc == 0);
55 }
56 
xattach_pipe(pipe_t * pipe_,bool subscribe_to_all_,bool locally_initiated_)57 void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
58                                 bool subscribe_to_all_,
59                                 bool locally_initiated_)
60 {
61     LIBZMQ_UNUSED (subscribe_to_all_);
62     LIBZMQ_UNUSED (locally_initiated_);
63 
64     zmq_assert (pipe_);
65     _fq.attach (pipe_);
66     _dist.attach (pipe_);
67 
68     //  Send all the cached subscriptions to the new upstream peer.
69     send_subscriptions (pipe_);
70 }
71 
xread_activated(pipe_t * pipe_)72 void zmq::dish_t::xread_activated (pipe_t *pipe_)
73 {
74     _fq.activated (pipe_);
75 }
76 
xwrite_activated(pipe_t * pipe_)77 void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
78 {
79     _dist.activated (pipe_);
80 }
81 
xpipe_terminated(pipe_t * pipe_)82 void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
83 {
84     _fq.pipe_terminated (pipe_);
85     _dist.pipe_terminated (pipe_);
86 }
87 
xhiccuped(pipe_t * pipe_)88 void zmq::dish_t::xhiccuped (pipe_t *pipe_)
89 {
90     //  Send all the cached subscriptions to the hiccuped pipe.
91     send_subscriptions (pipe_);
92 }
93 
xjoin(const char * group_)94 int zmq::dish_t::xjoin (const char *group_)
95 {
96     const std::string group = std::string (group_);
97 
98     if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
99         errno = EINVAL;
100         return -1;
101     }
102 
103     //  User cannot join same group twice
104     if (!_subscriptions.insert (group).second) {
105         errno = EINVAL;
106         return -1;
107     }
108 
109     msg_t msg;
110     int rc = msg.init_join ();
111     errno_assert (rc == 0);
112 
113     rc = msg.set_group (group_);
114     errno_assert (rc == 0);
115 
116     int err = 0;
117     rc = _dist.send_to_all (&msg);
118     if (rc != 0)
119         err = errno;
120     const int rc2 = msg.close ();
121     errno_assert (rc2 == 0);
122     if (rc != 0)
123         errno = err;
124     return rc;
125 }
126 
xleave(const char * group_)127 int zmq::dish_t::xleave (const char *group_)
128 {
129     const std::string group = std::string (group_);
130 
131     if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
132         errno = EINVAL;
133         return -1;
134     }
135 
136     if (0 == _subscriptions.erase (group)) {
137         errno = EINVAL;
138         return -1;
139     }
140 
141     msg_t msg;
142     int rc = msg.init_leave ();
143     errno_assert (rc == 0);
144 
145     rc = msg.set_group (group_);
146     errno_assert (rc == 0);
147 
148     int err = 0;
149     rc = _dist.send_to_all (&msg);
150     if (rc != 0)
151         err = errno;
152     const int rc2 = msg.close ();
153     errno_assert (rc2 == 0);
154     if (rc != 0)
155         errno = err;
156     return rc;
157 }
158 
xsend(msg_t * msg_)159 int zmq::dish_t::xsend (msg_t *msg_)
160 {
161     LIBZMQ_UNUSED (msg_);
162     errno = ENOTSUP;
163     return -1;
164 }
165 
xhas_out()166 bool zmq::dish_t::xhas_out ()
167 {
168     //  Subscription can be added/removed anytime.
169     return true;
170 }
171 
xrecv(msg_t * msg_)172 int zmq::dish_t::xrecv (msg_t *msg_)
173 {
174     //  If there's already a message prepared by a previous call to zmq_poll,
175     //  return it straight ahead.
176     if (_has_message) {
177         const int rc = msg_->move (_message);
178         errno_assert (rc == 0);
179         _has_message = false;
180         return 0;
181     }
182 
183     return xxrecv (msg_);
184 }
185 
xxrecv(msg_t * msg_)186 int zmq::dish_t::xxrecv (msg_t *msg_)
187 {
188     do {
189         //  Get a message using fair queueing algorithm.
190         const int rc = _fq.recv (msg_);
191 
192         //  If there's no message available, return immediately.
193         //  The same when error occurs.
194         if (rc != 0)
195             return -1;
196 
197         //  Skip non matching messages
198     } while (0 == _subscriptions.count (std::string (msg_->group ())));
199 
200     //  Found a matching message
201     return 0;
202 }
203 
xhas_in()204 bool zmq::dish_t::xhas_in ()
205 {
206     //  If there's already a message prepared by a previous call to zmq_poll,
207     //  return straight ahead.
208     if (_has_message)
209         return true;
210 
211     const int rc = xxrecv (&_message);
212     if (rc != 0) {
213         errno_assert (errno == EAGAIN);
214         return false;
215     }
216 
217     //  Matching message found
218     _has_message = true;
219     return true;
220 }
221 
send_subscriptions(pipe_t * pipe_)222 void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
223 {
224     for (subscriptions_t::iterator it = _subscriptions.begin (),
225                                    end = _subscriptions.end ();
226          it != end; ++it) {
227         msg_t msg;
228         int rc = msg.init_join ();
229         errno_assert (rc == 0);
230 
231         rc = msg.set_group (it->c_str ());
232         errno_assert (rc == 0);
233 
234         //  Send it to the pipe.
235         pipe_->write (&msg);
236     }
237 
238     pipe_->flush ();
239 }
240 
dish_session_t(io_thread_t * io_thread_,bool connect_,socket_base_t * socket_,const options_t & options_,address_t * addr_)241 zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
242                                      bool connect_,
243                                      socket_base_t *socket_,
244                                      const options_t &options_,
245                                      address_t *addr_) :
246     session_base_t (io_thread_, connect_, socket_, options_, addr_),
247     _state (group)
248 {
249 }
250 
~dish_session_t()251 zmq::dish_session_t::~dish_session_t ()
252 {
253 }
254 
push_msg(msg_t * msg_)255 int zmq::dish_session_t::push_msg (msg_t *msg_)
256 {
257     if (_state == group) {
258         if ((msg_->flags () & msg_t::more) != msg_t::more) {
259             errno = EFAULT;
260             return -1;
261         }
262 
263         if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) {
264             errno = EFAULT;
265             return -1;
266         }
267 
268         _group_msg = *msg_;
269         _state = body;
270 
271         const int rc = msg_->init ();
272         errno_assert (rc == 0);
273         return 0;
274     }
275     const char *group_setting = msg_->group ();
276     int rc;
277     if (group_setting[0] != 0)
278         goto has_group;
279 
280     //  Set the message group
281     rc = msg_->set_group (static_cast<char *> (_group_msg.data ()),
282                           _group_msg.size ());
283     errno_assert (rc == 0);
284 
285     //  We set the group, so we don't need the group_msg anymore
286     rc = _group_msg.close ();
287     errno_assert (rc == 0);
288 has_group:
289     //  Thread safe socket doesn't support multipart messages
290     if ((msg_->flags () & msg_t::more) == msg_t::more) {
291         errno = EFAULT;
292         return -1;
293     }
294 
295     //  Push message to dish socket
296     rc = session_base_t::push_msg (msg_);
297 
298     if (rc == 0)
299         _state = group;
300 
301     return rc;
302 }
303 
pull_msg(msg_t * msg_)304 int zmq::dish_session_t::pull_msg (msg_t *msg_)
305 {
306     int rc = session_base_t::pull_msg (msg_);
307 
308     if (rc != 0)
309         return rc;
310 
311     if (!msg_->is_join () && !msg_->is_leave ())
312         return rc;
313 
314     const int group_length = static_cast<int> (strlen (msg_->group ()));
315 
316     msg_t command;
317     int offset;
318 
319     if (msg_->is_join ()) {
320         rc = command.init_size (group_length + 5);
321         errno_assert (rc == 0);
322         offset = 5;
323         memcpy (command.data (), "\4JOIN", 5);
324     } else {
325         rc = command.init_size (group_length + 6);
326         errno_assert (rc == 0);
327         offset = 6;
328         memcpy (command.data (), "\5LEAVE", 6);
329     }
330 
331     command.set_flags (msg_t::command);
332     char *command_data = static_cast<char *> (command.data ());
333 
334     //  Copy the group
335     memcpy (command_data + offset, msg_->group (), group_length);
336 
337     //  Close the join message
338     rc = msg_->close ();
339     errno_assert (rc == 0);
340 
341     *msg_ = command;
342 
343     return 0;
344 }
345 
reset()346 void zmq::dish_session_t::reset ()
347 {
348     session_base_t::reset ();
349     _state = group;
350 }
351