1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <TransporterRegistry.hpp>
27 #include <TransporterCallback.hpp>
28 #include "Transporter.hpp"
29 #include "TransporterInternalDefinitions.hpp"
30 #include <NdbSleep.h>
31 #include <SocketAuthenticator.hpp>
32 #include <InputStream.hpp>
33 #include <OutputStream.hpp>
34
35 #include <EventLogger.hpp>
36 extern EventLogger * g_eventLogger;
37
Transporter(TransporterRegistry & t_reg,TransporterType _type,const char * lHostName,const char * rHostName,int s_port,bool _isMgmConnection,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,int _byteorder,bool _compression,bool _checksum,bool _signalId,Uint32 max_send_buffer)38 Transporter::Transporter(TransporterRegistry &t_reg,
39 TransporterType _type,
40 const char *lHostName,
41 const char *rHostName,
42 int s_port,
43 bool _isMgmConnection,
44 NodeId lNodeId,
45 NodeId rNodeId,
46 NodeId serverNodeId,
47 int _byteorder,
48 bool _compression, bool _checksum, bool _signalId,
49 Uint32 max_send_buffer)
50 : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
51 isServer(lNodeId==serverNodeId),
52 m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
53 m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
54 m_bytes_sent(0), m_bytes_received(0),
55 m_connect_count(0),
56 m_overload_count(0), m_slowdown_count(0),
57 isMgmConnection(_isMgmConnection),
58 m_connected(false),
59 m_type(_type),
60 m_transporter_registry(t_reg)
61 {
62 DBUG_ENTER("Transporter::Transporter");
63 if (rHostName && strlen(rHostName) > 0){
64 strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
65 }
66 else
67 {
68 if (!isServer) {
69 ndbout << "Unable to setup transporter. Node " << rNodeId
70 << " must have hostname. Update configuration." << endl;
71 exit(-1);
72 }
73 remoteHostName[0]= 0;
74 }
75 strncpy(localHostName, lHostName, sizeof(localHostName));
76
77 DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
78 remoteNodeId, localNodeId, isServer,
79 remoteHostName, localHostName,
80 s_port));
81
82 byteOrder = _byteorder;
83 compressionUsed = _compression;
84 checksumUsed = _checksum;
85 signalIdUsed = _signalId;
86
87 m_timeOutMillis = 3000;
88
89 m_connect_address.s_addr= 0;
90
91 if (isServer)
92 m_socket_client= 0;
93 else
94 {
95 m_socket_client= new SocketClient(new SocketAuthSimple("ndbd",
96 "ndbd passwd"));
97
98 m_socket_client->set_connect_timeout(m_timeOutMillis);
99 }
100
101 m_os_max_iovec = 16;
102 #if defined (_SC_IOV_MAX) && defined (HAVE_SYSCONF)
103 long res = sysconf(_SC_IOV_MAX);
104 if (res != (long)-1)
105 {
106 m_os_max_iovec = (Uint32)res;
107 }
108 #endif
109
110 DBUG_VOID_RETURN;
111 }
112
~Transporter()113 Transporter::~Transporter(){
114 delete m_socket_client;
115 }
116
117
118 bool
configure(const TransporterConfiguration * conf)119 Transporter::configure(const TransporterConfiguration* conf)
120 {
121 if (configure_derived(conf) &&
122 conf->s_port == m_s_port &&
123 strcmp(conf->remoteHostName, remoteHostName) == 0 &&
124 strcmp(conf->localHostName, localHostName) == 0 &&
125 conf->remoteNodeId == remoteNodeId &&
126 conf->localNodeId == localNodeId &&
127 (conf->serverNodeId == conf->localNodeId) == isServer &&
128 conf->checksum == checksumUsed &&
129 conf->signalId == signalIdUsed &&
130 conf->isMgmConnection == isMgmConnection &&
131 conf->type == m_type)
132 return true; // No change
133 return false; // Can't reconfigure
134 }
135
136
137 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg)138 Transporter::connect_server(NDB_SOCKET_TYPE sockfd,
139 BaseString& msg) {
140 // all initial negotiation is done in TransporterRegistry::connect_server
141 DBUG_ENTER("Transporter::connect_server");
142
143 if (m_connected)
144 {
145 msg.assfmt("line: %u : already connected ??", __LINE__);
146 DBUG_RETURN(false);
147 }
148
149 // Cache the connect address
150 my_socket_connect_address(sockfd, &m_connect_address);
151
152 if (!connect_server_impl(sockfd))
153 {
154 msg.assfmt("line: %u : connect_server_impl failed", __LINE__);
155 DBUG_RETURN(false);
156 }
157
158 m_connect_count++;
159 resetCounters();
160
161 m_connected = true;
162
163 DBUG_RETURN(true);
164 }
165
166
167 bool
connect_client()168 Transporter::connect_client() {
169 NDB_SOCKET_TYPE sockfd;
170 DBUG_ENTER("Transporter::connect_client");
171
172 if(m_connected)
173 DBUG_RETURN(true);
174
175 int port = m_s_port;
176 if (port<0)
177 {
178 // The port number is stored as negative to indicate it's a port number
179 // which the server side setup dynamically and thus was communicated
180 // to the client via the ndb_mgmd.
181 // Reverse the negative port number to get the connectable port
182 port= -port;
183 }
184
185 if(isMgmConnection)
186 {
187 sockfd= m_transporter_registry.connect_ndb_mgmd(remoteHostName,
188 port);
189 }
190 else
191 {
192 if (!m_socket_client->init())
193 DBUG_RETURN(false);
194
195 if (pre_connect_options(m_socket_client->m_sockfd) != 0)
196 DBUG_RETURN(false);
197
198 if (strlen(localHostName) > 0)
199 {
200 if (m_socket_client->bind(localHostName, 0) != 0)
201 DBUG_RETURN(false);
202 }
203
204 sockfd= m_socket_client->connect(remoteHostName,
205 port);
206 }
207
208 DBUG_RETURN(connect_client(sockfd));
209 }
210
211
212 bool
connect_client(NDB_SOCKET_TYPE sockfd)213 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
214
215 DBUG_ENTER("Transporter::connect_client(sockfd)");
216
217 if(m_connected)
218 {
219 DBUG_PRINT("error", ("Already connected"));
220 DBUG_RETURN(true);
221 }
222
223 if (!my_socket_valid(sockfd))
224 {
225 DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
226 MY_SOCKET_FORMAT_VALUE(sockfd)));
227 DBUG_RETURN(false);
228 }
229
230 DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
231 m_s_port, isMgmConnection));
232
233 // Send "hello"
234 DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
235 localNodeId, m_type));
236 SocketOutputStream s_output(sockfd);
237 if (s_output.println("%d %d", localNodeId, m_type) < 0)
238 {
239 DBUG_PRINT("error", ("Send of 'hello' failed"));
240 NDB_CLOSE_SOCKET(sockfd);
241 DBUG_RETURN(false);
242 }
243
244 // Read reply
245 DBUG_PRINT("info", ("Reading reply"));
246 char buf[256];
247 SocketInputStream s_input(sockfd);
248 if (s_input.gets(buf, 256) == 0)
249 {
250 DBUG_PRINT("error", ("Failed to read reply"));
251 NDB_CLOSE_SOCKET(sockfd);
252 DBUG_RETURN(false);
253 }
254
255 // Parse reply
256 int nodeId, remote_transporter_type= -1;
257 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
258 switch (r) {
259 case 2:
260 break;
261 case 1:
262 // we're running version prior to 4.1.9
263 // ok, but with no checks on transporter configuration compatability
264 break;
265 default:
266 DBUG_PRINT("error", ("Failed to parse reply"));
267 NDB_CLOSE_SOCKET(sockfd);
268 DBUG_RETURN(false);
269 }
270
271 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
272 nodeId, remote_transporter_type));
273
274 // Check nodeid
275 if (nodeId != remoteNodeId)
276 {
277 g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
278 nodeId, remoteNodeId);
279 NDB_CLOSE_SOCKET(sockfd);
280 DBUG_RETURN(false);
281 }
282
283 // Check transporter type
284 if (remote_transporter_type != -1 &&
285 remote_transporter_type != m_type)
286 {
287 g_eventLogger->error("Connection to node: %d uses different transporter "
288 "type: %d, expected type: %d",
289 nodeId, remote_transporter_type, m_type);
290 NDB_CLOSE_SOCKET(sockfd);
291 DBUG_RETURN(false);
292 }
293
294 // Cache the connect address
295 my_socket_connect_address(sockfd, &m_connect_address);
296
297 if (!connect_client_impl(sockfd))
298 DBUG_RETURN(false);
299
300 m_connect_count++;
301 resetCounters();
302
303 m_connected = true;
304
305 DBUG_RETURN(true);
306 }
307
308 void
doDisconnect()309 Transporter::doDisconnect() {
310
311 if(!m_connected)
312 return;
313
314 m_connected = false;
315
316 disconnectImpl();
317 }
318
319 void
resetCounters()320 Transporter::resetCounters()
321 {
322 m_bytes_sent = 0;
323 m_bytes_received = 0;
324 m_overload_count = 0;
325 m_slowdown_count = 0;
326 };
327