1 /*
2     Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include "lb.hpp"
32 #include "pipe.hpp"
33 #include "err.hpp"
34 #include "msg.hpp"
35 
lb_t()36 zmq::lb_t::lb_t () : _active (0), _current (0), _more (false), _dropping (false)
37 {
38 }
39 
//  Destroys the load balancer. The pipe list is asserted to be empty at
//  this point, i.e. every attached pipe must already have been removed
//  (see pipe_terminated).
zmq::lb_t::~lb_t ()
{
    zmq_assert (_pipes.empty ());
}
44 
//  Registers a new outbound pipe with the load balancer.
//  The pipe is appended to the pipe list and then immediately marked as
//  active through activated ().
void zmq::lb_t::attach (pipe_t *pipe_)
{
    _pipes.push_back (pipe_);
    activated (pipe_);
}
50 
pipe_terminated(pipe_t * pipe_)51 void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
52 {
53     const pipes_t::size_type index = _pipes.index (pipe_);
54 
55     //  If we are in the middle of multipart message and current pipe
56     //  have disconnected, we have to drop the remainder of the message.
57     if (index == _current && _more)
58         _dropping = true;
59 
60     //  Remove the pipe from the list; adjust number of active pipes
61     //  accordingly.
62     if (index < _active) {
63         _active--;
64         _pipes.swap (index, _active);
65         if (_current == _active)
66             _current = 0;
67     }
68     _pipes.erase (pipe_);
69 }
70 
activated(pipe_t * pipe_)71 void zmq::lb_t::activated (pipe_t *pipe_)
72 {
73     //  Move the pipe to the list of active pipes.
74     _pipes.swap (_pipes.index (pipe_), _active);
75     _active++;
76 }
77 
//  Sends a message frame without reporting which pipe it was written to.
//  Forwards to sendpipe () with a NULL pipe out-parameter and returns its
//  result unchanged.
int zmq::lb_t::send (msg_t *msg_)
{
    return sendpipe (msg_, NULL);
}
82 
sendpipe(msg_t * msg_,pipe_t ** pipe_)83 int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
84 {
85     //  Drop the message if required. If we are at the end of the message
86     //  switch back to non-dropping mode.
87     if (_dropping) {
88         _more = (msg_->flags () & msg_t::more) != 0;
89         _dropping = _more;
90 
91         int rc = msg_->close ();
92         errno_assert (rc == 0);
93         rc = msg_->init ();
94         errno_assert (rc == 0);
95         return 0;
96     }
97 
98     while (_active > 0) {
99         if (_pipes[_current]->write (msg_)) {
100             if (pipe_)
101                 *pipe_ = _pipes[_current];
102             break;
103         }
104 
105         // If send fails for multi-part msg rollback other
106         // parts sent earlier and return EAGAIN.
107         // Application should handle this as suitable
108         if (_more) {
109             _pipes[_current]->rollback ();
110             // At this point the pipe is already being deallocated
111             // and the first N frames are unreachable (_outpipe is
112             // most likely already NULL so rollback won't actually do
113             // anything and they can't be un-written to deliver later).
114             // Return EFAULT to socket_base caller to drop current message
115             // and any other subsequent frames to avoid them being
116             // "stuck" and received when a new client reconnects, which
117             // would break atomicity of multi-part messages (in blocking mode
118             // socket_base just tries again and again to send the same message)
119             // Note that given dropping mode returns 0, the user will
120             // never know that the message could not be delivered, but
121             // can't really fix it without breaking backward compatibility.
122             // -2/EAGAIN will make sure socket_base caller does not re-enter
123             // immediately or after a short sleep in blocking mode.
124             _dropping = (msg_->flags () & msg_t::more) != 0;
125             _more = false;
126             errno = EAGAIN;
127             return -2;
128         }
129 
130         _active--;
131         if (_current < _active)
132             _pipes.swap (_current, _active);
133         else
134             _current = 0;
135     }
136 
137     //  If there are no pipes we cannot send the message.
138     if (_active == 0) {
139         errno = EAGAIN;
140         return -1;
141     }
142 
143     //  If it's final part of the message we can flush it downstream and
144     //  continue round-robining (load balance).
145     _more = (msg_->flags () & msg_t::more) != 0;
146     if (!_more) {
147         _pipes[_current]->flush ();
148 
149         if (++_current >= _active)
150             _current = 0;
151     }
152 
153     //  Detach the message from the data buffer.
154     const int rc = msg_->init ();
155     errno_assert (rc == 0);
156 
157     return 0;
158 }
159 
has_out()160 bool zmq::lb_t::has_out ()
161 {
162     //  If one part of the message was already written we can definitely
163     //  write the rest of the message.
164     if (_more)
165         return true;
166 
167     while (_active > 0) {
168         //  Check whether a pipe has room for another message.
169         if (_pipes[_current]->check_write ())
170             return true;
171 
172         //  Deactivate the pipe.
173         _active--;
174         _pipes.swap (_current, _active);
175         if (_current == _active)
176             _current = 0;
177     }
178 
179     return false;
180 }
181