1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <TransporterRegistry.hpp>
27 #include <TransporterCallback.hpp>
28 #include "Transporter.hpp"
29 #include "TransporterInternalDefinitions.hpp"
30 #include <NdbSleep.h>
31 #include <SocketAuthenticator.hpp>
32 #include <InputStream.hpp>
33 #include <OutputStream.hpp>
34 
35 #include <EventLogger.hpp>
36 extern EventLogger * g_eventLogger;
37 
Transporter(TransporterRegistry & t_reg,TransporterType _type,const char * lHostName,const char * rHostName,int s_port,bool _isMgmConnection,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,int _byteorder,bool _compression,bool _checksum,bool _signalId,Uint32 max_send_buffer)38 Transporter::Transporter(TransporterRegistry &t_reg,
39 			 TransporterType _type,
40 			 const char *lHostName,
41 			 const char *rHostName,
42 			 int s_port,
43 			 bool _isMgmConnection,
44 			 NodeId lNodeId,
45 			 NodeId rNodeId,
46 			 NodeId serverNodeId,
47 			 int _byteorder,
48 			 bool _compression, bool _checksum, bool _signalId,
49                          Uint32 max_send_buffer)
50   : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
51     isServer(lNodeId==serverNodeId),
52     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
53     m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
54     m_connected(false),
55     m_type(_type),
56     m_transporter_registry(t_reg)
57 {
58   DBUG_ENTER("Transporter::Transporter");
59   if (rHostName && strlen(rHostName) > 0){
60     strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
61     Ndb_getInAddr(&remoteHostAddress, rHostName);
62   }
63   else
64   {
65     if (!isServer) {
66       ndbout << "Unable to setup transporter. Node " << rNodeId
67 	     << " must have hostname. Update configuration." << endl;
68       exit(-1);
69     }
70     remoteHostName[0]= 0;
71   }
72   strncpy(localHostName, lHostName, sizeof(localHostName));
73 
74   DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
75 		     remoteNodeId, localNodeId, isServer,
76 		     remoteHostName, localHostName,
77 		     s_port));
78 
79   byteOrder       = _byteorder;
80   compressionUsed = _compression;
81   checksumUsed    = _checksum;
82   signalIdUsed    = _signalId;
83 
84   m_timeOutMillis = 30000;
85 
86   m_connect_address.s_addr= 0;
87   if(s_port<0)
88     s_port= -s_port; // was dynamic
89 
90   if (isServer)
91     m_socket_client= 0;
92   else
93   {
94     m_socket_client= new SocketClient(remoteHostName, s_port,
95 				      new SocketAuthSimple("ndbd",
96 							   "ndbd passwd"));
97 
98     m_socket_client->set_connect_timeout(m_timeOutMillis);
99   }
100 
101   m_os_max_iovec = 16;
102 #if defined (_SC_IOV_MAX) && defined (HAVE_SYSCONF)
103   long res = sysconf(_SC_IOV_MAX);
104   if (res != (long)-1)
105   {
106     m_os_max_iovec = (Uint32)res;
107   }
108 #endif
109 
110   DBUG_VOID_RETURN;
111 }
112 
~Transporter()113 Transporter::~Transporter(){
114   delete m_socket_client;
115 }
116 
117 
118 bool
configure(const TransporterConfiguration * conf)119 Transporter::configure(const TransporterConfiguration* conf)
120 {
121   if (configure_derived(conf) &&
122       conf->s_port == m_s_port &&
123       strcmp(conf->remoteHostName, remoteHostName) == 0 &&
124       strcmp(conf->localHostName, localHostName) == 0 &&
125       conf->remoteNodeId == remoteNodeId &&
126       conf->localNodeId == localNodeId &&
127       (conf->serverNodeId == conf->localNodeId) == isServer &&
128       conf->checksum == checksumUsed &&
129       conf->signalId == signalIdUsed &&
130       conf->isMgmConnection == isMgmConnection &&
131       conf->type == m_type)
132     return true; // No change
133   return false; // Can't reconfigure
134 }
135 
136 
137 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg)138 Transporter::connect_server(NDB_SOCKET_TYPE sockfd,
139                             BaseString& msg) {
140   // all initial negotiation is done in TransporterRegistry::connect_server
141   DBUG_ENTER("Transporter::connect_server");
142 
143   if (m_connected)
144   {
145     msg.assfmt("line: %u : already connected ??", __LINE__);
146     DBUG_RETURN(false);
147   }
148 
149   // Cache the connect address
150   my_socket_connect_address(sockfd, &m_connect_address);
151 
152   if (!connect_server_impl(sockfd))
153   {
154     msg.assfmt("line: %u : connect_server_impl failed", __LINE__);
155     DBUG_RETURN(false);
156   }
157 
158   m_connected  = true;
159 
160   DBUG_RETURN(true);
161 }
162 
163 
164 bool
connect_client()165 Transporter::connect_client() {
166   NDB_SOCKET_TYPE sockfd;
167   DBUG_ENTER("Transporter::connect_client");
168 
169   if(m_connected)
170     DBUG_RETURN(true);
171 
172   if(isMgmConnection)
173   {
174     sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
175   }
176   else
177   {
178     if (!m_socket_client->init())
179       DBUG_RETURN(false);
180 
181     if (pre_connect_options(m_socket_client->m_sockfd) != 0)
182       DBUG_RETURN(false);
183 
184     if (strlen(localHostName) > 0)
185     {
186       if (m_socket_client->bind(localHostName, 0) != 0)
187         DBUG_RETURN(false);
188     }
189     sockfd= m_socket_client->connect();
190   }
191 
192   DBUG_RETURN(connect_client(sockfd));
193 }
194 
195 
196 bool
connect_client(NDB_SOCKET_TYPE sockfd)197 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
198 
199   DBUG_ENTER("Transporter::connect_client(sockfd)");
200 
201   if(m_connected)
202   {
203     DBUG_PRINT("error", ("Already connected"));
204     DBUG_RETURN(true);
205   }
206 
207   if (!my_socket_valid(sockfd))
208   {
209     DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
210                          MY_SOCKET_FORMAT_VALUE(sockfd)));
211     DBUG_RETURN(false);
212   }
213 
214   DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
215                      m_s_port, isMgmConnection));
216 
217   // Send "hello"
218   DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
219                       localNodeId, m_type));
220   SocketOutputStream s_output(sockfd);
221   if (s_output.println("%d %d", localNodeId, m_type) < 0)
222   {
223     DBUG_PRINT("error", ("Send of 'hello' failed"));
224     NDB_CLOSE_SOCKET(sockfd);
225     DBUG_RETURN(false);
226   }
227 
228   // Read reply
229   DBUG_PRINT("info", ("Reading reply"));
230   char buf[256];
231   SocketInputStream s_input(sockfd);
232   if (s_input.gets(buf, 256) == 0)
233   {
234     DBUG_PRINT("error", ("Failed to read reply"));
235     NDB_CLOSE_SOCKET(sockfd);
236     DBUG_RETURN(false);
237   }
238 
239   // Parse reply
240   int nodeId, remote_transporter_type= -1;
241   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
242   switch (r) {
243   case 2:
244     break;
245   case 1:
246     // we're running version prior to 4.1.9
247     // ok, but with no checks on transporter configuration compatability
248     break;
249   default:
250     DBUG_PRINT("error", ("Failed to parse reply"));
251     NDB_CLOSE_SOCKET(sockfd);
252     DBUG_RETURN(false);
253   }
254 
255   DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
256 		      nodeId, remote_transporter_type));
257 
258   // Check nodeid
259   if (nodeId != remoteNodeId)
260   {
261     g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
262                          nodeId, remoteNodeId);
263     NDB_CLOSE_SOCKET(sockfd);
264     DBUG_RETURN(false);
265   }
266 
267   // Check transporter type
268   if (remote_transporter_type != -1 &&
269       remote_transporter_type != m_type)
270   {
271     g_eventLogger->error("Connection to node: %d uses different transporter "
272                          "type: %d, expected type: %d",
273                          nodeId, remote_transporter_type, m_type);
274     NDB_CLOSE_SOCKET(sockfd);
275     DBUG_RETURN(false);
276   }
277 
278   // Cache the connect address
279   my_socket_connect_address(sockfd, &m_connect_address);
280 
281   if (!connect_client_impl(sockfd))
282     DBUG_RETURN(false);
283 
284   m_connected = true;
285 
286   DBUG_RETURN(true);
287 }
288 
289 void
doDisconnect()290 Transporter::doDisconnect() {
291 
292   if(!m_connected)
293     return;
294 
295   m_connected = false;
296 
297   disconnectImpl();
298 }
299 
300