1 /* 2 Copyright (c) 2009-2012 250bpm s.r.o. 3 Copyright (c) 2007-2009 iMatix Corporation 4 Copyright (c) 2011 VMware, Inc. 5 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 6 7 This file is part of Crossroads I/O project. 8 9 Crossroads I/O is free software; you can redistribute it and/or modify it 10 under the terms of the GNU Lesser General Public License as published by 11 the Free Software Foundation; either version 3 of the License, or 12 (at your option) any later version. 13 14 Crossroads is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU Lesser General Public License for more details. 18 19 You should have received a copy of the GNU Lesser General Public License 20 along with this program. If not, see <http://www.gnu.org/licenses/>. 21 */ 22 23 #ifndef __XS_PIPE_HPP_INCLUDED__ 24 #define __XS_PIPE_HPP_INCLUDED__ 25 26 #include "msg.hpp" 27 #include "ypipe.hpp" 28 #include "config.hpp" 29 #include "object.hpp" 30 #include "stdint.hpp" 31 #include "array.hpp" 32 #include "blob.hpp" 33 34 namespace xs 35 { 36 37 class object_t; 38 class pipe_t; 39 40 // Create a pipepair for bi-directional transfer of messages. 41 // First HWM is for messages passed from first pipe to the second pipe. 42 // Second HWM is for messages passed from second pipe to the first pipe. 43 // Delay specifies how the pipe behaves when the peer terminates. If true 44 // pipe receives all the pending messages before terminating, otherwise it 45 // terminates straight away. 46 int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], 47 int hwms_ [2], bool delays_ [2], int protocol_); 48 49 struct i_pipe_events 50 { ~i_pipe_eventsxs::i_pipe_events51 virtual ~i_pipe_events () {} 52 53 virtual void read_activated (xs::pipe_t *pipe_) = 0; 54 virtual void write_activated (xs::pipe_t *pipe_) = 0; 55 virtual void hiccuped (xs::pipe_t *pipe_) = 0; 56 virtual void terminated (xs::pipe_t *pipe_) = 0; 57 }; 58 59 // Note that pipe can be stored in three different arrays. 60 // The array of inbound pipes (1), the array of outbound pipes (2) and 61 // the generic array of pipes to deallocate (3). 62 63 class pipe_t : 64 public object_t, 65 public array_item_t <1>, 66 public array_item_t <2>, 67 public array_item_t <3> 68 { 69 // This allows pipepair to create pipe objects. 70 friend int pipepair (xs::object_t *parents_ [2], 71 xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], 72 int protocol_); 73 74 public: 75 76 // Specifies the object to send events to. 77 void set_event_sink (i_pipe_events *sink_); 78 79 // Pipe endpoint can store an opaque ID to be used by its clients. 80 void set_identity (const blob_t &identity_); 81 blob_t get_identity (); 82 83 // Returns true if there is at least one message to read in the pipe. 84 bool check_read (); 85 86 // Reads a message to the underlying pipe. 87 bool read (msg_t *msg_); 88 89 // Checks whether messages can be written to the pipe. If writing 90 // the message would cause high watermark the function returns false. 91 bool check_write (msg_t *msg_); 92 93 // Writes a message to the underlying pipe. Returns false if the 94 // message cannot be written because high watermark was reached. 95 bool write (msg_t *msg_); 96 97 // Remove unfinished parts of the outbound message from the pipe. 98 void rollback (); 99 100 // Flush the messages downsteam. 101 void flush (); 102 103 // Temporaraily disconnects the inbound message stream and drops 104 // all the messages on the fly. Causes 'hiccuped' event to be generated 105 // in the peer. 106 void hiccup (); 107 108 // Ask pipe to terminate. The termination will happen asynchronously 109 // and user will be notified about actual deallocation by 'terminated' 110 // event. If delay is true, the pending messages will be processed 111 // before actual shutdown. 112 void terminate (bool delay_); 113 114 // Returns the ID of the protocol associated with the pipe. 115 int get_protocol (); 116 117 private: 118 119 // Type of the underlying lock-free pipe. 120 typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; 121 122 // Command handlers. 123 void process_activate_read (); 124 void process_activate_write (uint64_t msgs_read_); 125 void process_hiccup (void *pipe_); 126 void process_pipe_term (); 127 void process_pipe_term_ack (); 128 129 // Handler for delimiter read from the pipe. 130 void delimit (); 131 132 // Constructor is private. Pipe can only be created using 133 // pipepair function. 134 pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, 135 int inhwm_, int outhwm_, bool delay_, int protocol_); 136 137 // Pipepair uses this function to let us know about 138 // the peer pipe object. 139 void set_peer (pipe_t *pipe_); 140 141 // Destructor is private. Pipe objects destroy themselves. 142 ~pipe_t (); 143 144 // Underlying pipes for both directions. 145 upipe_t *inpipe; 146 upipe_t *outpipe; 147 148 // Can the pipe be read from / written to? 149 bool in_active; 150 bool out_active; 151 152 // High watermark for the outbound pipe. 153 int hwm; 154 155 // Low watermark for the inbound pipe. 156 int lwm; 157 158 // Number of messages read and written so far. 159 uint64_t msgs_read; 160 uint64_t msgs_written; 161 162 // Last received peer's msgs_read. The actual number in the peer 163 // can be higher at the moment. 164 uint64_t peers_msgs_read; 165 166 // The pipe object on the other side of the pipepair. 167 pipe_t *peer; 168 169 // Sink to send events to. 170 i_pipe_events *sink; 171 172 // State of the pipe endpoint. Active is common state before any 173 // termination begins. Delimited means that delimiter was read from 174 // pipe before term command was received. Pending means that term 175 // command was already received from the peer but there are still 176 // pending messages to read. Terminating means that all pending 177 // messages were already read and all we are waiting for is ack from 178 // the peer. Terminated means that 'terminate' was explicitly called 179 // by the user. Double_terminated means that user called 'terminate' 180 // and then we've got term command from the peer as well. 181 enum { 182 active, 183 delimited, 184 pending, 185 terminating, 186 terminated, 187 double_terminated 188 } state; 189 190 // If true, we receive all the pending inbound messages before 191 // terminating. If false, we terminate immediately when the peer 192 // asks us to. 193 bool delay; 194 195 // ID of the protocol to use. This value is not used by the pipe 196 // itself, rather it's used by the users of the pipe. 197 int protocol; 198 199 // Identity of the writer. Used uniquely by the reader side. 200 blob_t identity; 201 202 // Returns true if the message is delimiter; false otherwise. 203 static bool is_delimiter (msg_t &msg_); 204 205 // Computes appropriate low watermark from the given high watermark. 206 static int compute_lwm (int hwm_); 207 208 // Disable copying. 209 pipe_t (const pipe_t&); 210 const pipe_t &operator = (const pipe_t&); 211 }; 212 213 } 214 215 #endif 216