1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef Transporter_H
26 #define Transporter_H
27
28 #include <ndb_global.h>
29
30 #include <SocketClient.hpp>
31
32 #include <TransporterRegistry.hpp>
33 #include <TransporterCallback.hpp>
34 #include "TransporterDefinitions.hpp"
35 #include "Packer.hpp"
36
37 #include <NdbMutex.h>
38 #include <NdbThread.h>
39
40 #include <ndb_socket.h>
41
42 class Transporter {
43 friend class TransporterRegistry;
44 public:
45 virtual bool initTransporter() = 0;
46
47 /**
48 * Destructor
49 */
50 virtual ~Transporter();
51
52 /**
53 * None blocking
54 * Use isConnected() to check status
55 */
56 virtual bool connect_client();
57 bool connect_client(NDB_SOCKET_TYPE sockfd);
58 bool connect_server(NDB_SOCKET_TYPE socket, BaseString& errormsg);
59
60 /**
61 * Blocking
62 */
63 virtual void doDisconnect();
64
65 /**
66 * Are we currently connected
67 */
68 bool isConnected() const;
69
70 /**
71 * Remote Node Id
72 */
73 NodeId getRemoteNodeId() const;
74
75 /**
76 * Local (own) Node Id
77 */
78 NodeId getLocalNodeId() const;
79
80 /**
81 * Get port we're connecting to (signed)
82 */
get_s_port()83 int get_s_port() { return m_s_port; };
84
85 /**
86 * Set port to connect to (signed)
87 */
set_s_port(int port)88 void set_s_port(int port) {
89 m_s_port = port;
90 if(port<0)
91 port= -port;
92 if(m_socket_client)
93 m_socket_client->set_port(port);
94 };
95
update_status_overloaded(Uint32 used)96 void update_status_overloaded(Uint32 used)
97 {
98 m_transporter_registry.set_status_overloaded(remoteNodeId,
99 used >= m_overload_limit);
100 }
101
102 virtual int doSend() = 0;
103
has_data_to_send()104 bool has_data_to_send()
105 {
106 return get_callback_obj()->has_data_to_send(remoteNodeId);
107 }
108
109 /* Get the configured maximum send buffer usage. */
get_max_send_buffer()110 Uint32 get_max_send_buffer() { return m_max_send_buffer; }
111
112 protected:
113 Transporter(TransporterRegistry &,
114 TransporterType,
115 const char *lHostName,
116 const char *rHostName,
117 int s_port,
118 bool isMgmConnection,
119 NodeId lNodeId,
120 NodeId rNodeId,
121 NodeId serverNodeId,
122 int byteorder,
123 bool compression,
124 bool checksum,
125 bool signalId,
126 Uint32 max_send_buffer);
127
128 virtual bool configure(const TransporterConfiguration* conf);
129 virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
130
131 /**
132 * Blocking, for max timeOut milli seconds
133 * Returns true if connect succeded
134 */
135 virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
136 virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
pre_connect_options(NDB_SOCKET_TYPE sockfd)137 virtual int pre_connect_options(NDB_SOCKET_TYPE sockfd) { return 0;}
138
139 /**
140 * Blocking
141 */
142 virtual void disconnectImpl() = 0;
143
144 /**
145 * Remote host name/and address
146 */
147 char remoteHostName[256];
148 char localHostName[256];
149 struct in_addr remoteHostAddress;
150 struct in_addr localHostAddress;
151
152 int m_s_port;
153
154 const NodeId remoteNodeId;
155 const NodeId localNodeId;
156
157 const bool isServer;
158
159 unsigned createIndex;
160
161 int byteOrder;
162 bool compressionUsed;
163 bool checksumUsed;
164 bool signalIdUsed;
165 Packer m_packer;
166 Uint32 m_max_send_buffer;
167 /* Overload limit, as configured with the OverloadLimit config parameter. */
168 Uint32 m_overload_limit;
169
170 private:
171
172 /**
173 * means that we transform an MGM connection into
174 * a transporter connection
175 */
176 bool isMgmConnection;
177
178 SocketClient *m_socket_client;
179 struct in_addr m_connect_address;
180
181 virtual bool send_is_possible(int timeout_millisec) const = 0;
182 virtual bool send_limit_reached(int bufsize) = 0;
183
184 protected:
185 Uint32 m_os_max_iovec;
186 Uint32 m_timeOutMillis;
187 bool m_connected; // Are we connected
188 TransporterType m_type;
189
190 TransporterRegistry &m_transporter_registry;
get_callback_obj()191 TransporterCallback *get_callback_obj() { return m_transporter_registry.callbackObj; };
do_disconnect(int err)192 void do_disconnect(int err){m_transporter_registry.do_disconnect(remoteNodeId,err);};
report_error(enum TransporterError err,const char * info=0)193 void report_error(enum TransporterError err, const char *info = 0)
194 { m_transporter_registry.report_error(remoteNodeId, err, info); };
195
196 Uint32 fetch_send_iovec_data(struct iovec dst[], Uint32 cnt);
197 void iovec_data_sent(int nBytesSent);
198 };
199
200 inline
201 bool
isConnected() const202 Transporter::isConnected() const {
203 return m_connected;
204 }
205
206 inline
207 NodeId
getRemoteNodeId() const208 Transporter::getRemoteNodeId() const {
209 return remoteNodeId;
210 }
211
212 inline
213 NodeId
getLocalNodeId() const214 Transporter::getLocalNodeId() const {
215 return localNodeId;
216 }
217
218 /**
219 * Get data to send (in addition to data possibly remaining from previous
220 * partial send).
221 */
222 inline
223 Uint32
fetch_send_iovec_data(struct iovec dst[],Uint32 cnt)224 Transporter::fetch_send_iovec_data(struct iovec dst[], Uint32 cnt)
225 {
226 return get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
227 dst, cnt);
228 }
229
230 inline
231 void
iovec_data_sent(int nBytesSent)232 Transporter::iovec_data_sent(int nBytesSent)
233 {
234 Uint32 used_bytes
235 = get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent);
236 update_status_overloaded(used_bytes);
237 }
238
239 #endif // Define of Transporter_H
240