1 /*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28 */
29
30 #include "precompiled.hpp"
31 #include <new>
32 #include <stddef.h>
33
34 #include "macros.hpp"
35 #include "pipe.hpp"
36 #include "err.hpp"
37
38 #include "ypipe.hpp"
39 #include "ypipe_conflate.hpp"
40
pipepair(object_t * parents_[2],pipe_t * pipes_[2],const int hwms_[2],const bool conflate_[2])41 int zmq::pipepair (object_t *parents_[2],
42 pipe_t *pipes_[2],
43 const int hwms_[2],
44 const bool conflate_[2])
45 {
46 // Creates two pipe objects. These objects are connected by two ypipes,
47 // each to pass messages in one direction.
48
49 typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t;
50 typedef ypipe_conflate_t<msg_t> upipe_conflate_t;
51
52 pipe_t::upipe_t *upipe1;
53 if (conflate_[0])
54 upipe1 = new (std::nothrow) upipe_conflate_t ();
55 else
56 upipe1 = new (std::nothrow) upipe_normal_t ();
57 alloc_assert (upipe1);
58
59 pipe_t::upipe_t *upipe2;
60 if (conflate_[1])
61 upipe2 = new (std::nothrow) upipe_conflate_t ();
62 else
63 upipe2 = new (std::nothrow) upipe_normal_t ();
64 alloc_assert (upipe2);
65
66 pipes_[0] = new (std::nothrow)
67 pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]);
68 alloc_assert (pipes_[0]);
69 pipes_[1] = new (std::nothrow)
70 pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]);
71 alloc_assert (pipes_[1]);
72
73 pipes_[0]->set_peer (pipes_[1]);
74 pipes_[1]->set_peer (pipes_[0]);
75
76 return 0;
77 }
78
send_routing_id(pipe_t * pipe_,const options_t & options_)79 void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_)
80 {
81 zmq::msg_t id;
82 const int rc = id.init_size (options_.routing_id_size);
83 errno_assert (rc == 0);
84 memcpy (id.data (), options_.routing_id, options_.routing_id_size);
85 id.set_flags (zmq::msg_t::routing_id);
86 const bool written = pipe_->write (&id);
87 zmq_assert (written);
88 pipe_->flush ();
89 }
90
send_hello_msg(pipe_t * pipe_,const options_t & options_)91 void zmq::send_hello_msg (pipe_t *pipe_, const options_t &options_)
92 {
93 zmq::msg_t hello;
94 const int rc =
95 hello.init_buffer (&options_.hello_msg[0], options_.hello_msg.size ());
96 errno_assert (rc == 0);
97 const bool written = pipe_->write (&hello);
98 zmq_assert (written);
99 pipe_->flush ();
100 }
101
pipe_t(object_t * parent_,upipe_t * inpipe_,upipe_t * outpipe_,int inhwm_,int outhwm_,bool conflate_)102 zmq::pipe_t::pipe_t (object_t *parent_,
103 upipe_t *inpipe_,
104 upipe_t *outpipe_,
105 int inhwm_,
106 int outhwm_,
107 bool conflate_) :
108 object_t (parent_),
109 _in_pipe (inpipe_),
110 _out_pipe (outpipe_),
111 _in_active (true),
112 _out_active (true),
113 _hwm (outhwm_),
114 _lwm (compute_lwm (inhwm_)),
115 _in_hwm_boost (-1),
116 _out_hwm_boost (-1),
117 _msgs_read (0),
118 _msgs_written (0),
119 _peers_msgs_read (0),
120 _peer (NULL),
121 _sink (NULL),
122 _state (active),
123 _delay (true),
124 _server_socket_routing_id (0),
125 _conflate (conflate_)
126 {
127 _disconnect_msg.init ();
128 }
129
~pipe_t()130 zmq::pipe_t::~pipe_t ()
131 {
132 _disconnect_msg.close ();
133 }
134
set_peer(pipe_t * peer_)135 void zmq::pipe_t::set_peer (pipe_t *peer_)
136 {
137 // Peer can be set once only.
138 zmq_assert (!_peer);
139 _peer = peer_;
140 }
141
set_event_sink(i_pipe_events * sink_)142 void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
143 {
144 // Sink can be set once only.
145 zmq_assert (!_sink);
146 _sink = sink_;
147 }
148
set_server_socket_routing_id(uint32_t server_socket_routing_id_)149 void zmq::pipe_t::set_server_socket_routing_id (
150 uint32_t server_socket_routing_id_)
151 {
152 _server_socket_routing_id = server_socket_routing_id_;
153 }
154
get_server_socket_routing_id() const155 uint32_t zmq::pipe_t::get_server_socket_routing_id () const
156 {
157 return _server_socket_routing_id;
158 }
159
set_router_socket_routing_id(const blob_t & router_socket_routing_id_)160 void zmq::pipe_t::set_router_socket_routing_id (
161 const blob_t &router_socket_routing_id_)
162 {
163 _router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
164 }
165
get_routing_id() const166 const zmq::blob_t &zmq::pipe_t::get_routing_id () const
167 {
168 return _router_socket_routing_id;
169 }
170
check_read()171 bool zmq::pipe_t::check_read ()
172 {
173 if (unlikely (!_in_active))
174 return false;
175 if (unlikely (_state != active && _state != waiting_for_delimiter))
176 return false;
177
178 // Check if there's an item in the pipe.
179 if (!_in_pipe->check_read ()) {
180 _in_active = false;
181 return false;
182 }
183
184 // If the next item in the pipe is message delimiter,
185 // initiate termination process.
186 if (_in_pipe->probe (is_delimiter)) {
187 msg_t msg;
188 const bool ok = _in_pipe->read (&msg);
189 zmq_assert (ok);
190 process_delimiter ();
191 return false;
192 }
193
194 return true;
195 }
196
read(msg_t * msg_)197 bool zmq::pipe_t::read (msg_t *msg_)
198 {
199 if (unlikely (!_in_active))
200 return false;
201 if (unlikely (_state != active && _state != waiting_for_delimiter))
202 return false;
203
204 while (true) {
205 if (!_in_pipe->read (msg_)) {
206 _in_active = false;
207 return false;
208 }
209
210 // If this is a credential, ignore it and receive next message.
211 if (unlikely (msg_->is_credential ())) {
212 const int rc = msg_->close ();
213 zmq_assert (rc == 0);
214 } else {
215 break;
216 }
217 }
218
219 // If delimiter was read, start termination process of the pipe.
220 if (msg_->is_delimiter ()) {
221 process_delimiter ();
222 return false;
223 }
224
225 if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
226 _msgs_read++;
227
228 if (_lwm > 0 && _msgs_read % _lwm == 0)
229 send_activate_write (_peer, _msgs_read);
230
231 return true;
232 }
233
check_write()234 bool zmq::pipe_t::check_write ()
235 {
236 if (unlikely (!_out_active || _state != active))
237 return false;
238
239 const bool full = !check_hwm ();
240
241 if (unlikely (full)) {
242 _out_active = false;
243 return false;
244 }
245
246 return true;
247 }
248
write(const msg_t * msg_)249 bool zmq::pipe_t::write (const msg_t *msg_)
250 {
251 if (unlikely (!check_write ()))
252 return false;
253
254 const bool more = (msg_->flags () & msg_t::more) != 0;
255 const bool is_routing_id = msg_->is_routing_id ();
256 _out_pipe->write (*msg_, more);
257 if (!more && !is_routing_id)
258 _msgs_written++;
259
260 return true;
261 }
262
rollback() const263 void zmq::pipe_t::rollback () const
264 {
265 // Remove incomplete message from the outbound pipe.
266 msg_t msg;
267 if (_out_pipe) {
268 while (_out_pipe->unwrite (&msg)) {
269 zmq_assert (msg.flags () & msg_t::more);
270 const int rc = msg.close ();
271 errno_assert (rc == 0);
272 }
273 }
274 }
275
flush()276 void zmq::pipe_t::flush ()
277 {
278 // The peer does not exist anymore at this point.
279 if (_state == term_ack_sent)
280 return;
281
282 if (_out_pipe && !_out_pipe->flush ())
283 send_activate_read (_peer);
284 }
285
process_activate_read()286 void zmq::pipe_t::process_activate_read ()
287 {
288 if (!_in_active && (_state == active || _state == waiting_for_delimiter)) {
289 _in_active = true;
290 _sink->read_activated (this);
291 }
292 }
293
process_activate_write(uint64_t msgs_read_)294 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
295 {
296 // Remember the peer's message sequence number.
297 _peers_msgs_read = msgs_read_;
298
299 if (!_out_active && _state == active) {
300 _out_active = true;
301 _sink->write_activated (this);
302 }
303 }
304
process_hiccup(void * pipe_)305 void zmq::pipe_t::process_hiccup (void *pipe_)
306 {
307 // Destroy old outpipe. Note that the read end of the pipe was already
308 // migrated to this thread.
309 zmq_assert (_out_pipe);
310 _out_pipe->flush ();
311 msg_t msg;
312 while (_out_pipe->read (&msg)) {
313 if (!(msg.flags () & msg_t::more))
314 _msgs_written--;
315 const int rc = msg.close ();
316 errno_assert (rc == 0);
317 }
318 LIBZMQ_DELETE (_out_pipe);
319
320 // Plug in the new outpipe.
321 zmq_assert (pipe_);
322 _out_pipe = static_cast<upipe_t *> (pipe_);
323 _out_active = true;
324
325 // If appropriate, notify the user about the hiccup.
326 if (_state == active)
327 _sink->hiccuped (this);
328 }
329
process_pipe_term()330 void zmq::pipe_t::process_pipe_term ()
331 {
332 zmq_assert (_state == active || _state == delimiter_received
333 || _state == term_req_sent1);
334
335 // This is the simple case of peer-induced termination. If there are no
336 // more pending messages to read, or if the pipe was configured to drop
337 // pending messages, we can move directly to the term_ack_sent state.
338 // Otherwise we'll hang up in waiting_for_delimiter state till all
339 // pending messages are read.
340 if (_state == active) {
341 if (_delay)
342 _state = waiting_for_delimiter;
343 else {
344 _state = term_ack_sent;
345 _out_pipe = NULL;
346 send_pipe_term_ack (_peer);
347 }
348 }
349
350 // Delimiter happened to arrive before the term command. Now we have the
351 // term command as well, so we can move straight to term_ack_sent state.
352 else if (_state == delimiter_received) {
353 _state = term_ack_sent;
354 _out_pipe = NULL;
355 send_pipe_term_ack (_peer);
356 }
357
358 // This is the case where both ends of the pipe are closed in parallel.
359 // We simply reply to the request by ack and continue waiting for our
360 // own ack.
361 else if (_state == term_req_sent1) {
362 _state = term_req_sent2;
363 _out_pipe = NULL;
364 send_pipe_term_ack (_peer);
365 }
366 }
367
process_pipe_term_ack()368 void zmq::pipe_t::process_pipe_term_ack ()
369 {
370 // Notify the user that all the references to the pipe should be dropped.
371 zmq_assert (_sink);
372 _sink->pipe_terminated (this);
373
374 // In term_ack_sent and term_req_sent2 states there's nothing to do.
375 // Simply deallocate the pipe. In term_req_sent1 state we have to ack
376 // the peer before deallocating this side of the pipe.
377 // All the other states are invalid.
378 if (_state == term_req_sent1) {
379 _out_pipe = NULL;
380 send_pipe_term_ack (_peer);
381 } else
382 zmq_assert (_state == term_ack_sent || _state == term_req_sent2);
383
384 // We'll deallocate the inbound pipe, the peer will deallocate the outbound
385 // pipe (which is an inbound pipe from its point of view).
386 // First, delete all the unread messages in the pipe. We have to do it by
387 // hand because msg_t doesn't have automatic destructor. Then deallocate
388 // the ypipe itself.
389
390 if (!_conflate) {
391 msg_t msg;
392 while (_in_pipe->read (&msg)) {
393 const int rc = msg.close ();
394 errno_assert (rc == 0);
395 }
396 }
397
398 LIBZMQ_DELETE (_in_pipe);
399
400 // Deallocate the pipe object
401 delete this;
402 }
403
process_pipe_hwm(int inhwm_,int outhwm_)404 void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
405 {
406 set_hwms (inhwm_, outhwm_);
407 }
408
set_nodelay()409 void zmq::pipe_t::set_nodelay ()
410 {
411 this->_delay = false;
412 }
413
terminate(bool delay_)414 void zmq::pipe_t::terminate (bool delay_)
415 {
416 // Overload the value specified at pipe creation.
417 _delay = delay_;
418
419 // If terminate was already called, we can ignore the duplicate invocation.
420 if (_state == term_req_sent1 || _state == term_req_sent2) {
421 return;
422 }
423 // If the pipe is in the final phase of async termination, it's going to
424 // closed anyway. No need to do anything special here.
425 if (_state == term_ack_sent) {
426 return;
427 }
428 // The simple sync termination case. Ask the peer to terminate and wait
429 // for the ack.
430 if (_state == active) {
431 send_pipe_term (_peer);
432 _state = term_req_sent1;
433 }
434 // There are still pending messages available, but the user calls
435 // 'terminate'. We can act as if all the pending messages were read.
436 else if (_state == waiting_for_delimiter && !_delay) {
437 // Drop any unfinished outbound messages.
438 rollback ();
439 _out_pipe = NULL;
440 send_pipe_term_ack (_peer);
441 _state = term_ack_sent;
442 }
443 // If there are pending messages still available, do nothing.
444 else if (_state == waiting_for_delimiter) {
445 }
446 // We've already got delimiter, but not term command yet. We can ignore
447 // the delimiter and ack synchronously terminate as if we were in
448 // active state.
449 else if (_state == delimiter_received) {
450 send_pipe_term (_peer);
451 _state = term_req_sent1;
452 }
453 // There are no other states.
454 else {
455 zmq_assert (false);
456 }
457
458 // Stop outbound flow of messages.
459 _out_active = false;
460
461 if (_out_pipe) {
462 // Drop any unfinished outbound messages.
463 rollback ();
464
465 // Write the delimiter into the pipe. Note that watermarks are not
466 // checked; thus the delimiter can be written even when the pipe is full.
467 msg_t msg;
468 msg.init_delimiter ();
469 _out_pipe->write (msg, false);
470 flush ();
471 }
472 }
473
is_delimiter(const msg_t & msg_)474 bool zmq::pipe_t::is_delimiter (const msg_t &msg_)
475 {
476 return msg_.is_delimiter ();
477 }
478
compute_lwm(int hwm_)479 int zmq::pipe_t::compute_lwm (int hwm_)
480 {
481 // Compute the low water mark. Following point should be taken
482 // into consideration:
483 //
484 // 1. LWM has to be less than HWM.
485 // 2. LWM cannot be set to very low value (such as zero) as after filling
486 // the queue it would start to refill only after all the messages are
487 // read from it and thus unnecessarily hold the progress back.
488 // 3. LWM cannot be set to very high value (such as HWM-1) as it would
489 // result in lock-step filling of the queue - if a single message is
490 // read from a full queue, writer thread is resumed to write exactly one
491 // message to the queue and go back to sleep immediately. This would
492 // result in low performance.
493 //
494 // Given the 3. it would be good to keep HWM and LWM as far apart as
495 // possible to reduce the thread switching overhead to almost zero.
496 // Let's make LWM 1/2 of HWM.
497 const int result = (hwm_ + 1) / 2;
498
499 return result;
500 }
501
process_delimiter()502 void zmq::pipe_t::process_delimiter ()
503 {
504 zmq_assert (_state == active || _state == waiting_for_delimiter);
505
506 if (_state == active)
507 _state = delimiter_received;
508 else {
509 rollback ();
510 _out_pipe = NULL;
511 send_pipe_term_ack (_peer);
512 _state = term_ack_sent;
513 }
514 }
515
hiccup()516 void zmq::pipe_t::hiccup ()
517 {
518 // If termination is already under way do nothing.
519 if (_state != active)
520 return;
521
522 // We'll drop the pointer to the inpipe. From now on, the peer is
523 // responsible for deallocating it.
524
525 // Create new inpipe.
526 _in_pipe =
527 _conflate
528 ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ())
529 : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> ();
530
531 alloc_assert (_in_pipe);
532 _in_active = true;
533
534 // Notify the peer about the hiccup.
535 send_hiccup (_peer, _in_pipe);
536 }
537
set_hwms(int inhwm_,int outhwm_)538 void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
539 {
540 int in = inhwm_ + std::max (_in_hwm_boost, 0);
541 int out = outhwm_ + std::max (_out_hwm_boost, 0);
542
543 // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
544 if (inhwm_ <= 0 || _in_hwm_boost == 0)
545 in = 0;
546
547 if (outhwm_ <= 0 || _out_hwm_boost == 0)
548 out = 0;
549
550 _lwm = compute_lwm (in);
551 _hwm = out;
552 }
553
set_hwms_boost(int inhwmboost_,int outhwmboost_)554 void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_)
555 {
556 _in_hwm_boost = inhwmboost_;
557 _out_hwm_boost = outhwmboost_;
558 }
559
check_hwm() const560 bool zmq::pipe_t::check_hwm () const
561 {
562 const bool full =
563 _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm);
564 return !full;
565 }
566
send_hwms_to_peer(int inhwm_,int outhwm_)567 void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_)
568 {
569 send_pipe_hwm (_peer, inhwm_, outhwm_);
570 }
571
set_endpoint_pair(zmq::endpoint_uri_pair_t endpoint_pair_)572 void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_)
573 {
574 _endpoint_pair = ZMQ_MOVE (endpoint_pair_);
575 }
576
get_endpoint_pair() const577 const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
578 {
579 return _endpoint_pair;
580 }
581
send_stats_to_peer(own_t * socket_base_)582 void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
583 {
584 endpoint_uri_pair_t *ep =
585 new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
586 send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
587 ep);
588 }
589
process_pipe_peer_stats(uint64_t queue_count_,own_t * socket_base_,endpoint_uri_pair_t * endpoint_pair_)590 void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
591 own_t *socket_base_,
592 endpoint_uri_pair_t *endpoint_pair_)
593 {
594 send_pipe_stats_publish (socket_base_, queue_count_,
595 _msgs_written - _peers_msgs_read, endpoint_pair_);
596 }
597
send_disconnect_msg()598 void zmq::pipe_t::send_disconnect_msg ()
599 {
600 if (_disconnect_msg.size () > 0) {
601 // Rollback any incomplete message in the pipe, and push the disconnect message.
602 rollback ();
603
604 _out_pipe->write (_disconnect_msg, false);
605 flush ();
606 _disconnect_msg.init ();
607 }
608 }
609
set_disconnect_msg(const std::vector<unsigned char> & disconnect_)610 void zmq::pipe_t::set_disconnect_msg (
611 const std::vector<unsigned char> &disconnect_)
612 {
613 _disconnect_msg.close ();
614 const int rc =
615 _disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
616 errno_assert (rc == 0);
617 }
618