1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #include "precompiled.hpp"
31 #include <new>
32 #include <stddef.h>
33 
34 #include "macros.hpp"
35 #include "pipe.hpp"
36 #include "err.hpp"
37 
38 #include "ypipe.hpp"
39 #include "ypipe_conflate.hpp"
40 
pipepair(object_t * parents_[2],pipe_t * pipes_[2],const int hwms_[2],const bool conflate_[2])41 int zmq::pipepair (object_t *parents_[2],
42                    pipe_t *pipes_[2],
43                    const int hwms_[2],
44                    const bool conflate_[2])
45 {
46     //   Creates two pipe objects. These objects are connected by two ypipes,
47     //   each to pass messages in one direction.
48 
49     typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
50     typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
51 
52     pipe_t::upipe_t *upipe1;
53     if (conflate_[0])
54         upipe1 = new (std::nothrow) upipe_conflate_t ();
55     else
56         upipe1 = new (std::nothrow) upipe_normal_t ();
57     alloc_assert (upipe1);
58 
59     pipe_t::upipe_t *upipe2;
60     if (conflate_[1])
61         upipe2 = new (std::nothrow) upipe_conflate_t ();
62     else
63         upipe2 = new (std::nothrow) upipe_normal_t ();
64     alloc_assert (upipe2);
65 
66     pipes_[0] = new (std::nothrow)
67       pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]);
68     alloc_assert (pipes_[0]);
69     pipes_[1] = new (std::nothrow)
70       pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]);
71     alloc_assert (pipes_[1]);
72 
73     pipes_[0]->set_peer (pipes_[1]);
74     pipes_[1]->set_peer (pipes_[0]);
75 
76     return 0;
77 }
78 
send_routing_id(pipe_t * pipe_,const options_t & options_)79 void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_)
80 {
81     zmq::msg_t id;
82     const int rc = id.init_size (options_.routing_id_size);
83     errno_assert (rc == 0);
84     memcpy (id.data (), options_.routing_id, options_.routing_id_size);
85     id.set_flags (zmq::msg_t::routing_id);
86     const bool written = pipe_->write (&id);
87     zmq_assert (written);
88     pipe_->flush ();
89 }
90 
send_hello_msg(pipe_t * pipe_,const options_t & options_)91 void zmq::send_hello_msg (pipe_t *pipe_, const options_t &options_)
92 {
93     zmq::msg_t hello;
94     const int rc =
95       hello.init_buffer (&options_.hello_msg[0], options_.hello_msg.size ());
96     errno_assert (rc == 0);
97     const bool written = pipe_->write (&hello);
98     zmq_assert (written);
99     pipe_->flush ();
100 }
101 
pipe_t(object_t * parent_,upipe_t * inpipe_,upipe_t * outpipe_,int inhwm_,int outhwm_,bool conflate_)102 zmq::pipe_t::pipe_t (object_t *parent_,
103                      upipe_t *inpipe_,
104                      upipe_t *outpipe_,
105                      int inhwm_,
106                      int outhwm_,
107                      bool conflate_) :
108     object_t (parent_),
109     _in_pipe (inpipe_),
110     _out_pipe (outpipe_),
111     _in_active (true),
112     _out_active (true),
113     _hwm (outhwm_),
114     _lwm (compute_lwm (inhwm_)),
115     _in_hwm_boost (-1),
116     _out_hwm_boost (-1),
117     _msgs_read (0),
118     _msgs_written (0),
119     _peers_msgs_read (0),
120     _peer (NULL),
121     _sink (NULL),
122     _state (active),
123     _delay (true),
124     _server_socket_routing_id (0),
125     _conflate (conflate_)
126 {
127     _disconnect_msg.init ();
128 }
129 
~pipe_t()130 zmq::pipe_t::~pipe_t ()
131 {
132     _disconnect_msg.close ();
133 }
134 
set_peer(pipe_t * peer_)135 void zmq::pipe_t::set_peer (pipe_t *peer_)
136 {
137     //  Peer can be set once only.
138     zmq_assert (!_peer);
139     _peer = peer_;
140 }
141 
set_event_sink(i_pipe_events * sink_)142 void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
143 {
144     // Sink can be set once only.
145     zmq_assert (!_sink);
146     _sink = sink_;
147 }
148 
set_server_socket_routing_id(uint32_t server_socket_routing_id_)149 void zmq::pipe_t::set_server_socket_routing_id (
150   uint32_t server_socket_routing_id_)
151 {
152     _server_socket_routing_id = server_socket_routing_id_;
153 }
154 
get_server_socket_routing_id() const155 uint32_t zmq::pipe_t::get_server_socket_routing_id () const
156 {
157     return _server_socket_routing_id;
158 }
159 
set_router_socket_routing_id(const blob_t & router_socket_routing_id_)160 void zmq::pipe_t::set_router_socket_routing_id (
161   const blob_t &router_socket_routing_id_)
162 {
163     _router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
164 }
165 
get_routing_id() const166 const zmq::blob_t &zmq::pipe_t::get_routing_id () const
167 {
168     return _router_socket_routing_id;
169 }
170 
check_read()171 bool zmq::pipe_t::check_read ()
172 {
173     if (unlikely (!_in_active))
174         return false;
175     if (unlikely (_state != active && _state != waiting_for_delimiter))
176         return false;
177 
178     //  Check if there's an item in the pipe.
179     if (!_in_pipe->check_read ()) {
180         _in_active = false;
181         return false;
182     }
183 
184     //  If the next item in the pipe is message delimiter,
185     //  initiate termination process.
186     if (_in_pipe->probe (is_delimiter)) {
187         msg_t msg;
188         const bool ok = _in_pipe->read (&msg);
189         zmq_assert (ok);
190         process_delimiter ();
191         return false;
192     }
193 
194     return true;
195 }
196 
read(msg_t * msg_)197 bool zmq::pipe_t::read (msg_t *msg_)
198 {
199     if (unlikely (!_in_active))
200         return false;
201     if (unlikely (_state != active && _state != waiting_for_delimiter))
202         return false;
203 
204     while (true) {
205         if (!_in_pipe->read (msg_)) {
206             _in_active = false;
207             return false;
208         }
209 
210         //  If this is a credential, ignore it and receive next message.
211         if (unlikely (msg_->is_credential ())) {
212             const int rc = msg_->close ();
213             zmq_assert (rc == 0);
214         } else {
215             break;
216         }
217     }
218 
219     //  If delimiter was read, start termination process of the pipe.
220     if (msg_->is_delimiter ()) {
221         process_delimiter ();
222         return false;
223     }
224 
225     if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
226         _msgs_read++;
227 
228     if (_lwm > 0 && _msgs_read % _lwm == 0)
229         send_activate_write (_peer, _msgs_read);
230 
231     return true;
232 }
233 
check_write()234 bool zmq::pipe_t::check_write ()
235 {
236     if (unlikely (!_out_active || _state != active))
237         return false;
238 
239     const bool full = !check_hwm ();
240 
241     if (unlikely (full)) {
242         _out_active = false;
243         return false;
244     }
245 
246     return true;
247 }
248 
write(const msg_t * msg_)249 bool zmq::pipe_t::write (const msg_t *msg_)
250 {
251     if (unlikely (!check_write ()))
252         return false;
253 
254     const bool more = (msg_->flags () & msg_t::more) != 0;
255     const bool is_routing_id = msg_->is_routing_id ();
256     _out_pipe->write (*msg_, more);
257     if (!more && !is_routing_id)
258         _msgs_written++;
259 
260     return true;
261 }
262 
rollback() const263 void zmq::pipe_t::rollback () const
264 {
265     //  Remove incomplete message from the outbound pipe.
266     msg_t msg;
267     if (_out_pipe) {
268         while (_out_pipe->unwrite (&msg)) {
269             zmq_assert (msg.flags () & msg_t::more);
270             const int rc = msg.close ();
271             errno_assert (rc == 0);
272         }
273     }
274 }
275 
flush()276 void zmq::pipe_t::flush ()
277 {
278     //  The peer does not exist anymore at this point.
279     if (_state == term_ack_sent)
280         return;
281 
282     if (_out_pipe && !_out_pipe->flush ())
283         send_activate_read (_peer);
284 }
285 
process_activate_read()286 void zmq::pipe_t::process_activate_read ()
287 {
288     if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
289         _in_active = true;
290         _sink->read_activated (this);
291     }
292 }
293 
process_activate_write(uint64_t msgs_read_)294 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
295 {
296     //  Remember the peer's message sequence number.
297     _peers_msgs_read = msgs_read_;
298 
299     if (!_out_active && _state == active) {
300         _out_active = true;
301         _sink->write_activated (this);
302     }
303 }
304 
process_hiccup(void * pipe_)305 void zmq::pipe_t::process_hiccup (void *pipe_)
306 {
307     //  Destroy old outpipe. Note that the read end of the pipe was already
308     //  migrated to this thread.
309     zmq_assert (_out_pipe);
310     _out_pipe->flush ();
311     msg_t msg;
312     while (_out_pipe->read (&msg)) {
313         if (!(msg.flags () & msg_t::more))
314             _msgs_written--;
315         const int rc = msg.close ();
316         errno_assert (rc == 0);
317     }
318     LIBZMQ_DELETE (_out_pipe);
319 
320     //  Plug in the new outpipe.
321     zmq_assert (pipe_);
322     _out_pipe = static_cast<upipe_t *> (pipe_);
323     _out_active = true;
324 
325     //  If appropriate, notify the user about the hiccup.
326     if (_state == active)
327         _sink->hiccuped (this);
328 }
329 
process_pipe_term()330 void zmq::pipe_t::process_pipe_term ()
331 {
332     zmq_assert (_state == active || _state == delimiter_received
333                 || _state == term_req_sent1);
334 
335     //  This is the simple case of peer-induced termination. If there are no
336     //  more pending messages to read, or if the pipe was configured to drop
337     //  pending messages, we can move directly to the term_ack_sent state.
338     //  Otherwise we'll hang up in waiting_for_delimiter state till all
339     //  pending messages are read.
340     if (_state == active) {
341         if (_delay)
342             _state = waiting_for_delimiter;
343         else {
344             _state = term_ack_sent;
345             _out_pipe = NULL;
346             send_pipe_term_ack (_peer);
347         }
348     }
349 
350     //  Delimiter happened to arrive before the term command. Now we have the
351     //  term command as well, so we can move straight to term_ack_sent state.
352     else if (_state == delimiter_received) {
353         _state = term_ack_sent;
354         _out_pipe = NULL;
355         send_pipe_term_ack (_peer);
356     }
357 
358     //  This is the case where both ends of the pipe are closed in parallel.
359     //  We simply reply to the request by ack and continue waiting for our
360     //  own ack.
361     else if (_state == term_req_sent1) {
362         _state = term_req_sent2;
363         _out_pipe = NULL;
364         send_pipe_term_ack (_peer);
365     }
366 }
367 
process_pipe_term_ack()368 void zmq::pipe_t::process_pipe_term_ack ()
369 {
370     //  Notify the user that all the references to the pipe should be dropped.
371     zmq_assert (_sink);
372     _sink->pipe_terminated (this);
373 
374     //  In term_ack_sent and term_req_sent2 states there's nothing to do.
375     //  Simply deallocate the pipe. In term_req_sent1 state we have to ack
376     //  the peer before deallocating this side of the pipe.
377     //  All the other states are invalid.
378     if (_state == term_req_sent1) {
379         _out_pipe = NULL;
380         send_pipe_term_ack (_peer);
381     } else
382         zmq_assert (_state == term_ack_sent || _state == term_req_sent2);
383 
384     //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
385     //  pipe (which is an inbound pipe from its point of view).
386     //  First, delete all the unread messages in the pipe. We have to do it by
387     //  hand because msg_t doesn't have automatic destructor. Then deallocate
388     //  the ypipe itself.
389 
390     if (!_conflate) {
391         msg_t msg;
392         while (_in_pipe->read (&msg)) {
393             const int rc = msg.close ();
394             errno_assert (rc == 0);
395         }
396     }
397 
398     LIBZMQ_DELETE (_in_pipe);
399 
400     //  Deallocate the pipe object
401     delete this;
402 }
403 
process_pipe_hwm(int inhwm_,int outhwm_)404 void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
405 {
406     set_hwms (inhwm_, outhwm_);
407 }
408 
set_nodelay()409 void zmq::pipe_t::set_nodelay ()
410 {
411     this->_delay = false;
412 }
413 
terminate(bool delay_)414 void zmq::pipe_t::terminate (bool delay_)
415 {
416     //  Overload the value specified at pipe creation.
417     _delay = delay_;
418 
419     //  If terminate was already called, we can ignore the duplicate invocation.
420     if (_state == term_req_sent1 || _state == term_req_sent2) {
421         return;
422     }
423     //  If the pipe is in the final phase of async termination, it's going to
424     //  closed anyway. No need to do anything special here.
425     if (_state == term_ack_sent) {
426         return;
427     }
428     //  The simple sync termination case. Ask the peer to terminate and wait
429     //  for the ack.
430     if (_state == active) {
431         send_pipe_term (_peer);
432         _state = term_req_sent1;
433     }
434     //  There are still pending messages available, but the user calls
435     //  'terminate'. We can act as if all the pending messages were read.
436     else if (_state == waiting_for_delimiter && !_delay) {
437         //  Drop any unfinished outbound messages.
438         rollback ();
439         _out_pipe = NULL;
440         send_pipe_term_ack (_peer);
441         _state = term_ack_sent;
442     }
443     //  If there are pending messages still available, do nothing.
444     else if (_state == waiting_for_delimiter) {
445     }
446     //  We've already got delimiter, but not term command yet. We can ignore
447     //  the delimiter and ack synchronously terminate as if we were in
448     //  active state.
449     else if (_state == delimiter_received) {
450         send_pipe_term (_peer);
451         _state = term_req_sent1;
452     }
453     //  There are no other states.
454     else {
455         zmq_assert (false);
456     }
457 
458     //  Stop outbound flow of messages.
459     _out_active = false;
460 
461     if (_out_pipe) {
462         //  Drop any unfinished outbound messages.
463         rollback ();
464 
465         //  Write the delimiter into the pipe. Note that watermarks are not
466         //  checked; thus the delimiter can be written even when the pipe is full.
467         msg_t msg;
468         msg.init_delimiter ();
469         _out_pipe->write (msg, false);
470         flush ();
471     }
472 }
473 
is_delimiter(const msg_t & msg_)474 bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
475 {
476     return msg_.is_delimiter ();
477 }
478 
compute_lwm(int hwm_)479 int zmq::pipe_t::compute_lwm (int hwm_)
480 {
481     //  Compute the low water mark. Following point should be taken
482     //  into consideration:
483     //
484     //  1. LWM has to be less than HWM.
485     //  2. LWM cannot be set to very low value (such as zero) as after filling
486     //     the queue it would start to refill only after all the messages are
487     //     read from it and thus unnecessarily hold the progress back.
488     //  3. LWM cannot be set to very high value (such as HWM-1) as it would
489     //     result in lock-step filling of the queue - if a single message is
490     //     read from a full queue, writer thread is resumed to write exactly one
491     //     message to the queue and go back to sleep immediately. This would
492     //     result in low performance.
493     //
494     //  Given the 3. it would be good to keep HWM and LWM as far apart as
495     //  possible to reduce the thread switching overhead to almost zero.
496     //  Let's make LWM 1/2 of HWM.
497     const int result = (hwm_ + 1) / 2;
498 
499     return result;
500 }
501 
process_delimiter()502 void zmq::pipe_t::process_delimiter ()
503 {
504     zmq_assert (_state == active || _state == waiting_for_delimiter);
505 
506     if (_state == active)
507         _state = delimiter_received;
508     else {
509         rollback ();
510         _out_pipe = NULL;
511         send_pipe_term_ack (_peer);
512         _state = term_ack_sent;
513     }
514 }
515 
hiccup()516 void zmq::pipe_t::hiccup ()
517 {
518     //  If termination is already under way do nothing.
519     if (_state != active)
520         return;
521 
522     //  We'll drop the pointer to the inpipe. From now on, the peer is
523     //  responsible for deallocating it.
524 
525     //  Create new inpipe.
526     _in_pipe =
527       _conflate
528         ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ())
529         : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> ();
530 
531     alloc_assert (_in_pipe);
532     _in_active = true;
533 
534     //  Notify the peer about the hiccup.
535     send_hiccup (_peer, _in_pipe);
536 }
537 
set_hwms(int inhwm_,int outhwm_)538 void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
539 {
540     int in = inhwm_ + std::max (_in_hwm_boost, 0);
541     int out = outhwm_ + std::max (_out_hwm_boost, 0);
542 
543     // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
544     if (inhwm_ <= 0 || _in_hwm_boost == 0)
545         in = 0;
546 
547     if (outhwm_ <= 0 || _out_hwm_boost == 0)
548         out = 0;
549 
550     _lwm = compute_lwm (in);
551     _hwm = out;
552 }
553 
set_hwms_boost(int inhwmboost_,int outhwmboost_)554 void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
555 {
556     _in_hwm_boost = inhwmboost_;
557     _out_hwm_boost = outhwmboost_;
558 }
559 
check_hwm() const560 bool zmq::pipe_t::check_hwm () const
561 {
562     const bool full =
563       _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm);
564     return !full;
565 }
566 
send_hwms_to_peer(int inhwm_,int outhwm_)567 void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
568 {
569     send_pipe_hwm (_peer, inhwm_, outhwm_);
570 }
571 
set_endpoint_pair(zmq::endpoint_uri_pair_t endpoint_pair_)572 void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
573 {
574     _endpoint_pair = ZMQ_MOVE (endpoint_pair_);
575 }
576 
get_endpoint_pair() const577 const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
578 {
579     return _endpoint_pair;
580 }
581 
send_stats_to_peer(own_t * socket_base_)582 void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
583 {
584     endpoint_uri_pair_t *ep =
585       new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
586     send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
587                           ep);
588 }
589 
process_pipe_peer_stats(uint64_t queue_count_,own_t * socket_base_,endpoint_uri_pair_t * endpoint_pair_)590 void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
591                                            own_t *socket_base_,
592                                            endpoint_uri_pair_t *endpoint_pair_)
593 {
594     send_pipe_stats_publish (socket_base_, queue_count_,
595                              _msgs_written - _peers_msgs_read, endpoint_pair_);
596 }
597 
send_disconnect_msg()598 void zmq::pipe_t::send_disconnect_msg ()
599 {
600     if (_disconnect_msg.size () > 0) {
601         // Rollback any incomplete message in the pipe, and push the disconnect message.
602         rollback ();
603 
604         _out_pipe->write (_disconnect_msg, false);
605         flush ();
606         _disconnect_msg.init ();
607     }
608 }
609 
set_disconnect_msg(const std::vector<unsigned char> & disconnect_)610 void zmq::pipe_t::set_disconnect_msg (
611   const std::vector<unsigned char> &disconnect_)
612 {
613     _disconnect_msg.close ();
614     const int rc =
615       _disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
616     errno_assert (rc == 0);
617 }
618