1 /*
2     Copyright (c) 2009-2012 250bpm s.r.o.
3     Copyright (c) 2007-2009 iMatix Corporation
4     Copyright (c) 2011 VMware, Inc.
5     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
6 
7     This file is part of Crossroads I/O project.
8 
9     Crossroads I/O is free software; you can redistribute it and/or modify it
10     under the terms of the GNU Lesser General Public License as published by
11     the Free Software Foundation; either version 3 of the License, or
12     (at your option) any later version.
13 
14     Crossroads is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU Lesser General Public License for more details.
18 
19     You should have received a copy of the GNU Lesser General Public License
20     along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22 
23 #include <new>
24 #include <stddef.h>
25 
26 #include "pipe.hpp"
27 #include "err.hpp"
28 
pipepair(class object_t * parents_[2],class pipe_t * pipes_[2],int hwms_[2],bool delays_[2],int protocol_)29 int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
30     int hwms_ [2], bool delays_ [2], int protocol_)
31 {
32     //   Creates two pipe objects. These objects are connected by two ypipes,
33     //   each to pass messages in one direction.
34 
35     pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
36     alloc_assert (upipe1);
37     pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
38     alloc_assert (upipe2);
39 
40     pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
41         hwms_ [1], hwms_ [0], delays_ [0], protocol_);
42     alloc_assert (pipes_ [0]);
43     pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
44         hwms_ [0], hwms_ [1], delays_ [1], protocol_);
45     alloc_assert (pipes_ [1]);
46 
47     pipes_ [0]->set_peer (pipes_ [1]);
48     pipes_ [1]->set_peer (pipes_ [0]);
49 
50     return 0;
51 }
52 
pipe_t(object_t * parent_,upipe_t * inpipe_,upipe_t * outpipe_,int inhwm_,int outhwm_,bool delay_,int protocol_)53 xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
54       int inhwm_, int outhwm_, bool delay_, int protocol_) :
55     object_t (parent_),
56     inpipe (inpipe_),
57     outpipe (outpipe_),
58     in_active (true),
59     out_active (true),
60     hwm (outhwm_),
61     lwm (compute_lwm (inhwm_)),
62     msgs_read (0),
63     msgs_written (0),
64     peers_msgs_read (0),
65     peer (NULL),
66     sink (NULL),
67     state (active),
68     delay (delay_),
69     protocol (protocol_)
70 {
71 }
72 
~pipe_t()73 xs::pipe_t::~pipe_t ()
74 {
75 }
76 
set_peer(pipe_t * peer_)77 void xs::pipe_t::set_peer (pipe_t *peer_)
78 {
79     //  Peer can be set once only.
80     xs_assert (!peer);
81     peer = peer_;
82 }
83 
set_event_sink(i_pipe_events * sink_)84 void xs::pipe_t::set_event_sink (i_pipe_events *sink_)
85 {
86     // Sink can be set once only.
87     xs_assert (!sink);
88     sink = sink_;
89 }
90 
set_identity(const blob_t & identity_)91 void xs::pipe_t::set_identity (const blob_t &identity_)
92 {
93     identity = identity_;
94 }
95 
get_identity()96 xs::blob_t xs::pipe_t::get_identity ()
97 {
98     return identity;
99 }
100 
check_read()101 bool xs::pipe_t::check_read ()
102 {
103     if (unlikely (!in_active || (state != active && state != pending)))
104         return false;
105 
106     //  Check if there's an item in the pipe.
107     if (!inpipe->check_read ()) {
108         in_active = false;
109         return false;
110     }
111 
112     //  If the next item in the pipe is message delimiter,
113     //  initiate termination process.
114     if (inpipe->probe (is_delimiter)) {
115         msg_t msg;
116         bool ok = inpipe->read (&msg);
117         xs_assert (ok);
118         delimit ();
119         return false;
120     }
121 
122     return true;
123 }
124 
read(msg_t * msg_)125 bool xs::pipe_t::read (msg_t *msg_)
126 {
127     if (unlikely (!in_active || (state != active && state != pending)))
128         return false;
129 
130     if (!inpipe->read (msg_)) {
131         in_active = false;
132         return false;
133     }
134 
135     //  If delimiter was read, start termination process of the pipe.
136     if (msg_->is_delimiter ()) {
137         delimit ();
138         return false;
139     }
140 
141     if (!(msg_->flags () & msg_t::more))
142         msgs_read++;
143 
144     if (lwm > 0 && msgs_read % lwm == 0)
145         send_activate_write (peer, msgs_read);
146 
147     return true;
148 }
149 
check_write(msg_t * msg_)150 bool xs::pipe_t::check_write (msg_t *msg_)
151 {
152     if (unlikely (!out_active || state != active))
153         return false;
154 
155     bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
156 
157     if (unlikely (full)) {
158         out_active = false;
159         return false;
160     }
161 
162     return true;
163 }
164 
write(msg_t * msg_)165 bool xs::pipe_t::write (msg_t *msg_)
166 {
167     if (unlikely (!check_write (msg_)))
168         return false;
169 
170     bool more = msg_->flags () & msg_t::more ? true : false;
171     outpipe->write (*msg_, more);
172     if (!more)
173         msgs_written++;
174 
175     return true;
176 }
177 
rollback()178 void xs::pipe_t::rollback ()
179 {
180     //  Remove incomplete message from the outbound pipe.
181     msg_t msg;
182     if (outpipe) {
183 		while (outpipe->unwrite (&msg)) {
184 		    xs_assert (msg.flags () & msg_t::more);
185 		    int rc = msg.close ();
186 		    errno_assert (rc == 0);
187 		}
188     }
189 }
190 
flush()191 void xs::pipe_t::flush ()
192 {
193     //  If terminate() was already called do nothing.
194     if (state == terminated || state == double_terminated)
195         return;
196 
197     //  The peer does not exist anymore at this point.
198     if (state == terminating)
199         return;
200 
201     if (outpipe && !outpipe->flush ())
202         send_activate_read (peer);
203 }
204 
process_activate_read()205 void xs::pipe_t::process_activate_read ()
206 {
207     if (!in_active && (state == active || state == pending)) {
208         in_active = true;
209         sink->read_activated (this);
210     }
211 }
212 
process_activate_write(uint64_t msgs_read_)213 void xs::pipe_t::process_activate_write (uint64_t msgs_read_)
214 {
215     //  Remember the peers's message sequence number.
216     peers_msgs_read = msgs_read_;
217 
218     if (!out_active && state == active) {
219         out_active = true;
220         sink->write_activated (this);
221     }
222 }
223 
process_hiccup(void * pipe_)224 void xs::pipe_t::process_hiccup (void *pipe_)
225 {
226     //  Destroy old outpipe. Note that the read end of the pipe was already
227     //  migrated to this thread.
228     xs_assert (outpipe);
229     outpipe->flush ();
230     msg_t msg;
231     while (outpipe->read (&msg)) {
232        int rc = msg.close ();
233        errno_assert (rc == 0);
234     }
235     delete outpipe;
236 
237     //  Plug in the new outpipe.
238     xs_assert (pipe_);
239     outpipe = (upipe_t*) pipe_;
240     out_active = true;
241 
242     //  If appropriate, notify the user about the hiccup.
243     if (state == active)
244         sink->hiccuped (this);
245 }
246 
process_pipe_term()247 void xs::pipe_t::process_pipe_term ()
248 {
249     //  This is the simple case of peer-induced termination. If there are no
250     //  more pending messages to read, or if the pipe was configured to drop
251     //  pending messages, we can move directly to the terminating state.
252     //  Otherwise we'll hang up in pending state till all the pending messages
253     //  are sent.
254     if (state == active) {
255         if (!delay) {
256             state = terminating;
257             outpipe = NULL;
258             send_pipe_term_ack (peer);
259             return;
260         }
261         else {
262             state = pending;
263             return;
264         }
265     }
266 
267     //  Delimiter happened to arrive before the term command. Now we have the
268     //  term command as well, so we can move straight to terminating state.
269     if (state == delimited) {
270         state = terminating;
271         outpipe = NULL;
272         send_pipe_term_ack (peer);
273         return;
274     }
275 
276     //  This is the case where both ends of the pipe are closed in parallel.
277     //  We simply reply to the request by ack and continue waiting for our
278     //  own ack.
279     if (state == terminated) {
280         state = double_terminated;
281         outpipe = NULL;
282         send_pipe_term_ack (peer);
283         return;
284     }
285 
286     //  pipe_term is invalid in other states.
287     xs_assert (false);
288 }
289 
process_pipe_term_ack()290 void xs::pipe_t::process_pipe_term_ack ()
291 {
292     //  Notify the user that all the references to the pipe should be dropped.
293     xs_assert (sink);
294     sink->terminated (this);
295 
296     //  In terminating and double_terminated states there's nothing to do.
297     //  Simply deallocate the pipe. In terminated state we have to ack the
298     //  peer before deallocating this side of the pipe. All the other states
299     //  are invalid.
300     if (state == terminating) ;
301     else if (state == double_terminated);
302     else if (state == terminated) {
303         outpipe = NULL;
304         send_pipe_term_ack (peer);
305     }
306     else
307         xs_assert (false);
308 
309     //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
310     //  pipe (which is an inbound pipe from its point of view).
311     //  First, delete all the unread messages in the pipe. We have to do it by
312     //  hand because msg_t doesn't have automatic destructor. Then deallocate
313     //  the ypipe itself.
314     msg_t msg;
315     while (inpipe->read (&msg)) {
316        int rc = msg.close ();
317        errno_assert (rc == 0);
318     }
319     delete inpipe;
320 
321     //  Deallocate the pipe object
322     delete this;
323 }
324 
terminate(bool delay_)325 void xs::pipe_t::terminate (bool delay_)
326 {
327     //  Overload the value specified at pipe creation.
328     delay = delay_;
329 
330     //  If terminate was already called, we can ignore the duplicit invocation.
331     if (state == terminated || state == double_terminated)
332         return;
333 
334     //  If the pipe is in the final phase of async termination, it's going to
335     //  closed anyway. No need to do anything special here.
336     else if (state == terminating)
337         return;
338 
339     //  The simple sync termination case. Ask the peer to terminate and wait
340     //  for the ack.
341     else if (state == active) {
342         send_pipe_term (peer);
343         state = terminated;
344     }
345 
346     //  There are still pending messages available, but the user calls
347     //  'terminate'. We can act as if all the pending messages were read.
348     else if (state == pending && !delay) {
349         outpipe = NULL;
350         send_pipe_term_ack (peer);
351         state = terminating;
352     }
353 
354     //  If there are pending messages still availabe, do nothing.
355     else if (state == pending && delay) {
356     }
357 
358     //  We've already got delimiter, but not term command yet. We can ignore
359     //  the delimiter and ack synchronously terminate as if we were in
360     //  active state.
361     else if (state == delimited) {
362         send_pipe_term (peer);
363         state = terminated;
364     }
365 
366     //  There are no other states.
367     else
368         xs_assert (false);
369 
370     //  Stop outbound flow of messages.
371     out_active = false;
372 
373     if (outpipe) {
374 
375 		//  Rollback any unfinished outbound messages.
376 		rollback ();
377 
378 		//  Push delimiter into the outbound pipe. Note that watermarks are not
379 		//  checked thus the delimiter can be written even though the pipe
380         //  is full.
381 		msg_t msg;
382 		msg.init_delimiter ();
383 		outpipe->write (msg, false);
384         if (state != terminating && !outpipe->flush ())
385             send_activate_read (peer);
386     }
387 }
388 
get_protocol()389 int xs::pipe_t::get_protocol ()
390 {
391     return protocol;
392 }
393 
is_delimiter(msg_t & msg_)394 bool xs::pipe_t::is_delimiter (msg_t &msg_)
395 {
396     return msg_.is_delimiter ();
397 }
398 
compute_lwm(int hwm_)399 int xs::pipe_t::compute_lwm (int hwm_)
400 {
401     //  Compute the low water mark. Following point should be taken
402     //  into consideration:
403     //
404     //  1. LWM has to be less than HWM.
405     //  2. LWM cannot be set to very low value (such as zero) as after filling
406     //     the queue it would start to refill only after all the messages are
407     //     read from it and thus unnecessarily hold the progress back.
408     //  3. LWM cannot be set to very high value (such as HWM-1) as it would
409     //     result in lock-step filling of the queue - if a single message is
410     //     read from a full queue, writer thread is resumed to write exactly one
411     //     message to the queue and go back to sleep immediately. This would
412     //     result in low performance.
413     //
414     //  Given the 3. it would be good to keep HWM and LWM as far apart as
415     //  possible to reduce the thread switching overhead to almost zero,
416     //  say HWM-LWM should be max_wm_delta.
417     //
418     //  That done, we still we have to account for the cases where
419     //  HWM < max_wm_delta thus driving LWM to negative numbers.
420     //  Let's make LWM 1/2 of HWM in such cases.
421     int result = (hwm_ > max_wm_delta * 2) ?
422         hwm_ - max_wm_delta : (hwm_ + 1) / 2;
423 
424     return result;
425 }
426 
delimit()427 void xs::pipe_t::delimit ()
428 {
429     if (state == active) {
430         state = delimited;
431         return;
432     }
433 
434     if (state == pending) {
435         outpipe = NULL;
436         send_pipe_term_ack (peer);
437         state = terminating;
438         return;
439     }
440 
441     //  Delimiter in any other state is invalid.
442     xs_assert (false);
443 }
444 
hiccup()445 void xs::pipe_t::hiccup ()
446 {
447     //  If termination is already under way do nothing.
448     if (state != active)
449         return;
450 
451     //  We'll drop the pointer to the inpipe. From now on, the peer is
452     //  responsible for deallocating it.
453     inpipe = NULL;
454 
455     //  Create new inpipe.
456     inpipe = new (std::nothrow) pipe_t::upipe_t ();
457     alloc_assert (inpipe);
458     in_active = true;
459 
460     //  Notify the peer about the hiccup.
461     send_hiccup (peer, (void*) inpipe);
462 }
463 
464