1 /*
2 Copyright (c) 2009-2012 250bpm s.r.o.
3 Copyright (c) 2007-2009 iMatix Corporation
4 Copyright (c) 2011 VMware, Inc.
5 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
6
7 This file is part of Crossroads I/O project.
8
9 Crossroads I/O is free software; you can redistribute it and/or modify it
10 under the terms of the GNU Lesser General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
13
14 Crossroads is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU Lesser General Public License for more details.
18
19 You should have received a copy of the GNU Lesser General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include <new>
24 #include <stddef.h>
25
26 #include "pipe.hpp"
27 #include "err.hpp"
28
pipepair(class object_t * parents_[2],class pipe_t * pipes_[2],int hwms_[2],bool delays_[2],int protocol_)29 int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
30 int hwms_ [2], bool delays_ [2], int protocol_)
31 {
32 // Creates two pipe objects. These objects are connected by two ypipes,
33 // each to pass messages in one direction.
34
35 pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
36 alloc_assert (upipe1);
37 pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
38 alloc_assert (upipe2);
39
40 pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
41 hwms_ [1], hwms_ [0], delays_ [0], protocol_);
42 alloc_assert (pipes_ [0]);
43 pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
44 hwms_ [0], hwms_ [1], delays_ [1], protocol_);
45 alloc_assert (pipes_ [1]);
46
47 pipes_ [0]->set_peer (pipes_ [1]);
48 pipes_ [1]->set_peer (pipes_ [0]);
49
50 return 0;
51 }
52
pipe_t(object_t * parent_,upipe_t * inpipe_,upipe_t * outpipe_,int inhwm_,int outhwm_,bool delay_,int protocol_)53 xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
54 int inhwm_, int outhwm_, bool delay_, int protocol_) :
55 object_t (parent_),
56 inpipe (inpipe_),
57 outpipe (outpipe_),
58 in_active (true),
59 out_active (true),
60 hwm (outhwm_),
61 lwm (compute_lwm (inhwm_)),
62 msgs_read (0),
63 msgs_written (0),
64 peers_msgs_read (0),
65 peer (NULL),
66 sink (NULL),
67 state (active),
68 delay (delay_),
69 protocol (protocol_)
70 {
71 }
72
~pipe_t()73 xs::pipe_t::~pipe_t ()
74 {
75 }
76
set_peer(pipe_t * peer_)77 void xs::pipe_t::set_peer (pipe_t *peer_)
78 {
79 // Peer can be set once only.
80 xs_assert (!peer);
81 peer = peer_;
82 }
83
set_event_sink(i_pipe_events * sink_)84 void xs::pipe_t::set_event_sink (i_pipe_events *sink_)
85 {
86 // Sink can be set once only.
87 xs_assert (!sink);
88 sink = sink_;
89 }
90
set_identity(const blob_t & identity_)91 void xs::pipe_t::set_identity (const blob_t &identity_)
92 {
93 identity = identity_;
94 }
95
get_identity()96 xs::blob_t xs::pipe_t::get_identity ()
97 {
98 return identity;
99 }
100
check_read()101 bool xs::pipe_t::check_read ()
102 {
103 if (unlikely (!in_active || (state != active && state != pending)))
104 return false;
105
106 // Check if there's an item in the pipe.
107 if (!inpipe->check_read ()) {
108 in_active = false;
109 return false;
110 }
111
112 // If the next item in the pipe is message delimiter,
113 // initiate termination process.
114 if (inpipe->probe (is_delimiter)) {
115 msg_t msg;
116 bool ok = inpipe->read (&msg);
117 xs_assert (ok);
118 delimit ();
119 return false;
120 }
121
122 return true;
123 }
124
read(msg_t * msg_)125 bool xs::pipe_t::read (msg_t *msg_)
126 {
127 if (unlikely (!in_active || (state != active && state != pending)))
128 return false;
129
130 if (!inpipe->read (msg_)) {
131 in_active = false;
132 return false;
133 }
134
135 // If delimiter was read, start termination process of the pipe.
136 if (msg_->is_delimiter ()) {
137 delimit ();
138 return false;
139 }
140
141 if (!(msg_->flags () & msg_t::more))
142 msgs_read++;
143
144 if (lwm > 0 && msgs_read % lwm == 0)
145 send_activate_write (peer, msgs_read);
146
147 return true;
148 }
149
check_write(msg_t * msg_)150 bool xs::pipe_t::check_write (msg_t *msg_)
151 {
152 if (unlikely (!out_active || state != active))
153 return false;
154
155 bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
156
157 if (unlikely (full)) {
158 out_active = false;
159 return false;
160 }
161
162 return true;
163 }
164
write(msg_t * msg_)165 bool xs::pipe_t::write (msg_t *msg_)
166 {
167 if (unlikely (!check_write (msg_)))
168 return false;
169
170 bool more = msg_->flags () & msg_t::more ? true : false;
171 outpipe->write (*msg_, more);
172 if (!more)
173 msgs_written++;
174
175 return true;
176 }
177
rollback()178 void xs::pipe_t::rollback ()
179 {
180 // Remove incomplete message from the outbound pipe.
181 msg_t msg;
182 if (outpipe) {
183 while (outpipe->unwrite (&msg)) {
184 xs_assert (msg.flags () & msg_t::more);
185 int rc = msg.close ();
186 errno_assert (rc == 0);
187 }
188 }
189 }
190
flush()191 void xs::pipe_t::flush ()
192 {
193 // If terminate() was already called do nothing.
194 if (state == terminated || state == double_terminated)
195 return;
196
197 // The peer does not exist anymore at this point.
198 if (state == terminating)
199 return;
200
201 if (outpipe && !outpipe->flush ())
202 send_activate_read (peer);
203 }
204
process_activate_read()205 void xs::pipe_t::process_activate_read ()
206 {
207 if (!in_active && (state == active || state == pending)) {
208 in_active = true;
209 sink->read_activated (this);
210 }
211 }
212
process_activate_write(uint64_t msgs_read_)213 void xs::pipe_t::process_activate_write (uint64_t msgs_read_)
214 {
215 // Remember the peers's message sequence number.
216 peers_msgs_read = msgs_read_;
217
218 if (!out_active && state == active) {
219 out_active = true;
220 sink->write_activated (this);
221 }
222 }
223
process_hiccup(void * pipe_)224 void xs::pipe_t::process_hiccup (void *pipe_)
225 {
226 // Destroy old outpipe. Note that the read end of the pipe was already
227 // migrated to this thread.
228 xs_assert (outpipe);
229 outpipe->flush ();
230 msg_t msg;
231 while (outpipe->read (&msg)) {
232 int rc = msg.close ();
233 errno_assert (rc == 0);
234 }
235 delete outpipe;
236
237 // Plug in the new outpipe.
238 xs_assert (pipe_);
239 outpipe = (upipe_t*) pipe_;
240 out_active = true;
241
242 // If appropriate, notify the user about the hiccup.
243 if (state == active)
244 sink->hiccuped (this);
245 }
246
process_pipe_term()247 void xs::pipe_t::process_pipe_term ()
248 {
249 // This is the simple case of peer-induced termination. If there are no
250 // more pending messages to read, or if the pipe was configured to drop
251 // pending messages, we can move directly to the terminating state.
252 // Otherwise we'll hang up in pending state till all the pending messages
253 // are sent.
254 if (state == active) {
255 if (!delay) {
256 state = terminating;
257 outpipe = NULL;
258 send_pipe_term_ack (peer);
259 return;
260 }
261 else {
262 state = pending;
263 return;
264 }
265 }
266
267 // Delimiter happened to arrive before the term command. Now we have the
268 // term command as well, so we can move straight to terminating state.
269 if (state == delimited) {
270 state = terminating;
271 outpipe = NULL;
272 send_pipe_term_ack (peer);
273 return;
274 }
275
276 // This is the case where both ends of the pipe are closed in parallel.
277 // We simply reply to the request by ack and continue waiting for our
278 // own ack.
279 if (state == terminated) {
280 state = double_terminated;
281 outpipe = NULL;
282 send_pipe_term_ack (peer);
283 return;
284 }
285
286 // pipe_term is invalid in other states.
287 xs_assert (false);
288 }
289
process_pipe_term_ack()290 void xs::pipe_t::process_pipe_term_ack ()
291 {
292 // Notify the user that all the references to the pipe should be dropped.
293 xs_assert (sink);
294 sink->terminated (this);
295
296 // In terminating and double_terminated states there's nothing to do.
297 // Simply deallocate the pipe. In terminated state we have to ack the
298 // peer before deallocating this side of the pipe. All the other states
299 // are invalid.
300 if (state == terminating) ;
301 else if (state == double_terminated);
302 else if (state == terminated) {
303 outpipe = NULL;
304 send_pipe_term_ack (peer);
305 }
306 else
307 xs_assert (false);
308
309 // We'll deallocate the inbound pipe, the peer will deallocate the outbound
310 // pipe (which is an inbound pipe from its point of view).
311 // First, delete all the unread messages in the pipe. We have to do it by
312 // hand because msg_t doesn't have automatic destructor. Then deallocate
313 // the ypipe itself.
314 msg_t msg;
315 while (inpipe->read (&msg)) {
316 int rc = msg.close ();
317 errno_assert (rc == 0);
318 }
319 delete inpipe;
320
321 // Deallocate the pipe object
322 delete this;
323 }
324
terminate(bool delay_)325 void xs::pipe_t::terminate (bool delay_)
326 {
327 // Overload the value specified at pipe creation.
328 delay = delay_;
329
330 // If terminate was already called, we can ignore the duplicit invocation.
331 if (state == terminated || state == double_terminated)
332 return;
333
334 // If the pipe is in the final phase of async termination, it's going to
335 // closed anyway. No need to do anything special here.
336 else if (state == terminating)
337 return;
338
339 // The simple sync termination case. Ask the peer to terminate and wait
340 // for the ack.
341 else if (state == active) {
342 send_pipe_term (peer);
343 state = terminated;
344 }
345
346 // There are still pending messages available, but the user calls
347 // 'terminate'. We can act as if all the pending messages were read.
348 else if (state == pending && !delay) {
349 outpipe = NULL;
350 send_pipe_term_ack (peer);
351 state = terminating;
352 }
353
354 // If there are pending messages still availabe, do nothing.
355 else if (state == pending && delay) {
356 }
357
358 // We've already got delimiter, but not term command yet. We can ignore
359 // the delimiter and ack synchronously terminate as if we were in
360 // active state.
361 else if (state == delimited) {
362 send_pipe_term (peer);
363 state = terminated;
364 }
365
366 // There are no other states.
367 else
368 xs_assert (false);
369
370 // Stop outbound flow of messages.
371 out_active = false;
372
373 if (outpipe) {
374
375 // Rollback any unfinished outbound messages.
376 rollback ();
377
378 // Push delimiter into the outbound pipe. Note that watermarks are not
379 // checked thus the delimiter can be written even though the pipe
380 // is full.
381 msg_t msg;
382 msg.init_delimiter ();
383 outpipe->write (msg, false);
384 if (state != terminating && !outpipe->flush ())
385 send_activate_read (peer);
386 }
387 }
388
get_protocol()389 int xs::pipe_t::get_protocol ()
390 {
391 return protocol;
392 }
393
is_delimiter(msg_t & msg_)394 bool xs::pipe_t::is_delimiter (msg_t &msg_)
395 {
396 return msg_.is_delimiter ();
397 }
398
compute_lwm(int hwm_)399 int xs::pipe_t::compute_lwm (int hwm_)
400 {
401 // Compute the low water mark. Following point should be taken
402 // into consideration:
403 //
404 // 1. LWM has to be less than HWM.
405 // 2. LWM cannot be set to very low value (such as zero) as after filling
406 // the queue it would start to refill only after all the messages are
407 // read from it and thus unnecessarily hold the progress back.
408 // 3. LWM cannot be set to very high value (such as HWM-1) as it would
409 // result in lock-step filling of the queue - if a single message is
410 // read from a full queue, writer thread is resumed to write exactly one
411 // message to the queue and go back to sleep immediately. This would
412 // result in low performance.
413 //
414 // Given the 3. it would be good to keep HWM and LWM as far apart as
415 // possible to reduce the thread switching overhead to almost zero,
416 // say HWM-LWM should be max_wm_delta.
417 //
418 // That done, we still we have to account for the cases where
419 // HWM < max_wm_delta thus driving LWM to negative numbers.
420 // Let's make LWM 1/2 of HWM in such cases.
421 int result = (hwm_ > max_wm_delta * 2) ?
422 hwm_ - max_wm_delta : (hwm_ + 1) / 2;
423
424 return result;
425 }
426
delimit()427 void xs::pipe_t::delimit ()
428 {
429 if (state == active) {
430 state = delimited;
431 return;
432 }
433
434 if (state == pending) {
435 outpipe = NULL;
436 send_pipe_term_ack (peer);
437 state = terminating;
438 return;
439 }
440
441 // Delimiter in any other state is invalid.
442 xs_assert (false);
443 }
444
hiccup()445 void xs::pipe_t::hiccup ()
446 {
447 // If termination is already under way do nothing.
448 if (state != active)
449 return;
450
451 // We'll drop the pointer to the inpipe. From now on, the peer is
452 // responsible for deallocating it.
453 inpipe = NULL;
454
455 // Create new inpipe.
456 inpipe = new (std::nothrow) pipe_t::upipe_t ();
457 alloc_assert (inpipe);
458 in_active = true;
459
460 // Notify the peer about the hiccup.
461 send_hiccup (peer, (void*) inpipe);
462 }
463
464