1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25
26 #include <TransporterRegistry.hpp>
27 #include <TransporterCallback.hpp>
28 #include "Transporter.hpp"
29 #include "TransporterInternalDefinitions.hpp"
30 #include <NdbSleep.h>
31 #include <SocketAuthenticator.hpp>
32 #include <InputStream.hpp>
33 #include <OutputStream.hpp>
34
35 #include <EventLogger.hpp>
36 extern EventLogger * g_eventLogger;
37
Transporter(TransporterRegistry & t_reg,TransporterType _type,const char * lHostName,const char * rHostName,int s_port,bool _isMgmConnection,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,int _byteorder,bool _compression,bool _checksum,bool _signalId,Uint32 max_send_buffer)38 Transporter::Transporter(TransporterRegistry &t_reg,
39 TransporterType _type,
40 const char *lHostName,
41 const char *rHostName,
42 int s_port,
43 bool _isMgmConnection,
44 NodeId lNodeId,
45 NodeId rNodeId,
46 NodeId serverNodeId,
47 int _byteorder,
48 bool _compression, bool _checksum, bool _signalId,
49 Uint32 max_send_buffer)
50 : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
51 isServer(lNodeId==serverNodeId),
52 m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
53 m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
54 m_connected(false),
55 m_type(_type),
56 m_transporter_registry(t_reg)
57 {
58 DBUG_ENTER("Transporter::Transporter");
59 if (rHostName && strlen(rHostName) > 0){
60 strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
61 Ndb_getInAddr(&remoteHostAddress, rHostName);
62 }
63 else
64 {
65 if (!isServer) {
66 ndbout << "Unable to setup transporter. Node " << rNodeId
67 << " must have hostname. Update configuration." << endl;
68 exit(-1);
69 }
70 remoteHostName[0]= 0;
71 }
72 strncpy(localHostName, lHostName, sizeof(localHostName));
73
74 DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
75 remoteNodeId, localNodeId, isServer,
76 remoteHostName, localHostName,
77 s_port));
78
79 byteOrder = _byteorder;
80 compressionUsed = _compression;
81 checksumUsed = _checksum;
82 signalIdUsed = _signalId;
83
84 m_timeOutMillis = 30000;
85
86 m_connect_address.s_addr= 0;
87 if(s_port<0)
88 s_port= -s_port; // was dynamic
89
90 if (isServer)
91 m_socket_client= 0;
92 else
93 {
94 m_socket_client= new SocketClient(remoteHostName, s_port,
95 new SocketAuthSimple("ndbd",
96 "ndbd passwd"));
97
98 m_socket_client->set_connect_timeout(m_timeOutMillis);
99 }
100
101 m_os_max_iovec = 16;
102 #if defined (_SC_IOV_MAX) && defined (HAVE_SYSCONF)
103 long res = sysconf(_SC_IOV_MAX);
104 if (res != (long)-1)
105 {
106 m_os_max_iovec = (Uint32)res;
107 }
108 #endif
109
110 DBUG_VOID_RETURN;
111 }
112
~Transporter()113 Transporter::~Transporter(){
114 delete m_socket_client;
115 }
116
117
118 bool
configure(const TransporterConfiguration * conf)119 Transporter::configure(const TransporterConfiguration* conf)
120 {
121 if (configure_derived(conf) &&
122 conf->s_port == m_s_port &&
123 strcmp(conf->remoteHostName, remoteHostName) == 0 &&
124 strcmp(conf->localHostName, localHostName) == 0 &&
125 conf->remoteNodeId == remoteNodeId &&
126 conf->localNodeId == localNodeId &&
127 (conf->serverNodeId == conf->localNodeId) == isServer &&
128 conf->checksum == checksumUsed &&
129 conf->signalId == signalIdUsed &&
130 conf->isMgmConnection == isMgmConnection &&
131 conf->type == m_type)
132 return true; // No change
133 return false; // Can't reconfigure
134 }
135
136
137 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg)138 Transporter::connect_server(NDB_SOCKET_TYPE sockfd,
139 BaseString& msg) {
140 // all initial negotiation is done in TransporterRegistry::connect_server
141 DBUG_ENTER("Transporter::connect_server");
142
143 if (m_connected)
144 {
145 msg.assfmt("line: %u : already connected ??", __LINE__);
146 DBUG_RETURN(false);
147 }
148
149 // Cache the connect address
150 my_socket_connect_address(sockfd, &m_connect_address);
151
152 if (!connect_server_impl(sockfd))
153 {
154 msg.assfmt("line: %u : connect_server_impl failed", __LINE__);
155 DBUG_RETURN(false);
156 }
157
158 m_connected = true;
159
160 DBUG_RETURN(true);
161 }
162
163
164 bool
connect_client()165 Transporter::connect_client() {
166 NDB_SOCKET_TYPE sockfd;
167 DBUG_ENTER("Transporter::connect_client");
168
169 if(m_connected)
170 DBUG_RETURN(true);
171
172 if(isMgmConnection)
173 {
174 sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
175 }
176 else
177 {
178 if (!m_socket_client->init())
179 DBUG_RETURN(false);
180
181 if (pre_connect_options(m_socket_client->m_sockfd) != 0)
182 DBUG_RETURN(false);
183
184 if (strlen(localHostName) > 0)
185 {
186 if (m_socket_client->bind(localHostName, 0) != 0)
187 DBUG_RETURN(false);
188 }
189 sockfd= m_socket_client->connect();
190 }
191
192 DBUG_RETURN(connect_client(sockfd));
193 }
194
195
196 bool
connect_client(NDB_SOCKET_TYPE sockfd)197 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
198
199 DBUG_ENTER("Transporter::connect_client(sockfd)");
200
201 if(m_connected)
202 {
203 DBUG_PRINT("error", ("Already connected"));
204 DBUG_RETURN(true);
205 }
206
207 if (!my_socket_valid(sockfd))
208 {
209 DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
210 MY_SOCKET_FORMAT_VALUE(sockfd)));
211 DBUG_RETURN(false);
212 }
213
214 DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
215 m_s_port, isMgmConnection));
216
217 // Send "hello"
218 DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
219 localNodeId, m_type));
220 SocketOutputStream s_output(sockfd);
221 if (s_output.println("%d %d", localNodeId, m_type) < 0)
222 {
223 DBUG_PRINT("error", ("Send of 'hello' failed"));
224 NDB_CLOSE_SOCKET(sockfd);
225 DBUG_RETURN(false);
226 }
227
228 // Read reply
229 DBUG_PRINT("info", ("Reading reply"));
230 char buf[256];
231 SocketInputStream s_input(sockfd);
232 if (s_input.gets(buf, 256) == 0)
233 {
234 DBUG_PRINT("error", ("Failed to read reply"));
235 NDB_CLOSE_SOCKET(sockfd);
236 DBUG_RETURN(false);
237 }
238
239 // Parse reply
240 int nodeId, remote_transporter_type= -1;
241 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
242 switch (r) {
243 case 2:
244 break;
245 case 1:
246 // we're running version prior to 4.1.9
247 // ok, but with no checks on transporter configuration compatability
248 break;
249 default:
250 DBUG_PRINT("error", ("Failed to parse reply"));
251 NDB_CLOSE_SOCKET(sockfd);
252 DBUG_RETURN(false);
253 }
254
255 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
256 nodeId, remote_transporter_type));
257
258 // Check nodeid
259 if (nodeId != remoteNodeId)
260 {
261 g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
262 nodeId, remoteNodeId);
263 NDB_CLOSE_SOCKET(sockfd);
264 DBUG_RETURN(false);
265 }
266
267 // Check transporter type
268 if (remote_transporter_type != -1 &&
269 remote_transporter_type != m_type)
270 {
271 g_eventLogger->error("Connection to node: %d uses different transporter "
272 "type: %d, expected type: %d",
273 nodeId, remote_transporter_type, m_type);
274 NDB_CLOSE_SOCKET(sockfd);
275 DBUG_RETURN(false);
276 }
277
278 // Cache the connect address
279 my_socket_connect_address(sockfd, &m_connect_address);
280
281 if (!connect_client_impl(sockfd))
282 DBUG_RETURN(false);
283
284 m_connected = true;
285
286 DBUG_RETURN(true);
287 }
288
289 void
doDisconnect()290 Transporter::doDisconnect() {
291
292 if(!m_connected)
293 return;
294
295 m_connected = false;
296
297 disconnectImpl();
298 }
299
300