1 /* Copyright (c) 2003-2008 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17 #include <ndb_global.h>
18
19 #include <NdbTCP.h>
20 #include "TCP_Transporter.hpp"
21 #include <NdbOut.hpp>
22 #include <NdbSleep.h>
23
24 #include <EventLogger.hpp>
25 extern EventLogger g_eventLogger;
26 // End of stuff to be moved
27
28 #ifdef NDB_WIN32
// Windows-only helper that turns a system error code into message text.
// The constructor (defined below) asks FormatMessage to allocate the text and
// the destructor releases it with LocalFree, so each instance owns m_szError.
// It is used in this file as a temporary: (char*)ndbstrerror(code).
class ndbstrerror
{
public:
  ndbstrerror(int iError);
  ~ndbstrerror(void);

  // Gives access to the message text; the pointer is released by the
  // destructor, so it must not be used after this object is gone.
  operator char*(void) { return m_szError; }

private:
  // Copying is disabled (declared private, intentionally never defined):
  // a copy would share m_szError and both destructors would LocalFree it.
  ndbstrerror(const ndbstrerror&);
  ndbstrerror& operator=(const ndbstrerror&);

  int m_iError;     // the error code passed to the constructor
  char* m_szError;  // message buffer owned by this object
};
40
ndbstrerror(int iError)41 ndbstrerror::ndbstrerror(int iError)
42 : m_iError(iError)
43 {
44 FormatMessage(
45 FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
46 0,
47 iError,
48 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
49 (LPTSTR)&m_szError,
50 0,
51 0);
52 }
53
~ndbstrerror(void)54 ndbstrerror::~ndbstrerror(void)
55 {
56 LocalFree( m_szError );
57 m_szError = 0;
58 }
59 #else
60 #define ndbstrerror strerror
61 #endif
62
TCP_Transporter(TransporterRegistry & t_reg,int sendBufSize,int maxRecvSize,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection_arg,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,bool chksm,bool signalId,Uint32 _reportFreq)63 TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
64 int sendBufSize, int maxRecvSize,
65 const char *lHostName,
66 const char *rHostName,
67 int r_port,
68 bool isMgmConnection_arg,
69 NodeId lNodeId,
70 NodeId rNodeId,
71 NodeId serverNodeId,
72 bool chksm, bool signalId,
73 Uint32 _reportFreq) :
74 Transporter(t_reg, tt_TCP_TRANSPORTER,
75 lHostName, rHostName, r_port, isMgmConnection_arg,
76 lNodeId, rNodeId, serverNodeId,
77 0, false, chksm, signalId),
78 m_sendBuffer(sendBufSize)
79 {
80 maxReceiveSize = maxRecvSize;
81
82 // Initialize member variables
83 theSocket = NDB_INVALID_SOCKET;
84
85 sendCount = receiveCount = 0;
86 sendSize = receiveSize = 0;
87 reportFreq = _reportFreq;
88
89 sockOptRcvBufSize = 70080;
90 sockOptSndBufSize = 71540;
91 sockOptNodelay = 1;
92 sockOptTcpMaxSeg = 4096;
93 }
94
~TCP_Transporter()95 TCP_Transporter::~TCP_Transporter() {
96
97 // Disconnect
98 if (theSocket != NDB_INVALID_SOCKET)
99 doDisconnect();
100
101 // Delete send buffers
102
103 // Delete receive buffer!!
104 receiveBuffer.destroy();
105 }
106
connect_server_impl(NDB_SOCKET_TYPE sockfd)107 bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
108 {
109 DBUG_ENTER("TCP_Transpporter::connect_server_impl");
110 DBUG_RETURN(connect_common(sockfd));
111 }
112
connect_client_impl(NDB_SOCKET_TYPE sockfd)113 bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
114 {
115 DBUG_ENTER("TCP_Transpporter::connect_client_impl");
116 DBUG_RETURN(connect_common(sockfd));
117 }
118
connect_common(NDB_SOCKET_TYPE sockfd)119 bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
120 {
121 theSocket = sockfd;
122 setSocketOptions();
123 setSocketNonBlocking(theSocket);
124 DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
125 remoteNodeId));
126 return true;
127 }
128
129 bool
initTransporter()130 TCP_Transporter::initTransporter() {
131
132 // Allocate buffer for receiving
133 // Let it be the maximum size we receive plus 8 kB for any earlier received
134 // incomplete messages (slack)
135 Uint32 recBufSize = maxReceiveSize;
136 if(recBufSize < MAX_MESSAGE_SIZE){
137 recBufSize = MAX_MESSAGE_SIZE;
138 }
139
140 if(!receiveBuffer.init(recBufSize+MAX_MESSAGE_SIZE)){
141 return false;
142 }
143
144 // Allocate buffers for sending
145 if (!m_sendBuffer.initBuffer(remoteNodeId)) {
146 // XXX What shall be done here?
147 // The same is valid for the other init-methods
148 return false;
149 }
150
151 return true;
152 }
153
154 void
setSocketOptions()155 TCP_Transporter::setSocketOptions(){
156 int sockOptKeepAlive = 1;
157
158 if (setsockopt(theSocket, SOL_SOCKET, SO_RCVBUF,
159 (char*)&sockOptRcvBufSize, sizeof(sockOptRcvBufSize)) < 0) {
160 #ifdef DEBUG_TRANSPORTER
161 g_eventLogger.error("The setsockopt SO_RCVBUF error code = %d", InetErrno);
162 #endif
163 }//if
164
165 if (setsockopt(theSocket, SOL_SOCKET, SO_SNDBUF,
166 (char*)&sockOptSndBufSize, sizeof(sockOptSndBufSize)) < 0) {
167 #ifdef DEBUG_TRANSPORTER
168 g_eventLogger.error("The setsockopt SO_SNDBUF error code = %d", InetErrno);
169 #endif
170 }//if
171
172 if (setsockopt(theSocket, SOL_SOCKET, SO_KEEPALIVE,
173 (char*)&sockOptKeepAlive, sizeof(sockOptKeepAlive)) < 0) {
174 ndbout_c("The setsockopt SO_KEEPALIVE error code = %d", InetErrno);
175 }//if
176
177 //-----------------------------------------------
178 // Set the TCP_NODELAY option so also small packets are sent
179 // as soon as possible
180 //-----------------------------------------------
181 if (setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
182 (char*)&sockOptNodelay, sizeof(sockOptNodelay)) < 0) {
183 #ifdef DEBUG_TRANSPORTER
184 g_eventLogger.error("The setsockopt TCP_NODELAY error code = %d", InetErrno);
185 #endif
186 }//if
187 }
188
189
190 #ifdef NDB_WIN32
191
192 bool
setSocketNonBlocking(NDB_SOCKET_TYPE socket)193 TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
194 unsigned long ul = 1;
195 if(ioctlsocket(socket, FIONBIO, &ul))
196 {
197 #ifdef DEBUG_TRANSPORTER
198 g_eventLogger.error("Set non-blocking server error3: %d", InetErrno);
199 #endif
200 }//if
201 return true;
202 }
203
204 #else
205
206 bool
setSocketNonBlocking(NDB_SOCKET_TYPE socket)207 TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
208 int flags;
209 flags = fcntl(socket, F_GETFL, 0);
210 if (flags < 0) {
211 #ifdef DEBUG_TRANSPORTER
212 g_eventLogger.error("Set non-blocking server error1: %s", strerror(InetErrno));
213 #endif
214 }//if
215 flags |= NDB_NONBLOCK;
216 if (fcntl(socket, F_SETFL, flags) == -1) {
217 #ifdef DEBUG_TRANSPORTER
218 g_eventLogger.error("Set non-blocking server error2: %s", strerror(InetErrno));
219 #endif
220 }//if
221 return true;
222 }
223
224 #endif
225
226 bool
sendIsPossible(struct timeval * timeout)227 TCP_Transporter::sendIsPossible(struct timeval * timeout) {
228 if(theSocket != NDB_INVALID_SOCKET){
229 fd_set writeset;
230 FD_ZERO(&writeset);
231 FD_SET(theSocket, &writeset);
232
233 int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
234
235 if ((selectReply > 0) && FD_ISSET(theSocket, &writeset))
236 return true;
237 else
238 return false;
239 }
240 return false;
241 }
242
243 Uint32
get_free_buffer() const244 TCP_Transporter::get_free_buffer() const
245 {
246 return m_sendBuffer.bufferSizeRemaining();
247 }
248
249 Uint32 *
getWritePtr(Uint32 lenBytes,Uint32 prio)250 TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
251
252 Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
253
254 struct timeval timeout = {0, 10000};
255
256 if (insertPtr == 0) {
257 //-------------------------------------------------
258 // Buffer was completely full. We have severe problems.
259 // We will attempt to wait for a small time
260 //-------------------------------------------------
261 if(sendIsPossible(&timeout)) {
262 //-------------------------------------------------
263 // Send is possible after the small timeout.
264 //-------------------------------------------------
265 if(!doSend()){
266 return 0;
267 } else {
268 //-------------------------------------------------
269 // Since send was successful we will make a renewed
270 // attempt at inserting the signal into the buffer.
271 //-------------------------------------------------
272 insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
273 }//if
274 } else {
275 return 0;
276 }//if
277 }
278 return insertPtr;
279 }
280
281 void
updateWritePtr(Uint32 lenBytes,Uint32 prio)282 TCP_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
283 m_sendBuffer.updateInsertPtr(lenBytes);
284
285 const int bufsize = m_sendBuffer.bufferSize();
286 if(bufsize > TCP_SEND_LIMIT) {
287 //-------------------------------------------------
288 // Buffer is full and we are ready to send. We will
289 // not wait since the signal is already in the buffer.
290 // Force flag set has the same indication that we
291 // should always send. If it is not possible to send
292 // we will not worry since we will soon be back for
293 // a renewed trial.
294 //-------------------------------------------------
295 struct timeval no_timeout = {0,0};
296 if(sendIsPossible(&no_timeout)) {
297 //-------------------------------------------------
298 // Send was possible, attempt at a send.
299 //-------------------------------------------------
300 doSend();
301 }//if
302 }
303 }
304
// Decides whether a send()/recv() result means the connection is gone.
//   e  - error code observed for the call
//   sz - return value of the call
// True when sz is 0, or when the result is anything other than "-1 with a
// transient error" (EAGAIN, EWOULDBLOCK or EINTR).
// The transient-error test is grouped explicitly so that the sz == -1
// condition applies to all three codes, and both arguments are
// parenthesized so expression arguments expand safely.
#define DISCONNECT_ERRNO(e, sz) \
  (((sz) == 0) || \
   !(((sz) == -1) && \
     (((e) == EAGAIN) || ((e) == EWOULDBLOCK) || ((e) == EINTR))))
307
308
309 bool
doSend()310 TCP_Transporter::doSend() {
311 // If no sendbuffers are used nothing is done
312 // Sends the contents of the SendBuffers until they are empty
313 // or until select does not select the socket for write.
314 // Before calling send, the socket must be selected for write
315 // using "select"
316 // It writes on the external TCP/IP interface until the send buffer is empty
317 // and as long as write is possible (test it using select)
318
319 // Empty the SendBuffers
320
321 bool sent_any = true;
322 while (m_sendBuffer.dataSize > 0)
323 {
324 const char * const sendPtr = m_sendBuffer.sendPtr;
325 const Uint32 sizeToSend = m_sendBuffer.sendDataSize;
326 const int nBytesSent = send(theSocket, sendPtr, sizeToSend, 0);
327
328 if (nBytesSent > 0)
329 {
330 sent_any = true;
331 m_sendBuffer.bytesSent(nBytesSent);
332
333 sendCount ++;
334 sendSize += nBytesSent;
335 if(sendCount == reportFreq)
336 {
337 reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
338 sendCount = 0;
339 sendSize = 0;
340 }
341 }
342 else
343 {
344 if (nBytesSent < 0 && InetErrno == EAGAIN && sent_any)
345 break;
346
347 // Send failed
348 #if defined DEBUG_TRANSPORTER
349 g_eventLogger.error("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
350 "errno = %d strerror = %s",
351 DISCONNECT_ERRNO(InetErrno, nBytesSent),
352 remoteNodeId, nBytesSent, InetErrno,
353 (char*)ndbstrerror(InetErrno));
354 #endif
355 if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
356 doDisconnect();
357 report_disconnect(InetErrno);
358 }
359
360 return false;
361 }
362 }
363 return true;
364 }
365
366 int
doReceive()367 TCP_Transporter::doReceive() {
368 // Select-function must return the socket for read
369 // before this method is called
370 // It reads the external TCP/IP interface once
371 Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
372 if(size > 0){
373 const int nBytesRead = recv(theSocket,
374 receiveBuffer.insertPtr,
375 size < maxReceiveSize ? size : maxReceiveSize,
376 0);
377
378 if (nBytesRead > 0) {
379 receiveBuffer.sizeOfData += nBytesRead;
380 receiveBuffer.insertPtr += nBytesRead;
381
382 if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
383 #ifdef DEBUG_TRANSPORTER
384 g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
385 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
386 g_eventLogger.error("nBytesRead = %d", nBytesRead);
387 #endif
388 g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
389 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
390 report_error(TE_INVALID_MESSAGE_LENGTH);
391 return 0;
392 }
393
394 receiveCount ++;
395 receiveSize += nBytesRead;
396
397 if(receiveCount == reportFreq){
398 reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
399 receiveCount = 0;
400 receiveSize = 0;
401 }
402 return nBytesRead;
403 } else {
404 #if defined DEBUG_TRANSPORTER
405 g_eventLogger.error("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
406 "errno = %d strerror = %s",
407 DISCONNECT_ERRNO(InetErrno, nBytesRead),
408 remoteNodeId, nBytesRead, InetErrno,
409 (char*)ndbstrerror(InetErrno));
410 #endif
411 if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
412 // The remote node has closed down
413 doDisconnect();
414 report_disconnect(InetErrno);
415 }
416 }
417 return nBytesRead;
418 } else {
419 return 0;
420 }
421 }
422
423 void
disconnectImpl()424 TCP_Transporter::disconnectImpl() {
425 if(theSocket != NDB_INVALID_SOCKET){
426 if(NDB_CLOSE_SOCKET(theSocket) < 0){
427 report_error(TE_ERROR_CLOSING_SOCKET);
428 }
429 }
430
431 // Empty send och receive buffers
432 receiveBuffer.clear();
433 m_sendBuffer.emptyBuffer();
434
435 theSocket = NDB_INVALID_SOCKET;
436 }
437