1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <string.h>
32
33 #include "macros.hpp"
34 #include "dish.hpp"
35 #include "err.hpp"
36
dish_t(class ctx_t * parent_,uint32_t tid_,int sid_)37 zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38 socket_base_t (parent_, tid_, sid_, true),
39 _has_message (false)
40 {
41 options.type = ZMQ_DISH;
42
43 // When socket is being closed down we don't want to wait till pending
44 // subscription commands are sent to the wire.
45 options.linger.store (0);
46
47 const int rc = _message.init ();
48 errno_assert (rc == 0);
49 }
50
~dish_t()51 zmq::dish_t::~dish_t ()
52 {
53 const int rc = _message.close ();
54 errno_assert (rc == 0);
55 }
56
xattach_pipe(pipe_t * pipe_,bool subscribe_to_all_,bool locally_initiated_)57 void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
58 bool subscribe_to_all_,
59 bool locally_initiated_)
60 {
61 LIBZMQ_UNUSED (subscribe_to_all_);
62 LIBZMQ_UNUSED (locally_initiated_);
63
64 zmq_assert (pipe_);
65 _fq.attach (pipe_);
66 _dist.attach (pipe_);
67
68 // Send all the cached subscriptions to the new upstream peer.
69 send_subscriptions (pipe_);
70 }
71
xread_activated(pipe_t * pipe_)72 void zmq::dish_t::xread_activated (pipe_t *pipe_)
73 {
74 _fq.activated (pipe_);
75 }
76
xwrite_activated(pipe_t * pipe_)77 void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
78 {
79 _dist.activated (pipe_);
80 }
81
xpipe_terminated(pipe_t * pipe_)82 void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
83 {
84 _fq.pipe_terminated (pipe_);
85 _dist.pipe_terminated (pipe_);
86 }
87
xhiccuped(pipe_t * pipe_)88 void zmq::dish_t::xhiccuped (pipe_t *pipe_)
89 {
90 // Send all the cached subscriptions to the hiccuped pipe.
91 send_subscriptions (pipe_);
92 }
93
xjoin(const char * group_)94 int zmq::dish_t::xjoin (const char *group_)
95 {
96 const std::string group = std::string (group_);
97
98 if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
99 errno = EINVAL;
100 return -1;
101 }
102
103 // User cannot join same group twice
104 if (!_subscriptions.insert (group).second) {
105 errno = EINVAL;
106 return -1;
107 }
108
109 msg_t msg;
110 int rc = msg.init_join ();
111 errno_assert (rc == 0);
112
113 rc = msg.set_group (group_);
114 errno_assert (rc == 0);
115
116 int err = 0;
117 rc = _dist.send_to_all (&msg);
118 if (rc != 0)
119 err = errno;
120 const int rc2 = msg.close ();
121 errno_assert (rc2 == 0);
122 if (rc != 0)
123 errno = err;
124 return rc;
125 }
126
xleave(const char * group_)127 int zmq::dish_t::xleave (const char *group_)
128 {
129 const std::string group = std::string (group_);
130
131 if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
132 errno = EINVAL;
133 return -1;
134 }
135
136 if (0 == _subscriptions.erase (group)) {
137 errno = EINVAL;
138 return -1;
139 }
140
141 msg_t msg;
142 int rc = msg.init_leave ();
143 errno_assert (rc == 0);
144
145 rc = msg.set_group (group_);
146 errno_assert (rc == 0);
147
148 int err = 0;
149 rc = _dist.send_to_all (&msg);
150 if (rc != 0)
151 err = errno;
152 const int rc2 = msg.close ();
153 errno_assert (rc2 == 0);
154 if (rc != 0)
155 errno = err;
156 return rc;
157 }
158
xsend(msg_t * msg_)159 int zmq::dish_t::xsend (msg_t *msg_)
160 {
161 LIBZMQ_UNUSED (msg_);
162 errno = ENOTSUP;
163 return -1;
164 }
165
xhas_out()166 bool zmq::dish_t::xhas_out ()
167 {
168 // Subscription can be added/removed anytime.
169 return true;
170 }
171
xrecv(msg_t * msg_)172 int zmq::dish_t::xrecv (msg_t *msg_)
173 {
174 // If there's already a message prepared by a previous call to zmq_poll,
175 // return it straight ahead.
176 if (_has_message) {
177 const int rc = msg_->move (_message);
178 errno_assert (rc == 0);
179 _has_message = false;
180 return 0;
181 }
182
183 return xxrecv (msg_);
184 }
185
xxrecv(msg_t * msg_)186 int zmq::dish_t::xxrecv (msg_t *msg_)
187 {
188 do {
189 // Get a message using fair queueing algorithm.
190 const int rc = _fq.recv (msg_);
191
192 // If there's no message available, return immediately.
193 // The same when error occurs.
194 if (rc != 0)
195 return -1;
196
197 // Skip non matching messages
198 } while (0 == _subscriptions.count (std::string (msg_->group ())));
199
200 // Found a matching message
201 return 0;
202 }
203
xhas_in()204 bool zmq::dish_t::xhas_in ()
205 {
206 // If there's already a message prepared by a previous call to zmq_poll,
207 // return straight ahead.
208 if (_has_message)
209 return true;
210
211 const int rc = xxrecv (&_message);
212 if (rc != 0) {
213 errno_assert (errno == EAGAIN);
214 return false;
215 }
216
217 // Matching message found
218 _has_message = true;
219 return true;
220 }
221
send_subscriptions(pipe_t * pipe_)222 void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
223 {
224 for (subscriptions_t::iterator it = _subscriptions.begin (),
225 end = _subscriptions.end ();
226 it != end; ++it) {
227 msg_t msg;
228 int rc = msg.init_join ();
229 errno_assert (rc == 0);
230
231 rc = msg.set_group (it->c_str ());
232 errno_assert (rc == 0);
233
234 // Send it to the pipe.
235 pipe_->write (&msg);
236 }
237
238 pipe_->flush ();
239 }
240
dish_session_t(io_thread_t * io_thread_,bool connect_,socket_base_t * socket_,const options_t & options_,address_t * addr_)241 zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
242 bool connect_,
243 socket_base_t *socket_,
244 const options_t &options_,
245 address_t *addr_) :
246 session_base_t (io_thread_, connect_, socket_, options_, addr_),
247 _state (group)
248 {
249 }
250
~dish_session_t()251 zmq::dish_session_t::~dish_session_t ()
252 {
253 }
254
push_msg(msg_t * msg_)255 int zmq::dish_session_t::push_msg (msg_t *msg_)
256 {
257 if (_state == group) {
258 if ((msg_->flags () & msg_t::more) != msg_t::more) {
259 errno = EFAULT;
260 return -1;
261 }
262
263 if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) {
264 errno = EFAULT;
265 return -1;
266 }
267
268 _group_msg = *msg_;
269 _state = body;
270
271 const int rc = msg_->init ();
272 errno_assert (rc == 0);
273 return 0;
274 }
275 const char *group_setting = msg_->group ();
276 int rc;
277 if (group_setting[0] != 0)
278 goto has_group;
279
280 // Set the message group
281 rc = msg_->set_group (static_cast<char *> (_group_msg.data ()),
282 _group_msg.size ());
283 errno_assert (rc == 0);
284
285 // We set the group, so we don't need the group_msg anymore
286 rc = _group_msg.close ();
287 errno_assert (rc == 0);
288 has_group:
289 // Thread safe socket doesn't support multipart messages
290 if ((msg_->flags () & msg_t::more) == msg_t::more) {
291 errno = EFAULT;
292 return -1;
293 }
294
295 // Push message to dish socket
296 rc = session_base_t::push_msg (msg_);
297
298 if (rc == 0)
299 _state = group;
300
301 return rc;
302 }
303
pull_msg(msg_t * msg_)304 int zmq::dish_session_t::pull_msg (msg_t *msg_)
305 {
306 int rc = session_base_t::pull_msg (msg_);
307
308 if (rc != 0)
309 return rc;
310
311 if (!msg_->is_join () && !msg_->is_leave ())
312 return rc;
313
314 const int group_length = static_cast<int> (strlen (msg_->group ()));
315
316 msg_t command;
317 int offset;
318
319 if (msg_->is_join ()) {
320 rc = command.init_size (group_length + 5);
321 errno_assert (rc == 0);
322 offset = 5;
323 memcpy (command.data (), "\4JOIN", 5);
324 } else {
325 rc = command.init_size (group_length + 6);
326 errno_assert (rc == 0);
327 offset = 6;
328 memcpy (command.data (), "\5LEAVE", 6);
329 }
330
331 command.set_flags (msg_t::command);
332 char *command_data = static_cast<char *> (command.data ());
333
334 // Copy the group
335 memcpy (command_data + offset, msg_->group (), group_length);
336
337 // Close the join message
338 rc = msg_->close ();
339 errno_assert (rc == 0);
340
341 *msg_ = command;
342
343 return 0;
344 }
345
reset()346 void zmq::dish_session_t::reset ()
347 {
348 session_base_t::reset ();
349 _state = group;
350 }
351