1 /* Copyright (c) 2003-2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 
18 #include <TransporterRegistry.hpp>
19 #include <TransporterCallback.hpp>
20 #include "Transporter.hpp"
21 #include "TransporterInternalDefinitions.hpp"
22 #include <NdbSleep.h>
23 #include <SocketAuthenticator.hpp>
24 #include <InputStream.hpp>
25 #include <OutputStream.hpp>
26 
27 #include <EventLogger.hpp>
28 extern EventLogger g_eventLogger;
29 
Transporter(TransporterRegistry & t_reg,TransporterType _type,const char * lHostName,const char * rHostName,int s_port,bool _isMgmConnection,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,int _byteorder,bool _compression,bool _checksum,bool _signalId)30 Transporter::Transporter(TransporterRegistry &t_reg,
31 			 TransporterType _type,
32 			 const char *lHostName,
33 			 const char *rHostName,
34 			 int s_port,
35 			 bool _isMgmConnection,
36 			 NodeId lNodeId,
37 			 NodeId rNodeId,
38 			 NodeId serverNodeId,
39 			 int _byteorder,
40 			 bool _compression, bool _checksum, bool _signalId)
41   : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
42     isServer(lNodeId==serverNodeId),
43     m_packer(_signalId, _checksum),  isMgmConnection(_isMgmConnection),
44     m_type(_type),
45     m_transporter_registry(t_reg)
46 {
47   DBUG_ENTER("Transporter::Transporter");
48   if (rHostName && strlen(rHostName) > 0){
49     strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
50     Ndb_getInAddr(&remoteHostAddress, rHostName);
51   }
52   else
53   {
54     if (!isServer) {
55       ndbout << "Unable to setup transporter. Node " << rNodeId
56 	     << " must have hostname. Update configuration." << endl;
57       exit(-1);
58     }
59     remoteHostName[0]= 0;
60   }
61   strncpy(localHostName, lHostName, sizeof(localHostName));
62 
63   DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
64 		     remoteNodeId, localNodeId, isServer,
65 		     remoteHostName, localHostName,
66 		     s_port));
67 
68   byteOrder       = _byteorder;
69   compressionUsed = _compression;
70   checksumUsed    = _checksum;
71   signalIdUsed    = _signalId;
72 
73   m_connected     = false;
74   m_timeOutMillis = 30000;
75 
76   m_connect_address.s_addr= 0;
77   if(s_port<0)
78     s_port= -s_port; // was dynamic
79 
80   if (isServer)
81     m_socket_client= 0;
82   else
83   {
84     m_socket_client= new SocketClient(remoteHostName, s_port,
85 				      new SocketAuthSimple("ndbd",
86 							   "ndbd passwd"));
87 
88     m_socket_client->set_connect_timeout((m_timeOutMillis+999)/1000);
89   }
90   DBUG_VOID_RETURN;
91 }
92 
~Transporter()93 Transporter::~Transporter(){
94   if (m_socket_client)
95     delete m_socket_client;
96 }
97 
98 bool
connect_server(NDB_SOCKET_TYPE sockfd)99 Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
100   // all initial negotiation is done in TransporterRegistry::connect_server
101   DBUG_ENTER("Transporter::connect_server");
102 
103   if(m_connected)
104   {
105     DBUG_RETURN(false); // TODO assert(0);
106   }
107 
108   {
109     struct sockaddr_in addr;
110     SOCKET_SIZE_TYPE addrlen= sizeof(addr);
111     getpeername(sockfd, (struct sockaddr*)&addr, &addrlen);
112     m_connect_address= (&addr)->sin_addr;
113   }
114 
115   bool res = connect_server_impl(sockfd);
116   if(res){
117     m_connected  = true;
118     m_errorCount = 0;
119   }
120 
121   DBUG_RETURN(res);
122 }
123 
124 bool
connect_client()125 Transporter::connect_client() {
126   NDB_SOCKET_TYPE sockfd;
127 
128   if(m_connected)
129     return true;
130 
131   if(isMgmConnection)
132   {
133     sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
134   }
135   else
136   {
137     if (!m_socket_client->init())
138     {
139       return false;
140     }
141     if (strlen(localHostName) > 0)
142     {
143       if (m_socket_client->bind(localHostName, 0) != 0)
144 	return false;
145     }
146     sockfd= m_socket_client->connect();
147   }
148 
149   return connect_client(sockfd);
150 }
151 
152 bool
connect_client(NDB_SOCKET_TYPE sockfd)153 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
154 
155   if(m_connected)
156     return true;
157 
158   if (sockfd == NDB_INVALID_SOCKET)
159     return false;
160 
161   DBUG_ENTER("Transporter::connect_client");
162 
163   DBUG_PRINT("info",("port %d isMgmConnection=%d",m_s_port,isMgmConnection));
164 
165   SocketOutputStream s_output(sockfd);
166   SocketInputStream s_input(sockfd);
167 
168   // send info about own id
169   // send info about own transporter type
170 
171   s_output.println("%d %d", localNodeId, m_type);
172   // get remote id
173   int nodeId, remote_transporter_type= -1;
174 
175   char buf[256];
176   if (s_input.gets(buf, 256) == 0) {
177     NDB_CLOSE_SOCKET(sockfd);
178     DBUG_RETURN(false);
179   }
180 
181   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
182   switch (r) {
183   case 2:
184     break;
185   case 1:
186     // we're running version prior to 4.1.9
187     // ok, but with no checks on transporter configuration compatability
188     break;
189   default:
190     NDB_CLOSE_SOCKET(sockfd);
191     DBUG_RETURN(false);
192   }
193 
194   DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
195 		      nodeId, remote_transporter_type));
196 
197   if (remote_transporter_type != -1)
198   {
199     if (remote_transporter_type != m_type)
200     {
201       DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
202 			   m_type, remote_transporter_type));
203       NDB_CLOSE_SOCKET(sockfd);
204       g_eventLogger.error("Incompatible configuration: transporter type "
205 			  "mismatch with node %d", nodeId);
206       DBUG_RETURN(false);
207     }
208   }
209   else if (m_type == tt_SHM_TRANSPORTER)
210   {
211     g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
212   }
213 
214   {
215     struct sockaddr_in addr;
216     SOCKET_SIZE_TYPE addrlen= sizeof(addr);
217     getpeername(sockfd, (struct sockaddr*)&addr, &addrlen);
218     m_connect_address= (&addr)->sin_addr;
219   }
220 
221   bool res = connect_client_impl(sockfd);
222   if(res){
223     m_connected  = true;
224     m_errorCount = 0;
225   }
226   DBUG_RETURN(res);
227 }
228 
229 void
doDisconnect()230 Transporter::doDisconnect() {
231 
232   if(!m_connected)
233     return; //assert(0); TODO will fail
234 
235   m_connected= false;
236   disconnectImpl();
237 }
238