1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include "dist.hpp"
32 #include "pipe.hpp"
33 #include "err.hpp"
34 #include "msg.hpp"
35 #include "likely.hpp"
36
dist_t()37 zmq::dist_t::dist_t () :
38 _matching (0),
39 _active (0),
40 _eligible (0),
41 _more (false)
42 {
43 }
44
~dist_t()45 zmq::dist_t::~dist_t ()
46 {
47 zmq_assert (_pipes.empty ());
48 }
49
attach(pipe_t * pipe_)50 void zmq::dist_t::attach (pipe_t *pipe_)
51 {
52 // If we are in the middle of sending a message, we'll add new pipe
53 // into the list of eligible pipes. Otherwise we add it to the list
54 // of active pipes.
55 if (_more) {
56 _pipes.push_back (pipe_);
57 _pipes.swap (_eligible, _pipes.size () - 1);
58 _eligible++;
59 } else {
60 _pipes.push_back (pipe_);
61 _pipes.swap (_active, _pipes.size () - 1);
62 _active++;
63 _eligible++;
64 }
65 }
66
match(pipe_t * pipe_)67 void zmq::dist_t::match (pipe_t *pipe_)
68 {
69 // If pipe is already matching do nothing.
70 if (_pipes.index (pipe_) < _matching)
71 return;
72
73 // If the pipe isn't eligible, ignore it.
74 if (_pipes.index (pipe_) >= _eligible)
75 return;
76
77 // Mark the pipe as matching.
78 _pipes.swap (_pipes.index (pipe_), _matching);
79 _matching++;
80 }
81
reverse_match()82 void zmq::dist_t::reverse_match ()
83 {
84 const pipes_t::size_type prev_matching = _matching;
85
86 // Reset matching to 0
87 unmatch ();
88
89 // Mark all matching pipes as not matching and vice-versa.
90 // To do this, push all pipes that are eligible but not
91 // matched - i.e. between "matching" and "eligible" -
92 // to the beginning of the queue.
93 for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
94 _pipes.swap (i, _matching++);
95 }
96 }
97
unmatch()98 void zmq::dist_t::unmatch ()
99 {
100 _matching = 0;
101 }
102
pipe_terminated(pipe_t * pipe_)103 void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
104 {
105 // Remove the pipe from the list; adjust number of matching, active and/or
106 // eligible pipes accordingly.
107 if (_pipes.index (pipe_) < _matching) {
108 _pipes.swap (_pipes.index (pipe_), _matching - 1);
109 _matching--;
110 }
111 if (_pipes.index (pipe_) < _active) {
112 _pipes.swap (_pipes.index (pipe_), _active - 1);
113 _active--;
114 }
115 if (_pipes.index (pipe_) < _eligible) {
116 _pipes.swap (_pipes.index (pipe_), _eligible - 1);
117 _eligible--;
118 }
119
120 _pipes.erase (pipe_);
121 }
122
activated(pipe_t * pipe_)123 void zmq::dist_t::activated (pipe_t *pipe_)
124 {
125 // Move the pipe from passive to eligible state.
126 if (_eligible < _pipes.size ()) {
127 _pipes.swap (_pipes.index (pipe_), _eligible);
128 _eligible++;
129 }
130
131 // If there's no message being sent at the moment, move it to
132 // the active state.
133 if (!_more && _active < _pipes.size ()) {
134 _pipes.swap (_eligible - 1, _active);
135 _active++;
136 }
137 }
138
send_to_all(msg_t * msg_)139 int zmq::dist_t::send_to_all (msg_t *msg_)
140 {
141 _matching = _active;
142 return send_to_matching (msg_);
143 }
144
send_to_matching(msg_t * msg_)145 int zmq::dist_t::send_to_matching (msg_t *msg_)
146 {
147 // Is this end of a multipart message?
148 const bool msg_more = (msg_->flags () & msg_t::more) != 0;
149
150 // Push the message to matching pipes.
151 distribute (msg_);
152
153 // If multipart message is fully sent, activate all the eligible pipes.
154 if (!msg_more)
155 _active = _eligible;
156
157 _more = msg_more;
158
159 return 0;
160 }
161
distribute(msg_t * msg_)162 void zmq::dist_t::distribute (msg_t *msg_)
163 {
164 // If there are no matching pipes available, simply drop the message.
165 if (_matching == 0) {
166 int rc = msg_->close ();
167 errno_assert (rc == 0);
168 rc = msg_->init ();
169 errno_assert (rc == 0);
170 return;
171 }
172
173 if (msg_->is_vsm ()) {
174 for (pipes_t::size_type i = 0; i < _matching;) {
175 if (!write (_pipes[i], msg_)) {
176 // Use same index again because entry will have been removed.
177 } else {
178 ++i;
179 }
180 }
181 int rc = msg_->init ();
182 errno_assert (rc == 0);
183 return;
184 }
185
186 // Add matching-1 references to the message. We already hold one reference,
187 // that's why -1.
188 msg_->add_refs (static_cast<int> (_matching) - 1);
189
190 // Push copy of the message to each matching pipe.
191 int failed = 0;
192 for (pipes_t::size_type i = 0; i < _matching;) {
193 if (!write (_pipes[i], msg_)) {
194 ++failed;
195 // Use same index again because entry will have been removed.
196 } else {
197 ++i;
198 }
199 }
200 if (unlikely (failed))
201 msg_->rm_refs (failed);
202
203 // Detach the original message from the data buffer. Note that we don't
204 // close the message. That's because we've already used all the references.
205 const int rc = msg_->init ();
206 errno_assert (rc == 0);
207 }
208
has_out()209 bool zmq::dist_t::has_out ()
210 {
211 return true;
212 }
213
write(pipe_t * pipe_,msg_t * msg_)214 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
215 {
216 if (!pipe_->write (msg_)) {
217 _pipes.swap (_pipes.index (pipe_), _matching - 1);
218 _matching--;
219 _pipes.swap (_pipes.index (pipe_), _active - 1);
220 _active--;
221 _pipes.swap (_active, _eligible - 1);
222 _eligible--;
223 return false;
224 }
225 if (!(msg_->flags () & msg_t::more))
226 pipe_->flush ();
227 return true;
228 }
229
check_hwm()230 bool zmq::dist_t::check_hwm ()
231 {
232 for (pipes_t::size_type i = 0; i < _matching; ++i)
233 if (!_pipes[i]->check_hwm ())
234 return false;
235
236 return true;
237 }
238