/*
    Copyright (c) 2009-2012 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2011 VMware, Inc.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of Crossroads I/O project.

    Crossroads I/O is free software; you can redistribute it and/or modify it
    under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    Crossroads is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

23 #ifndef __XS_PIPE_HPP_INCLUDED__
24 #define __XS_PIPE_HPP_INCLUDED__
25 
26 #include "msg.hpp"
27 #include "ypipe.hpp"
28 #include "config.hpp"
29 #include "object.hpp"
30 #include "stdint.hpp"
31 #include "array.hpp"
32 #include "blob.hpp"
33 
34 namespace xs
35 {
36 
37     class object_t;
38     class pipe_t;
39 
40     //  Create a pipepair for bi-directional transfer of messages.
41     //  First HWM is for messages passed from first pipe to the second pipe.
42     //  Second HWM is for messages passed from second pipe to the first pipe.
43     //  Delay specifies how the pipe behaves when the peer terminates. If true
44     //  pipe receives all the pending messages before terminating, otherwise it
45     //  terminates straight away.
46     int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
47         int hwms_ [2], bool delays_ [2], int protocol_);
48 
49     struct i_pipe_events
50     {
~i_pipe_eventsxs::i_pipe_events51         virtual ~i_pipe_events () {}
52 
53         virtual void read_activated (xs::pipe_t *pipe_) = 0;
54         virtual void write_activated (xs::pipe_t *pipe_) = 0;
55         virtual void hiccuped (xs::pipe_t *pipe_) = 0;
56         virtual void terminated (xs::pipe_t *pipe_) = 0;
57     };
58 
59     //  Note that pipe can be stored in three different arrays.
60     //  The array of inbound pipes (1), the array of outbound pipes (2) and
61     //  the generic array of pipes to deallocate (3).
62 
63     class pipe_t :
64         public object_t,
65         public array_item_t <1>,
66         public array_item_t <2>,
67         public array_item_t <3>
68     {
69         //  This allows pipepair to create pipe objects.
70         friend int pipepair (xs::object_t *parents_ [2],
71             xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
72             int protocol_);
73 
74     public:
75 
76         //  Specifies the object to send events to.
77         void set_event_sink (i_pipe_events *sink_);
78 
79         //  Pipe endpoint can store an opaque ID to be used by its clients.
80         void set_identity (const blob_t &identity_);
81         blob_t get_identity ();
82 
83         //  Returns true if there is at least one message to read in the pipe.
84         bool check_read ();
85 
86         //  Reads a message to the underlying pipe.
87         bool read (msg_t *msg_);
88 
89         //  Checks whether messages can be written to the pipe. If writing
90         //  the message would cause high watermark the function returns false.
91         bool check_write (msg_t *msg_);
92 
93         //  Writes a message to the underlying pipe. Returns false if the
94         //  message cannot be written because high watermark was reached.
95         bool write (msg_t *msg_);
96 
97         //  Remove unfinished parts of the outbound message from the pipe.
98         void rollback ();
99 
100         //  Flush the messages downsteam.
101         void flush ();
102 
103         //  Temporaraily disconnects the inbound message stream and drops
104         //  all the messages on the fly. Causes 'hiccuped' event to be generated
105         //  in the peer.
106         void hiccup ();
107 
108         //  Ask pipe to terminate. The termination will happen asynchronously
109         //  and user will be notified about actual deallocation by 'terminated'
110         //  event. If delay is true, the pending messages will be processed
111         //  before actual shutdown.
112         void terminate (bool delay_);
113 
114         //  Returns the ID of the protocol associated with the pipe.
115         int get_protocol ();
116 
117     private:
118 
119         //  Type of the underlying lock-free pipe.
120         typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
121 
122         //  Command handlers.
123         void process_activate_read ();
124         void process_activate_write (uint64_t msgs_read_);
125         void process_hiccup (void *pipe_);
126         void process_pipe_term ();
127         void process_pipe_term_ack ();
128 
129         //  Handler for delimiter read from the pipe.
130         void delimit ();
131 
132         //  Constructor is private. Pipe can only be created using
133         //  pipepair function.
134         pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
135             int inhwm_, int outhwm_, bool delay_, int protocol_);
136 
137         //  Pipepair uses this function to let us know about
138         //  the peer pipe object.
139         void set_peer (pipe_t *pipe_);
140 
141         //  Destructor is private. Pipe objects destroy themselves.
142         ~pipe_t ();
143 
144         //  Underlying pipes for both directions.
145         upipe_t *inpipe;
146         upipe_t *outpipe;
147 
148         //  Can the pipe be read from / written to?
149         bool in_active;
150         bool out_active;
151 
152         //  High watermark for the outbound pipe.
153         int hwm;
154 
155         //  Low watermark for the inbound pipe.
156         int lwm;
157 
158         //  Number of messages read and written so far.
159         uint64_t msgs_read;
160         uint64_t msgs_written;
161 
162         //  Last received peer's msgs_read. The actual number in the peer
163         //  can be higher at the moment.
164         uint64_t peers_msgs_read;
165 
166         //  The pipe object on the other side of the pipepair.
167         pipe_t *peer;
168 
169         //  Sink to send events to.
170         i_pipe_events *sink;
171 
172         //  State of the pipe endpoint. Active is common state before any
173         //  termination begins. Delimited means that delimiter was read from
174         //  pipe before term command was received. Pending means that term
175         //  command was already received from the peer but there are still
176         //  pending messages to read. Terminating means that all pending
177         //  messages were already read and all we are waiting for is ack from
178         //  the peer. Terminated means that 'terminate' was explicitly called
179         //  by the user. Double_terminated means that user called 'terminate'
180         //  and then we've got term command from the peer as well.
181         enum {
182             active,
183             delimited,
184             pending,
185             terminating,
186             terminated,
187             double_terminated
188         } state;
189 
190         //  If true, we receive all the pending inbound messages before
191         //  terminating. If false, we terminate immediately when the peer
192         //  asks us to.
193         bool delay;
194 
195         //  ID of the protocol to use. This value is not used by the pipe
196         //  itself, rather it's used by the users of the pipe.
197         int protocol;
198 
199         //  Identity of the writer. Used uniquely by the reader side.
200         blob_t identity;
201 
202         //  Returns true if the message is delimiter; false otherwise.
203         static bool is_delimiter (msg_t &msg_);
204 
205         //  Computes appropriate low watermark from the given high watermark.
206         static int compute_lwm (int hwm_);
207 
208         //  Disable copying.
209         pipe_t (const pipe_t&);
210         const pipe_t &operator = (const pipe_t&);
211     };
212 
213 }
214 
215 #endif
216