1 /* Copyright (c) 2003-2007 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17
18 #include <TransporterRegistry.hpp>
19 #include <TransporterCallback.hpp>
20 #include "Transporter.hpp"
21 #include "TransporterInternalDefinitions.hpp"
22 #include <NdbSleep.h>
23 #include <SocketAuthenticator.hpp>
24 #include <InputStream.hpp>
25 #include <OutputStream.hpp>
26
27 #include <EventLogger.hpp>
28 extern EventLogger g_eventLogger;
29
Transporter(TransporterRegistry & t_reg,TransporterType _type,const char * lHostName,const char * rHostName,int s_port,bool _isMgmConnection,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,int _byteorder,bool _compression,bool _checksum,bool _signalId)30 Transporter::Transporter(TransporterRegistry &t_reg,
31 TransporterType _type,
32 const char *lHostName,
33 const char *rHostName,
34 int s_port,
35 bool _isMgmConnection,
36 NodeId lNodeId,
37 NodeId rNodeId,
38 NodeId serverNodeId,
39 int _byteorder,
40 bool _compression, bool _checksum, bool _signalId)
41 : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
42 isServer(lNodeId==serverNodeId),
43 m_packer(_signalId, _checksum), isMgmConnection(_isMgmConnection),
44 m_type(_type),
45 m_transporter_registry(t_reg)
46 {
47 DBUG_ENTER("Transporter::Transporter");
48 if (rHostName && strlen(rHostName) > 0){
49 strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
50 Ndb_getInAddr(&remoteHostAddress, rHostName);
51 }
52 else
53 {
54 if (!isServer) {
55 ndbout << "Unable to setup transporter. Node " << rNodeId
56 << " must have hostname. Update configuration." << endl;
57 exit(-1);
58 }
59 remoteHostName[0]= 0;
60 }
61 strncpy(localHostName, lHostName, sizeof(localHostName));
62
63 DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
64 remoteNodeId, localNodeId, isServer,
65 remoteHostName, localHostName,
66 s_port));
67
68 byteOrder = _byteorder;
69 compressionUsed = _compression;
70 checksumUsed = _checksum;
71 signalIdUsed = _signalId;
72
73 m_connected = false;
74 m_timeOutMillis = 30000;
75
76 m_connect_address.s_addr= 0;
77 if(s_port<0)
78 s_port= -s_port; // was dynamic
79
80 if (isServer)
81 m_socket_client= 0;
82 else
83 {
84 m_socket_client= new SocketClient(remoteHostName, s_port,
85 new SocketAuthSimple("ndbd",
86 "ndbd passwd"));
87
88 m_socket_client->set_connect_timeout((m_timeOutMillis+999)/1000);
89 }
90 DBUG_VOID_RETURN;
91 }
92
~Transporter()93 Transporter::~Transporter(){
94 if (m_socket_client)
95 delete m_socket_client;
96 }
97
98 bool
connect_server(NDB_SOCKET_TYPE sockfd)99 Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
100 // all initial negotiation is done in TransporterRegistry::connect_server
101 DBUG_ENTER("Transporter::connect_server");
102
103 if(m_connected)
104 {
105 DBUG_RETURN(false); // TODO assert(0);
106 }
107
108 {
109 struct sockaddr_in addr;
110 SOCKET_SIZE_TYPE addrlen= sizeof(addr);
111 getpeername(sockfd, (struct sockaddr*)&addr, &addrlen);
112 m_connect_address= (&addr)->sin_addr;
113 }
114
115 bool res = connect_server_impl(sockfd);
116 if(res){
117 m_connected = true;
118 m_errorCount = 0;
119 }
120
121 DBUG_RETURN(res);
122 }
123
124 bool
connect_client()125 Transporter::connect_client() {
126 NDB_SOCKET_TYPE sockfd;
127
128 if(m_connected)
129 return true;
130
131 if(isMgmConnection)
132 {
133 sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
134 }
135 else
136 {
137 if (!m_socket_client->init())
138 {
139 return false;
140 }
141 if (strlen(localHostName) > 0)
142 {
143 if (m_socket_client->bind(localHostName, 0) != 0)
144 return false;
145 }
146 sockfd= m_socket_client->connect();
147 }
148
149 return connect_client(sockfd);
150 }
151
152 bool
connect_client(NDB_SOCKET_TYPE sockfd)153 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
154
155 if(m_connected)
156 return true;
157
158 if (sockfd == NDB_INVALID_SOCKET)
159 return false;
160
161 DBUG_ENTER("Transporter::connect_client");
162
163 DBUG_PRINT("info",("port %d isMgmConnection=%d",m_s_port,isMgmConnection));
164
165 SocketOutputStream s_output(sockfd);
166 SocketInputStream s_input(sockfd);
167
168 // send info about own id
169 // send info about own transporter type
170
171 s_output.println("%d %d", localNodeId, m_type);
172 // get remote id
173 int nodeId, remote_transporter_type= -1;
174
175 char buf[256];
176 if (s_input.gets(buf, 256) == 0) {
177 NDB_CLOSE_SOCKET(sockfd);
178 DBUG_RETURN(false);
179 }
180
181 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
182 switch (r) {
183 case 2:
184 break;
185 case 1:
186 // we're running version prior to 4.1.9
187 // ok, but with no checks on transporter configuration compatability
188 break;
189 default:
190 NDB_CLOSE_SOCKET(sockfd);
191 DBUG_RETURN(false);
192 }
193
194 DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
195 nodeId, remote_transporter_type));
196
197 if (remote_transporter_type != -1)
198 {
199 if (remote_transporter_type != m_type)
200 {
201 DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
202 m_type, remote_transporter_type));
203 NDB_CLOSE_SOCKET(sockfd);
204 g_eventLogger.error("Incompatible configuration: transporter type "
205 "mismatch with node %d", nodeId);
206 DBUG_RETURN(false);
207 }
208 }
209 else if (m_type == tt_SHM_TRANSPORTER)
210 {
211 g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
212 }
213
214 {
215 struct sockaddr_in addr;
216 SOCKET_SIZE_TYPE addrlen= sizeof(addr);
217 getpeername(sockfd, (struct sockaddr*)&addr, &addrlen);
218 m_connect_address= (&addr)->sin_addr;
219 }
220
221 bool res = connect_client_impl(sockfd);
222 if(res){
223 m_connected = true;
224 m_errorCount = 0;
225 }
226 DBUG_RETURN(res);
227 }
228
229 void
doDisconnect()230 Transporter::doDisconnect() {
231
232 if(!m_connected)
233 return; //assert(0); TODO will fail
234
235 m_connected= false;
236 disconnectImpl();
237 }
238