1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <TransporterRegistry.hpp>
27 #include <TransporterCallback.hpp>
28 #include "Transporter.hpp"
29 #include "TransporterInternalDefinitions.hpp"
30 #include <NdbSleep.h>
31 #include <SocketAuthenticator.hpp>
32 #include <InputStream.hpp>
33 #include <OutputStream.hpp>
34 
35 #include <EventLogger.hpp>
36 extern EventLogger * g_eventLogger;
37 
Transporter(TransporterRegistry & t_reg,TransporterType _type,const char * lHostName,const char * rHostName,int s_port,bool _isMgmConnection,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,int _byteorder,bool _compression,bool _checksum,bool _signalId,Uint32 max_send_buffer)38 Transporter::Transporter(TransporterRegistry &t_reg,
39 			 TransporterType _type,
40 			 const char *lHostName,
41 			 const char *rHostName,
42 			 int s_port,
43 			 bool _isMgmConnection,
44 			 NodeId lNodeId,
45 			 NodeId rNodeId,
46 			 NodeId serverNodeId,
47 			 int _byteorder,
48 			 bool _compression, bool _checksum, bool _signalId,
49                          Uint32 max_send_buffer)
50   : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
51     isServer(lNodeId==serverNodeId),
52     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
53     m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
54     m_bytes_sent(0), m_bytes_received(0),
55     m_connect_count(0),
56     m_overload_count(0), m_slowdown_count(0),
57     isMgmConnection(_isMgmConnection),
58     m_connected(false),
59     m_type(_type),
60     m_transporter_registry(t_reg)
61 {
62   DBUG_ENTER("Transporter::Transporter");
63   if (rHostName && strlen(rHostName) > 0){
64     strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
65   }
66   else
67   {
68     if (!isServer) {
69       ndbout << "Unable to setup transporter. Node " << rNodeId
70 	     << " must have hostname. Update configuration." << endl;
71       exit(-1);
72     }
73     remoteHostName[0]= 0;
74   }
75   strncpy(localHostName, lHostName, sizeof(localHostName));
76 
77   DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
78 		     remoteNodeId, localNodeId, isServer,
79 		     remoteHostName, localHostName,
80 		     s_port));
81 
82   byteOrder       = _byteorder;
83   compressionUsed = _compression;
84   checksumUsed    = _checksum;
85   signalIdUsed    = _signalId;
86 
87   m_timeOutMillis = 3000;
88 
89   m_connect_address.s_addr= 0;
90 
91   if (isServer)
92     m_socket_client= 0;
93   else
94   {
95     m_socket_client= new SocketClient(new SocketAuthSimple("ndbd",
96                                                            "ndbd passwd"));
97 
98     m_socket_client->set_connect_timeout(m_timeOutMillis);
99   }
100 
101   m_os_max_iovec = 16;
102 #if defined (_SC_IOV_MAX) && defined (HAVE_SYSCONF)
103   long res = sysconf(_SC_IOV_MAX);
104   if (res != (long)-1)
105   {
106     m_os_max_iovec = (Uint32)res;
107   }
108 #endif
109 
110   DBUG_VOID_RETURN;
111 }
112 
~Transporter()113 Transporter::~Transporter(){
114   delete m_socket_client;
115 }
116 
117 
118 bool
configure(const TransporterConfiguration * conf)119 Transporter::configure(const TransporterConfiguration* conf)
120 {
121   if (configure_derived(conf) &&
122       conf->s_port == m_s_port &&
123       strcmp(conf->remoteHostName, remoteHostName) == 0 &&
124       strcmp(conf->localHostName, localHostName) == 0 &&
125       conf->remoteNodeId == remoteNodeId &&
126       conf->localNodeId == localNodeId &&
127       (conf->serverNodeId == conf->localNodeId) == isServer &&
128       conf->checksum == checksumUsed &&
129       conf->signalId == signalIdUsed &&
130       conf->isMgmConnection == isMgmConnection &&
131       conf->type == m_type)
132     return true; // No change
133   return false; // Can't reconfigure
134 }
135 
136 
137 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg)138 Transporter::connect_server(NDB_SOCKET_TYPE sockfd,
139                             BaseString& msg) {
140   // all initial negotiation is done in TransporterRegistry::connect_server
141   DBUG_ENTER("Transporter::connect_server");
142 
143   if (m_connected)
144   {
145     msg.assfmt("line: %u : already connected ??", __LINE__);
146     DBUG_RETURN(false);
147   }
148 
149   // Cache the connect address
150   my_socket_connect_address(sockfd, &m_connect_address);
151 
152   if (!connect_server_impl(sockfd))
153   {
154     msg.assfmt("line: %u : connect_server_impl failed", __LINE__);
155     DBUG_RETURN(false);
156   }
157 
158   m_connect_count++;
159   resetCounters();
160 
161   m_connected  = true;
162 
163   DBUG_RETURN(true);
164 }
165 
166 
167 bool
connect_client()168 Transporter::connect_client() {
169   NDB_SOCKET_TYPE sockfd;
170   DBUG_ENTER("Transporter::connect_client");
171 
172   if(m_connected)
173     DBUG_RETURN(true);
174 
175   int port = m_s_port;
176   if (port<0)
177   {
178     // The port number is stored as negative to indicate it's a port number
179     // which the server side setup dynamically and thus was communicated
180     // to the client via the ndb_mgmd.
181     // Reverse the negative port number to get the connectable port
182     port= -port;
183   }
184 
185   if(isMgmConnection)
186   {
187     sockfd= m_transporter_registry.connect_ndb_mgmd(remoteHostName,
188                                                     port);
189   }
190   else
191   {
192     if (!m_socket_client->init())
193       DBUG_RETURN(false);
194 
195     if (pre_connect_options(m_socket_client->m_sockfd) != 0)
196       DBUG_RETURN(false);
197 
198     if (strlen(localHostName) > 0)
199     {
200       if (m_socket_client->bind(localHostName, 0) != 0)
201         DBUG_RETURN(false);
202     }
203 
204     sockfd= m_socket_client->connect(remoteHostName,
205                                      port);
206   }
207 
208   DBUG_RETURN(connect_client(sockfd));
209 }
210 
211 
212 bool
connect_client(NDB_SOCKET_TYPE sockfd)213 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
214 
215   DBUG_ENTER("Transporter::connect_client(sockfd)");
216 
217   if(m_connected)
218   {
219     DBUG_PRINT("error", ("Already connected"));
220     DBUG_RETURN(true);
221   }
222 
223   if (!my_socket_valid(sockfd))
224   {
225     DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
226                          MY_SOCKET_FORMAT_VALUE(sockfd)));
227     DBUG_RETURN(false);
228   }
229 
230   DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
231                      m_s_port, isMgmConnection));
232 
233   // Send "hello"
234   DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
235                       localNodeId, m_type));
236   SocketOutputStream s_output(sockfd);
237   if (s_output.println("%d %d", localNodeId, m_type) < 0)
238   {
239     DBUG_PRINT("error", ("Send of 'hello' failed"));
240     NDB_CLOSE_SOCKET(sockfd);
241     DBUG_RETURN(false);
242   }
243 
244   // Read reply
245   DBUG_PRINT("info", ("Reading reply"));
246   char buf[256];
247   SocketInputStream s_input(sockfd);
248   if (s_input.gets(buf, 256) == 0)
249   {
250     DBUG_PRINT("error", ("Failed to read reply"));
251     NDB_CLOSE_SOCKET(sockfd);
252     DBUG_RETURN(false);
253   }
254 
255   // Parse reply
256   int nodeId, remote_transporter_type= -1;
257   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
258   switch (r) {
259   case 2:
260     break;
261   case 1:
262     // we're running version prior to 4.1.9
263     // ok, but with no checks on transporter configuration compatability
264     break;
265   default:
266     DBUG_PRINT("error", ("Failed to parse reply"));
267     NDB_CLOSE_SOCKET(sockfd);
268     DBUG_RETURN(false);
269   }
270 
271   DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
272 		      nodeId, remote_transporter_type));
273 
274   // Check nodeid
275   if (nodeId != remoteNodeId)
276   {
277     g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
278                          nodeId, remoteNodeId);
279     NDB_CLOSE_SOCKET(sockfd);
280     DBUG_RETURN(false);
281   }
282 
283   // Check transporter type
284   if (remote_transporter_type != -1 &&
285       remote_transporter_type != m_type)
286   {
287     g_eventLogger->error("Connection to node: %d uses different transporter "
288                          "type: %d, expected type: %d",
289                          nodeId, remote_transporter_type, m_type);
290     NDB_CLOSE_SOCKET(sockfd);
291     DBUG_RETURN(false);
292   }
293 
294   // Cache the connect address
295   my_socket_connect_address(sockfd, &m_connect_address);
296 
297   if (!connect_client_impl(sockfd))
298     DBUG_RETURN(false);
299 
300   m_connect_count++;
301   resetCounters();
302 
303   m_connected = true;
304 
305   DBUG_RETURN(true);
306 }
307 
308 void
doDisconnect()309 Transporter::doDisconnect() {
310 
311   if(!m_connected)
312     return;
313 
314   m_connected = false;
315 
316   disconnectImpl();
317 }
318 
319 void
resetCounters()320 Transporter::resetCounters()
321 {
322   m_bytes_sent = 0;
323   m_bytes_received = 0;
324   m_overload_count = 0;
325   m_slowdown_count = 0;
326 };
327