1 /* Copyright (c) 2003-2008 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <ndb_global.h>
18 
19 #include <NdbTCP.h>
20 #include "TCP_Transporter.hpp"
21 #include <NdbOut.hpp>
22 #include <NdbSleep.h>
23 
24 #include <EventLogger.hpp>
25 extern EventLogger g_eventLogger;
26 // End of stuff to be moved
27 
#ifdef NDB_WIN32
/*
 * Windows replacement for strerror(): formats a system/Winsock error code
 * into a message string allocated by FormatMessage. The string lives as long
 * as the ndbstrerror object. It is intended to be used as a temporary inside
 * a log call, e.g. (char*)ndbstrerror(InetErrno).
 */
class ndbstrerror
{
public:
  explicit ndbstrerror(int iError);
  ~ndbstrerror(void);

  // Message text, or a fixed placeholder when FormatMessage failed,
  // so callers never hand a null/garbage pointer to "%s".
  operator char*(void)
  {
    static char unknown[] = "Unknown error";
    return m_szError ? m_szError : unknown;
  }

private:
  // m_szError is released with LocalFree() in the destructor; a copy would
  // free the same buffer twice, so copying is disabled (declared, not defined).
  ndbstrerror(const ndbstrerror&);
  ndbstrerror& operator=(const ndbstrerror&);

  int m_iError;
  char* m_szError;
};

ndbstrerror::ndbstrerror(int iError)
: m_iError(iError), m_szError(0)
{
  // Use the ANSI entry point explicitly. The output buffer is a char*, so a
  // UNICODE build must not route this call to FormatMessageW.
  const DWORD len = FormatMessageA(
    FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
    0,
    iError,
    MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
    (LPSTR)&m_szError,
    0,
    0);
  if (len == 0)
  {
    // On failure the output pointer is not guaranteed to have been written.
    m_szError = 0;
  }
}

ndbstrerror::~ndbstrerror(void)
{
  if (m_szError)
    LocalFree(m_szError);
  m_szError = 0;
}
#else
#define ndbstrerror strerror
#endif
62 
TCP_Transporter(TransporterRegistry & t_reg,int sendBufSize,int maxRecvSize,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection_arg,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,bool chksm,bool signalId,Uint32 _reportFreq)63 TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
64 				 int sendBufSize, int maxRecvSize,
65                                  const char *lHostName,
66                                  const char *rHostName,
67                                  int r_port,
68 				 bool isMgmConnection_arg,
69 				 NodeId lNodeId,
70                                  NodeId rNodeId,
71 				 NodeId serverNodeId,
72                                  bool chksm, bool signalId,
73                                  Uint32 _reportFreq) :
74   Transporter(t_reg, tt_TCP_TRANSPORTER,
75 	      lHostName, rHostName, r_port, isMgmConnection_arg,
76 	      lNodeId, rNodeId, serverNodeId,
77 	      0, false, chksm, signalId),
78   m_sendBuffer(sendBufSize)
79 {
80   maxReceiveSize = maxRecvSize;
81 
82   // Initialize member variables
83   theSocket     = NDB_INVALID_SOCKET;
84 
85   sendCount      = receiveCount = 0;
86   sendSize       = receiveSize  = 0;
87   reportFreq     = _reportFreq;
88 
89   sockOptRcvBufSize = 70080;
90   sockOptSndBufSize = 71540;
91   sockOptNodelay    = 1;
92   sockOptTcpMaxSeg  = 4096;
93 }
94 
~TCP_Transporter()95 TCP_Transporter::~TCP_Transporter() {
96 
97   // Disconnect
98   if (theSocket != NDB_INVALID_SOCKET)
99     doDisconnect();
100 
101   // Delete send buffers
102 
103   // Delete receive buffer!!
104   receiveBuffer.destroy();
105 }
106 
connect_server_impl(NDB_SOCKET_TYPE sockfd)107 bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
108 {
109   DBUG_ENTER("TCP_Transpporter::connect_server_impl");
110   DBUG_RETURN(connect_common(sockfd));
111 }
112 
connect_client_impl(NDB_SOCKET_TYPE sockfd)113 bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
114 {
115   DBUG_ENTER("TCP_Transpporter::connect_client_impl");
116   DBUG_RETURN(connect_common(sockfd));
117 }
118 
connect_common(NDB_SOCKET_TYPE sockfd)119 bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
120 {
121   theSocket = sockfd;
122   setSocketOptions();
123   setSocketNonBlocking(theSocket);
124   DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
125               remoteNodeId));
126   return true;
127 }
128 
129 bool
initTransporter()130 TCP_Transporter::initTransporter() {
131 
132   // Allocate buffer for receiving
133   // Let it be the maximum size we receive plus 8 kB for any earlier received
134   // incomplete messages (slack)
135   Uint32 recBufSize = maxReceiveSize;
136   if(recBufSize < MAX_MESSAGE_SIZE){
137     recBufSize = MAX_MESSAGE_SIZE;
138   }
139 
140   if(!receiveBuffer.init(recBufSize+MAX_MESSAGE_SIZE)){
141     return false;
142   }
143 
144   // Allocate buffers for sending
145   if (!m_sendBuffer.initBuffer(remoteNodeId)) {
146     // XXX What shall be done here?
147     // The same is valid for the other init-methods
148     return false;
149   }
150 
151   return true;
152 }
153 
154 void
setSocketOptions()155 TCP_Transporter::setSocketOptions(){
156   int sockOptKeepAlive  = 1;
157 
158   if (setsockopt(theSocket, SOL_SOCKET, SO_RCVBUF,
159                  (char*)&sockOptRcvBufSize, sizeof(sockOptRcvBufSize)) < 0) {
160 #ifdef DEBUG_TRANSPORTER
161     g_eventLogger.error("The setsockopt SO_RCVBUF error code = %d", InetErrno);
162 #endif
163   }//if
164 
165   if (setsockopt(theSocket, SOL_SOCKET, SO_SNDBUF,
166                  (char*)&sockOptSndBufSize, sizeof(sockOptSndBufSize)) < 0) {
167 #ifdef DEBUG_TRANSPORTER
168     g_eventLogger.error("The setsockopt SO_SNDBUF error code = %d", InetErrno);
169 #endif
170   }//if
171 
172   if (setsockopt(theSocket, SOL_SOCKET, SO_KEEPALIVE,
173                  (char*)&sockOptKeepAlive, sizeof(sockOptKeepAlive)) < 0) {
174     ndbout_c("The setsockopt SO_KEEPALIVE error code = %d", InetErrno);
175   }//if
176 
177   //-----------------------------------------------
178   // Set the TCP_NODELAY option so also small packets are sent
179   // as soon as possible
180   //-----------------------------------------------
181   if (setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
182                  (char*)&sockOptNodelay, sizeof(sockOptNodelay)) < 0) {
183 #ifdef DEBUG_TRANSPORTER
184     g_eventLogger.error("The setsockopt TCP_NODELAY error code = %d", InetErrno);
185 #endif
186   }//if
187 }
188 
189 
190 #ifdef NDB_WIN32
191 
192 bool
setSocketNonBlocking(NDB_SOCKET_TYPE socket)193 TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
194   unsigned long  ul = 1;
195   if(ioctlsocket(socket, FIONBIO, &ul))
196   {
197 #ifdef DEBUG_TRANSPORTER
198     g_eventLogger.error("Set non-blocking server error3: %d", InetErrno);
199 #endif
200   }//if
201   return true;
202 }
203 
204 #else
205 
206 bool
setSocketNonBlocking(NDB_SOCKET_TYPE socket)207 TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
208   int flags;
209   flags = fcntl(socket, F_GETFL, 0);
210   if (flags < 0) {
211 #ifdef DEBUG_TRANSPORTER
212     g_eventLogger.error("Set non-blocking server error1: %s", strerror(InetErrno));
213 #endif
214   }//if
215   flags |= NDB_NONBLOCK;
216   if (fcntl(socket, F_SETFL, flags) == -1) {
217 #ifdef DEBUG_TRANSPORTER
218     g_eventLogger.error("Set non-blocking server error2: %s", strerror(InetErrno));
219 #endif
220   }//if
221   return true;
222 }
223 
224 #endif
225 
226 bool
sendIsPossible(struct timeval * timeout)227 TCP_Transporter::sendIsPossible(struct timeval * timeout) {
228   if(theSocket != NDB_INVALID_SOCKET){
229     fd_set   writeset;
230     FD_ZERO(&writeset);
231     FD_SET(theSocket, &writeset);
232 
233     int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
234 
235     if ((selectReply > 0) && FD_ISSET(theSocket, &writeset))
236       return true;
237     else
238       return false;
239   }
240   return false;
241 }
242 
243 Uint32
get_free_buffer() const244 TCP_Transporter::get_free_buffer() const
245 {
246   return m_sendBuffer.bufferSizeRemaining();
247 }
248 
249 Uint32 *
getWritePtr(Uint32 lenBytes,Uint32 prio)250 TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
251 
252   Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
253 
254   struct timeval timeout = {0, 10000};
255 
256   if (insertPtr == 0) {
257     //-------------------------------------------------
258     // Buffer was completely full. We have severe problems.
259     // We will attempt to wait for a small time
260     //-------------------------------------------------
261     if(sendIsPossible(&timeout)) {
262       //-------------------------------------------------
263       // Send is possible after the small timeout.
264       //-------------------------------------------------
265       if(!doSend()){
266 	return 0;
267       } else {
268 	//-------------------------------------------------
269 	// Since send was successful we will make a renewed
270 	// attempt at inserting the signal into the buffer.
271 	//-------------------------------------------------
272         insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
273       }//if
274     } else {
275       return 0;
276     }//if
277   }
278   return insertPtr;
279 }
280 
281 void
updateWritePtr(Uint32 lenBytes,Uint32 prio)282 TCP_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
283   m_sendBuffer.updateInsertPtr(lenBytes);
284 
285   const int bufsize = m_sendBuffer.bufferSize();
286   if(bufsize > TCP_SEND_LIMIT) {
287     //-------------------------------------------------
288     // Buffer is full and we are ready to send. We will
289     // not wait since the signal is already in the buffer.
290     // Force flag set has the same indication that we
291     // should always send. If it is not possible to send
292     // we will not worry since we will soon be back for
293     // a renewed trial.
294     //-------------------------------------------------
295     struct timeval no_timeout = {0,0};
296     if(sendIsPossible(&no_timeout)) {
297       //-------------------------------------------------
298       // Send was possible, attempt at a send.
299       //-------------------------------------------------
300       doSend();
301     }//if
302   }
303 }
304 
// Decide whether a send()/recv() result means the connection is gone.
//   e  : error code captured right after the call (InetErrno)
//   sz : return value of send()/recv()
// A result of 0 is an orderly close by the peer. A negative result
// disconnects unless the error is transient (EAGAIN / EWOULDBLOCK / EINTR).
// A positive result never disconnects. Arguments are parenthesised because
// callers may pass arbitrary expressions.
#define DISCONNECT_ERRNO(e, sz) \
  (((sz) == 0) || \
   (((sz) < 0) && !(((e) == EAGAIN) || ((e) == EWOULDBLOCK) || ((e) == EINTR))))
307 
308 
309 bool
doSend()310 TCP_Transporter::doSend() {
311   // If no sendbuffers are used nothing is done
312   // Sends the contents of the SendBuffers until they are empty
313   // or until select does not select the socket for write.
314   // Before calling send, the socket must be selected for write
315   // using "select"
316   // It writes on the external TCP/IP interface until the send buffer is empty
317   // and as long as write is possible (test it using select)
318 
319   // Empty the SendBuffers
320 
321   bool sent_any = true;
322   while (m_sendBuffer.dataSize > 0)
323   {
324     const char * const sendPtr = m_sendBuffer.sendPtr;
325     const Uint32 sizeToSend    = m_sendBuffer.sendDataSize;
326     const int nBytesSent = send(theSocket, sendPtr, sizeToSend, 0);
327 
328     if (nBytesSent > 0)
329     {
330       sent_any = true;
331       m_sendBuffer.bytesSent(nBytesSent);
332 
333       sendCount ++;
334       sendSize  += nBytesSent;
335       if(sendCount == reportFreq)
336       {
337 	reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
338 	sendCount = 0;
339 	sendSize  = 0;
340       }
341     }
342     else
343     {
344       if (nBytesSent < 0 && InetErrno == EAGAIN && sent_any)
345         break;
346 
347       // Send failed
348 #if defined DEBUG_TRANSPORTER
349       g_eventLogger.error("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
350 	       "errno = %d strerror = %s",
351 	       DISCONNECT_ERRNO(InetErrno, nBytesSent),
352 	       remoteNodeId, nBytesSent, InetErrno,
353 	       (char*)ndbstrerror(InetErrno));
354 #endif
355       if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
356 	doDisconnect();
357 	report_disconnect(InetErrno);
358       }
359 
360       return false;
361     }
362   }
363   return true;
364 }
365 
366 int
doReceive()367 TCP_Transporter::doReceive() {
368   // Select-function must return the socket for read
369   // before this method is called
370   // It reads the external TCP/IP interface once
371   Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
372   if(size > 0){
373     const int nBytesRead = recv(theSocket,
374 				receiveBuffer.insertPtr,
375 				size < maxReceiveSize ? size : maxReceiveSize,
376 				0);
377 
378     if (nBytesRead > 0) {
379       receiveBuffer.sizeOfData += nBytesRead;
380       receiveBuffer.insertPtr  += nBytesRead;
381 
382       if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
383 #ifdef DEBUG_TRANSPORTER
384 	g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
385 		 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
386 	g_eventLogger.error("nBytesRead = %d", nBytesRead);
387 #endif
388 	g_eventLogger.error("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
389 		 receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
390 	report_error(TE_INVALID_MESSAGE_LENGTH);
391 	return 0;
392       }
393 
394       receiveCount ++;
395       receiveSize  += nBytesRead;
396 
397       if(receiveCount == reportFreq){
398 	reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
399 	receiveCount = 0;
400 	receiveSize  = 0;
401       }
402       return nBytesRead;
403     } else {
404 #if defined DEBUG_TRANSPORTER
405       g_eventLogger.error("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
406 	       "errno = %d strerror = %s",
407 	       DISCONNECT_ERRNO(InetErrno, nBytesRead),
408 	       remoteNodeId, nBytesRead, InetErrno,
409 	       (char*)ndbstrerror(InetErrno));
410 #endif
411       if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
412 	// The remote node has closed down
413 	doDisconnect();
414 	report_disconnect(InetErrno);
415       }
416     }
417     return nBytesRead;
418   } else {
419     return 0;
420   }
421 }
422 
423 void
disconnectImpl()424 TCP_Transporter::disconnectImpl() {
425   if(theSocket != NDB_INVALID_SOCKET){
426     if(NDB_CLOSE_SOCKET(theSocket) < 0){
427       report_error(TE_ERROR_CLOSING_SOCKET);
428     }
429   }
430 
431   // Empty send och receive buffers
432   receiveBuffer.clear();
433   m_sendBuffer.emptyBuffer();
434 
435   theSocket = NDB_INVALID_SOCKET;
436 }
437