1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef Transporter_H
26 #define Transporter_H
27 
28 #include <ndb_global.h>
29 
30 #include <SocketClient.hpp>
31 
32 #include <TransporterRegistry.hpp>
33 #include <TransporterCallback.hpp>
34 #include "TransporterDefinitions.hpp"
35 #include "Packer.hpp"
36 
37 #include <NdbMutex.h>
38 #include <NdbThread.h>
39 
40 #include <ndb_socket.h>
41 
42 class Transporter {
43   friend class TransporterRegistry;
44 public:
45   virtual bool initTransporter() = 0;
46 
47   /**
48    * Destructor
49    */
50   virtual ~Transporter();
51 
52   /**
53    * None blocking
54    *    Use isConnected() to check status
55    */
56   virtual bool connect_client();
57   bool connect_client(NDB_SOCKET_TYPE sockfd);
58   bool connect_server(NDB_SOCKET_TYPE socket, BaseString& errormsg);
59 
60   /**
61    * Blocking
62    */
63   virtual void doDisconnect();
64 
65   /**
66    * Are we currently connected
67    */
68   bool isConnected() const;
69 
70   /**
71    * Remote Node Id
72    */
73   NodeId getRemoteNodeId() const;
74 
75   /**
76    * Local (own) Node Id
77    */
78   NodeId getLocalNodeId() const;
79 
80   /**
81    * Get port we're connecting to (signed)
82    */
get_s_port()83   int get_s_port() { return m_s_port; };
84 
85   /**
86    * Set port to connect to (signed)
87    */
set_s_port(int port)88   void set_s_port(int port) {
89     m_s_port = port;
90     if(port<0)
91       port= -port;
92     if(m_socket_client)
93       m_socket_client->set_port(port);
94   };
95 
update_status_overloaded(Uint32 used)96   void update_status_overloaded(Uint32 used)
97   {
98     m_transporter_registry.set_status_overloaded(remoteNodeId,
99                                                  used >= m_overload_limit);
100   }
101 
102   virtual int doSend() = 0;
103 
has_data_to_send()104   bool has_data_to_send()
105   {
106     return get_callback_obj()->has_data_to_send(remoteNodeId);
107   }
108 
109   /* Get the configured maximum send buffer usage. */
get_max_send_buffer()110   Uint32 get_max_send_buffer() { return m_max_send_buffer; }
111 
112 protected:
113   Transporter(TransporterRegistry &,
114 	      TransporterType,
115 	      const char *lHostName,
116 	      const char *rHostName,
117 	      int s_port,
118 	      bool isMgmConnection,
119 	      NodeId lNodeId,
120 	      NodeId rNodeId,
121 	      NodeId serverNodeId,
122 	      int byteorder,
123 	      bool compression,
124 	      bool checksum,
125 	      bool signalId,
126               Uint32 max_send_buffer);
127 
128   virtual bool configure(const TransporterConfiguration* conf);
129   virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
130 
131   /**
132    * Blocking, for max timeOut milli seconds
133    *   Returns true if connect succeded
134    */
135   virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
136   virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
pre_connect_options(NDB_SOCKET_TYPE sockfd)137   virtual int pre_connect_options(NDB_SOCKET_TYPE sockfd) { return 0;}
138 
139   /**
140    * Blocking
141    */
142   virtual void disconnectImpl() = 0;
143 
144   /**
145    * Remote host name/and address
146    */
147   char remoteHostName[256];
148   char localHostName[256];
149   struct in_addr remoteHostAddress;
150   struct in_addr localHostAddress;
151 
152   int m_s_port;
153 
154   const NodeId remoteNodeId;
155   const NodeId localNodeId;
156 
157   const bool isServer;
158 
159   unsigned createIndex;
160 
161   int byteOrder;
162   bool compressionUsed;
163   bool checksumUsed;
164   bool signalIdUsed;
165   Packer m_packer;
166   Uint32 m_max_send_buffer;
167   /* Overload limit, as configured with the OverloadLimit config parameter. */
168   Uint32 m_overload_limit;
169 
170 private:
171 
172   /**
173    * means that we transform an MGM connection into
174    * a transporter connection
175    */
176   bool isMgmConnection;
177 
178   SocketClient *m_socket_client;
179   struct in_addr m_connect_address;
180 
181   virtual bool send_is_possible(int timeout_millisec) const = 0;
182   virtual bool send_limit_reached(int bufsize) = 0;
183 
184 protected:
185   Uint32 m_os_max_iovec;
186   Uint32 m_timeOutMillis;
187   bool m_connected;     // Are we connected
188   TransporterType m_type;
189 
190   TransporterRegistry &m_transporter_registry;
get_callback_obj()191   TransporterCallback *get_callback_obj() { return m_transporter_registry.callbackObj; };
do_disconnect(int err)192   void do_disconnect(int err){m_transporter_registry.do_disconnect(remoteNodeId,err);};
report_error(enum TransporterError err,const char * info=0)193   void report_error(enum TransporterError err, const char *info = 0)
194     { m_transporter_registry.report_error(remoteNodeId, err, info); };
195 
196   Uint32 fetch_send_iovec_data(struct iovec dst[], Uint32 cnt);
197   void iovec_data_sent(int nBytesSent);
198 };
199 
200 inline
201 bool
isConnected() const202 Transporter::isConnected() const {
203   return m_connected;
204 }
205 
206 inline
207 NodeId
getRemoteNodeId() const208 Transporter::getRemoteNodeId() const {
209   return remoteNodeId;
210 }
211 
212 inline
213 NodeId
getLocalNodeId() const214 Transporter::getLocalNodeId() const {
215   return localNodeId;
216 }
217 
218 /**
219  * Get data to send (in addition to data possibly remaining from previous
220  * partial send).
221  */
222 inline
223 Uint32
fetch_send_iovec_data(struct iovec dst[],Uint32 cnt)224 Transporter::fetch_send_iovec_data(struct iovec dst[], Uint32 cnt)
225 {
226   return get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
227                                                      dst, cnt);
228 }
229 
230 inline
231 void
iovec_data_sent(int nBytesSent)232 Transporter::iovec_data_sent(int nBytesSent)
233 {
234   Uint32 used_bytes
235     = get_callback_obj()->bytes_sent(remoteNodeId, nBytesSent);
236   update_status_overloaded(used_bytes);
237 }
238 
239 #endif // Define of Transporter_H
240