1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "dist.hpp"
32 #include "pipe.hpp"
33 #include "err.hpp"
34 #include "msg.hpp"
35 #include "likely.hpp"
36 
dist_t()37 zmq::dist_t::dist_t () :
38     _matching (0),
39     _active (0),
40     _eligible (0),
41     _more (false)
42 {
43 }
44 
~dist_t()45 zmq::dist_t::~dist_t ()
46 {
47     zmq_assert (_pipes.empty ());
48 }
49 
attach(pipe_t * pipe_)50 void zmq::dist_t::attach (pipe_t *pipe_)
51 {
52     //  If we are in the middle of sending a message, we'll add new pipe
53     //  into the list of eligible pipes. Otherwise we add it to the list
54     //  of active pipes.
55     if (_more) {
56         _pipes.push_back (pipe_);
57         _pipes.swap (_eligible, _pipes.size () - 1);
58         _eligible++;
59     } else {
60         _pipes.push_back (pipe_);
61         _pipes.swap (_active, _pipes.size () - 1);
62         _active++;
63         _eligible++;
64     }
65 }
66 
match(pipe_t * pipe_)67 void zmq::dist_t::match (pipe_t *pipe_)
68 {
69     //  If pipe is already matching do nothing.
70     if (_pipes.index (pipe_) < _matching)
71         return;
72 
73     //  If the pipe isn't eligible, ignore it.
74     if (_pipes.index (pipe_) >= _eligible)
75         return;
76 
77     //  Mark the pipe as matching.
78     _pipes.swap (_pipes.index (pipe_), _matching);
79     _matching++;
80 }
81 
reverse_match()82 void zmq::dist_t::reverse_match ()
83 {
84     const pipes_t::size_type prev_matching = _matching;
85 
86     // Reset matching to 0
87     unmatch ();
88 
89     // Mark all matching pipes as not matching and vice-versa.
90     // To do this, push all pipes that are eligible but not
91     // matched - i.e. between "matching" and "eligible" -
92     // to the beginning of the queue.
93     for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
94         _pipes.swap (i, _matching++);
95     }
96 }
97 
unmatch()98 void zmq::dist_t::unmatch ()
99 {
100     _matching = 0;
101 }
102 
pipe_terminated(pipe_t * pipe_)103 void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
104 {
105     //  Remove the pipe from the list; adjust number of matching, active and/or
106     //  eligible pipes accordingly.
107     if (_pipes.index (pipe_) < _matching) {
108         _pipes.swap (_pipes.index (pipe_), _matching - 1);
109         _matching--;
110     }
111     if (_pipes.index (pipe_) < _active) {
112         _pipes.swap (_pipes.index (pipe_), _active - 1);
113         _active--;
114     }
115     if (_pipes.index (pipe_) < _eligible) {
116         _pipes.swap (_pipes.index (pipe_), _eligible - 1);
117         _eligible--;
118     }
119 
120     _pipes.erase (pipe_);
121 }
122 
activated(pipe_t * pipe_)123 void zmq::dist_t::activated (pipe_t *pipe_)
124 {
125     //  Move the pipe from passive to eligible state.
126     if (_eligible < _pipes.size ()) {
127         _pipes.swap (_pipes.index (pipe_), _eligible);
128         _eligible++;
129     }
130 
131     //  If there's no message being sent at the moment, move it to
132     //  the active state.
133     if (!_more && _active < _pipes.size ()) {
134         _pipes.swap (_eligible - 1, _active);
135         _active++;
136     }
137 }
138 
send_to_all(msg_t * msg_)139 int zmq::dist_t::send_to_all (msg_t *msg_)
140 {
141     _matching = _active;
142     return send_to_matching (msg_);
143 }
144 
send_to_matching(msg_t * msg_)145 int zmq::dist_t::send_to_matching (msg_t *msg_)
146 {
147     //  Is this end of a multipart message?
148     const bool msg_more = (msg_->flags () & msg_t::more) != 0;
149 
150     //  Push the message to matching pipes.
151     distribute (msg_);
152 
153     //  If multipart message is fully sent, activate all the eligible pipes.
154     if (!msg_more)
155         _active = _eligible;
156 
157     _more = msg_more;
158 
159     return 0;
160 }
161 
distribute(msg_t * msg_)162 void zmq::dist_t::distribute (msg_t *msg_)
163 {
164     //  If there are no matching pipes available, simply drop the message.
165     if (_matching == 0) {
166         int rc = msg_->close ();
167         errno_assert (rc == 0);
168         rc = msg_->init ();
169         errno_assert (rc == 0);
170         return;
171     }
172 
173     if (msg_->is_vsm ()) {
174         for (pipes_t::size_type i = 0; i < _matching;) {
175             if (!write (_pipes[i], msg_)) {
176                 //  Use same index again because entry will have been removed.
177             } else {
178                 ++i;
179             }
180         }
181         int rc = msg_->init ();
182         errno_assert (rc == 0);
183         return;
184     }
185 
186     //  Add matching-1 references to the message. We already hold one reference,
187     //  that's why -1.
188     msg_->add_refs (static_cast<int> (_matching) - 1);
189 
190     //  Push copy of the message to each matching pipe.
191     int failed = 0;
192     for (pipes_t::size_type i = 0; i < _matching;) {
193         if (!write (_pipes[i], msg_)) {
194             ++failed;
195             //  Use same index again because entry will have been removed.
196         } else {
197             ++i;
198         }
199     }
200     if (unlikely (failed))
201         msg_->rm_refs (failed);
202 
203     //  Detach the original message from the data buffer. Note that we don't
204     //  close the message. That's because we've already used all the references.
205     const int rc = msg_->init ();
206     errno_assert (rc == 0);
207 }
208 
has_out()209 bool zmq::dist_t::has_out ()
210 {
211     return true;
212 }
213 
write(pipe_t * pipe_,msg_t * msg_)214 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
215 {
216     if (!pipe_->write (msg_)) {
217         _pipes.swap (_pipes.index (pipe_), _matching - 1);
218         _matching--;
219         _pipes.swap (_pipes.index (pipe_), _active - 1);
220         _active--;
221         _pipes.swap (_active, _eligible - 1);
222         _eligible--;
223         return false;
224     }
225     if (!(msg_->flags () & msg_t::more))
226         pipe_->flush ();
227     return true;
228 }
229 
check_hwm()230 bool zmq::dist_t::check_hwm ()
231 {
232     for (pipes_t::size_type i = 0; i < _matching; ++i)
233         if (!_pipes[i]->check_hwm ())
234             return false;
235 
236     return true;
237 }
238