1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #ifndef TCP_TRANSPORTER_HPP
18 #define TCP_TRANSPORTER_HPP
19
20 #include "Transporter.hpp"
21 #include "SendBuffer.hpp"
22
23 #include <NdbTCP.h>
24
25 struct ReceiveBuffer {
26 Uint32 *startOfBuffer; // Pointer to start of the receive buffer
27 Uint32 *readPtr; // Pointer to start reading data
28
29 char *insertPtr; // Pointer to first position in the receiveBuffer
30 // in which to insert received data. Earlier
31 // received incomplete messages (slack) are
32 // copied into the first part of the receiveBuffer
33
34 Uint32 sizeOfData; // In bytes
35 Uint32 sizeOfBuffer;
36
ReceiveBufferReceiveBuffer37 ReceiveBuffer() {}
38 bool init(int bytes);
39 void destroy();
40
41 void clear();
42 void incompleteMessage();
43 };
44
45 class TCP_Transporter : public Transporter {
46 friend class TransporterRegistry;
47 private:
48 // Initialize member variables
49 TCP_Transporter(TransporterRegistry&,
50 int sendBufferSize, int maxReceiveSize,
51 const char *lHostName,
52 const char *rHostName,
53 int r_port,
54 bool isMgmConnection,
55 NodeId lHostId,
56 NodeId rHostId,
57 NodeId serverNodeId,
58 bool checksum, bool signalId,
59 Uint32 reportFreq = 4096);
60
61 // Disconnect, delete send buffers and receive buffer
62 virtual ~TCP_Transporter();
63
64 /**
65 * Allocate buffers for sending and receiving
66 */
67 bool initTransporter();
68
69 Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio);
70 void updateWritePtr(Uint32 lenBytes, Uint32 prio);
71
72 bool hasDataToSend() const ;
73
74 /**
75 * Retrieves the contents of the send buffers and writes it on
76 * the external TCP/IP interface until the send buffers are empty
77 * and as long as write is possible.
78 */
79 bool doSend();
80
81 /**
82 * It reads the external TCP/IP interface once
83 * and puts the data in the receiveBuffer
84 */
85 int doReceive();
86
87 /**
88 * Returns socket (used for select)
89 */
90 NDB_SOCKET_TYPE getSocket() const;
91
92 /**
93 * Get Receive Data
94 *
95 * Returns - no of bytes to read
96 * and set ptr
97 */
98 virtual Uint32 getReceiveData(Uint32 ** ptr);
99
100 /**
101 * Update receive data ptr
102 */
103 virtual void updateReceiveDataPtr(Uint32 bytesRead);
104
105 virtual Uint32 get_free_buffer() const;
106
hasReceiveData() const107 inline bool hasReceiveData () const {
108 return receiveBuffer.sizeOfData > 0;
109 }
110 protected:
111 /**
112 * Setup client/server and perform connect/accept
113 * Is used both by clients and servers
114 * A client connects to the remote server
115 * A server accepts any new connections
116 */
117 virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
118 virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
119 bool connect_common(NDB_SOCKET_TYPE sockfd);
120
121 /**
122 * Disconnects a TCP/IP node. Empty send and receivebuffer.
123 */
124 virtual void disconnectImpl();
125
126 private:
127 /**
128 * Send buffers
129 */
130 SendBuffer m_sendBuffer;
131
132 // Sending/Receiving socket used by both client and server
133 NDB_SOCKET_TYPE theSocket;
134
135 Uint32 maxReceiveSize;
136
137 /**
138 * Socket options
139 */
140 int sockOptRcvBufSize;
141 int sockOptSndBufSize;
142 int sockOptNodelay;
143 int sockOptTcpMaxSeg;
144
145 void setSocketOptions();
146
147 static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
148
149 bool sendIsPossible(struct timeval * timeout);
150
151 /**
152 * Statistics
153 */
154 Uint32 reportFreq;
155 Uint32 receiveCount;
156 Uint64 receiveSize;
157 Uint32 sendCount;
158 Uint64 sendSize;
159
160 ReceiveBuffer receiveBuffer;
161 };
162
163 inline
164 NDB_SOCKET_TYPE
getSocket() const165 TCP_Transporter::getSocket() const {
166 return theSocket;
167 }
168
169 inline
170 Uint32
getReceiveData(Uint32 ** ptr)171 TCP_Transporter::getReceiveData(Uint32 ** ptr){
172 (* ptr) = receiveBuffer.readPtr;
173 return receiveBuffer.sizeOfData;
174 }
175
176 inline
177 void
updateReceiveDataPtr(Uint32 bytesRead)178 TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
179 char * ptr = (char *)receiveBuffer.readPtr;
180 ptr += bytesRead;
181 receiveBuffer.readPtr = (Uint32*)ptr;
182 receiveBuffer.sizeOfData -= bytesRead;
183 receiveBuffer.incompleteMessage();
184 }
185
186 inline
187 bool
hasDataToSend() const188 TCP_Transporter::hasDataToSend() const {
189 return m_sendBuffer.dataSize > 0;
190 }
191
192 inline
193 bool
init(int bytes)194 ReceiveBuffer::init(int bytes){
195 #ifdef DEBUG_TRANSPORTER
196 ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
197 #endif
198
199 startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
200 sizeOfBuffer = bytes + sizeof(Uint32);
201 clear();
202 return true;
203 }
204
205 inline
206 void
destroy()207 ReceiveBuffer::destroy(){
208 delete[] startOfBuffer;
209 sizeOfBuffer = 0;
210 startOfBuffer = 0;
211 clear();
212 }
213
214 inline
215 void
clear()216 ReceiveBuffer::clear(){
217 readPtr = startOfBuffer;
218 insertPtr = (char *)startOfBuffer;
219 sizeOfData = 0;
220 }
221
222 inline
223 void
incompleteMessage()224 ReceiveBuffer::incompleteMessage() {
225 if(startOfBuffer != readPtr){
226 if(sizeOfData != 0)
227 memmove(startOfBuffer, readPtr, sizeOfData);
228 readPtr = startOfBuffer;
229 insertPtr = ((char *)startOfBuffer) + sizeOfData;
230 }
231 }
232
233
234 #endif // Define of TCP_Transporter_H
235