1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #ifndef TCP_TRANSPORTER_HPP
18 #define TCP_TRANSPORTER_HPP
19 
20 #include "Transporter.hpp"
21 #include "SendBuffer.hpp"
22 
23 #include <NdbTCP.h>
24 
25 struct ReceiveBuffer {
26   Uint32 *startOfBuffer;    // Pointer to start of the receive buffer
27   Uint32 *readPtr;          // Pointer to start reading data
28 
29   char   *insertPtr;        // Pointer to first position in the receiveBuffer
30                             // in which to insert received data. Earlier
31                             // received incomplete messages (slack) are
32                             // copied into the first part of the receiveBuffer
33 
34   Uint32 sizeOfData;        // In bytes
35   Uint32 sizeOfBuffer;
36 
ReceiveBufferReceiveBuffer37   ReceiveBuffer() {}
38   bool init(int bytes);
39   void destroy();
40 
41   void clear();
42   void incompleteMessage();
43 };
44 
45 class TCP_Transporter : public Transporter {
46   friend class TransporterRegistry;
47 private:
48   // Initialize member variables
49   TCP_Transporter(TransporterRegistry&,
50 		  int sendBufferSize, int maxReceiveSize,
51 		  const char *lHostName,
52 		  const char *rHostName,
53 		  int r_port,
54 		  bool isMgmConnection,
55 		  NodeId lHostId,
56 		  NodeId rHostId,
57 		  NodeId serverNodeId,
58 		  bool checksum, bool signalId,
59 		  Uint32 reportFreq = 4096);
60 
61   // Disconnect, delete send buffers and receive buffer
62   virtual ~TCP_Transporter();
63 
64   /**
65    * Allocate buffers for sending and receiving
66    */
67   bool initTransporter();
68 
69   Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio);
70   void updateWritePtr(Uint32 lenBytes, Uint32 prio);
71 
72   bool hasDataToSend() const ;
73 
74   /**
75    * Retrieves the contents of the send buffers and writes it on
76    * the external TCP/IP interface until the send buffers are empty
77    * and as long as write is possible.
78    */
79   bool doSend();
80 
81   /**
82    * It reads the external TCP/IP interface once
83    * and puts the data in the receiveBuffer
84    */
85   int doReceive();
86 
87   /**
88    * Returns socket (used for select)
89    */
90   NDB_SOCKET_TYPE getSocket() const;
91 
92   /**
93    * Get Receive Data
94    *
95    *  Returns - no of bytes to read
96    *            and set ptr
97    */
98   virtual Uint32 getReceiveData(Uint32 ** ptr);
99 
100   /**
101    * Update receive data ptr
102    */
103   virtual void updateReceiveDataPtr(Uint32 bytesRead);
104 
105   virtual Uint32 get_free_buffer() const;
106 
hasReceiveData() const107   inline bool hasReceiveData () const {
108     return receiveBuffer.sizeOfData > 0;
109   }
110 protected:
111   /**
112    * Setup client/server and perform connect/accept
113    * Is used both by clients and servers
114    * A client connects to the remote server
115    * A server accepts any new connections
116    */
117   virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
118   virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
119   bool connect_common(NDB_SOCKET_TYPE sockfd);
120 
121   /**
122    * Disconnects a TCP/IP node. Empty send and receivebuffer.
123    */
124   virtual void disconnectImpl();
125 
126 private:
127   /**
128    * Send buffers
129    */
130   SendBuffer m_sendBuffer;
131 
132   // Sending/Receiving socket used by both client and server
133   NDB_SOCKET_TYPE theSocket;
134 
135   Uint32 maxReceiveSize;
136 
137   /**
138    * Socket options
139    */
140   int sockOptRcvBufSize;
141   int sockOptSndBufSize;
142   int sockOptNodelay;
143   int sockOptTcpMaxSeg;
144 
145   void setSocketOptions();
146 
147   static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
148 
149   bool sendIsPossible(struct timeval * timeout);
150 
151   /**
152    * Statistics
153    */
154   Uint32 reportFreq;
155   Uint32 receiveCount;
156   Uint64 receiveSize;
157   Uint32 sendCount;
158   Uint64 sendSize;
159 
160   ReceiveBuffer receiveBuffer;
161 };
162 
163 inline
164 NDB_SOCKET_TYPE
getSocket() const165 TCP_Transporter::getSocket() const {
166   return theSocket;
167 }
168 
169 inline
170 Uint32
getReceiveData(Uint32 ** ptr)171 TCP_Transporter::getReceiveData(Uint32 ** ptr){
172   (* ptr) = receiveBuffer.readPtr;
173   return receiveBuffer.sizeOfData;
174 }
175 
176 inline
177 void
updateReceiveDataPtr(Uint32 bytesRead)178 TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
179   char * ptr = (char *)receiveBuffer.readPtr;
180   ptr += bytesRead;
181   receiveBuffer.readPtr = (Uint32*)ptr;
182   receiveBuffer.sizeOfData -= bytesRead;
183   receiveBuffer.incompleteMessage();
184 }
185 
186 inline
187 bool
hasDataToSend() const188 TCP_Transporter::hasDataToSend() const {
189   return m_sendBuffer.dataSize > 0;
190 }
191 
192 inline
193 bool
init(int bytes)194 ReceiveBuffer::init(int bytes){
195 #ifdef DEBUG_TRANSPORTER
196   ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
197 #endif
198 
199   startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
200   sizeOfBuffer  = bytes + sizeof(Uint32);
201   clear();
202   return true;
203 }
204 
205 inline
206 void
destroy()207 ReceiveBuffer::destroy(){
208   delete[] startOfBuffer;
209   sizeOfBuffer  = 0;
210   startOfBuffer = 0;
211   clear();
212 }
213 
214 inline
215 void
clear()216 ReceiveBuffer::clear(){
217   readPtr    = startOfBuffer;
218   insertPtr  = (char *)startOfBuffer;
219   sizeOfData = 0;
220 }
221 
222 inline
223 void
incompleteMessage()224 ReceiveBuffer::incompleteMessage() {
225   if(startOfBuffer != readPtr){
226     if(sizeOfData != 0)
227       memmove(startOfBuffer, readPtr, sizeOfData);
228     readPtr   = startOfBuffer;
229     insertPtr = ((char *)startOfBuffer) + sizeOfData;
230   }
231 }
232 
233 
234 #endif // Define of TCP_Transporter_H
235