1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef Transporter_H
26 #define Transporter_H
27 
28 #include <ndb_global.h>
29 
30 #include <SocketClient.hpp>
31 
32 #include <TransporterRegistry.hpp>
33 #include <TransporterCallback.hpp>
34 #include "TransporterDefinitions.hpp"
35 #include "Packer.hpp"
36 
37 #include <NdbMutex.h>
38 #include <NdbThread.h>
39 
40 #include "ndb_socket.h"
41 
/**
 * Decide whether the outcome of a socket send/recv style call means the
 * connection must be treated as broken.
 *
 *   e  : socket error code observed after the call
 *   sz : value returned by the call
 *
 * Evaluates to true when sz == 0, or when sz == -1 and e is not one of the
 * transient codes SOCKET_EAGAIN, SOCKET_EWOULDBLOCK or SOCKET_EINTR (for
 * those the operation can simply be retried).
 *
 * NOTE: any sz other than -1 with a transient error -- including positive
 * byte counts -- also evaluates to true, so this is only meaningful for
 * results sz <= 0.
 *
 * Every use of a parameter is parenthesized so that expression arguments
 * (e.g. `cond ? a : b`) expand with the intended precedence. Arguments may
 * be evaluated more than once; do not pass expressions with side effects.
 */
#define DISCONNECT_ERRNO(e, sz) ( \
                ((sz) == 0) || \
                 (!(((sz) == -1) && \
                  (((e) == SOCKET_EAGAIN) || \
                   ((e) == SOCKET_EWOULDBLOCK) || \
                   ((e) == SOCKET_EINTR)))))
48 
49 class Transporter {
50   friend class TransporterRegistry;
51   friend class Multi_Transporter;
52   friend class Qmgr;
53 public:
54   virtual bool initTransporter() = 0;
55 
56   /**
57    * Destructor
58    */
59   virtual ~Transporter();
60 
61 
62   /**
63    * Disconnect node/socket
64    */
65   bool do_disconnect(int err, bool send_source);
66 
67   /**
68    * Clear any data buffered in the transporter.
69    * Should only be called in a disconnected state.
70    */
resetBuffers()71   virtual void resetBuffers() {}
72 
73   /**
74    * Is this transporter part of a multi transporter.
75    * It is a real transporter, but can be connected
76    * when the node is in the state connected.
77    */
isPartOfMultiTransporter()78   virtual bool isPartOfMultiTransporter()
79   {
80     return (m_multi_transporter_instance != 0);
81   }
82 
get_multi_transporter_instance()83   Uint32 get_multi_transporter_instance()
84   {
85     return m_multi_transporter_instance;
86   }
isMultiTransporter()87   virtual bool isMultiTransporter()
88   {
89     return false;
90   }
91 
set_multi_transporter_instance(Uint32 val)92   void set_multi_transporter_instance(Uint32 val)
93   {
94     m_multi_transporter_instance = val;
95   }
96 
get_bytes_sent() const97   virtual Uint64 get_bytes_sent() const
98   {
99     return m_bytes_sent;
100   }
101 
get_bytes_received() const102   virtual Uint64 get_bytes_received() const
103   {
104     return m_bytes_received;
105   }
106 
107   /**
108    * In most cases we only use transporter per node connection.
109    * But in cases where the transporter is heavily loaded we can
110    * have multiple transporters to send for one node connection.
111    * In this case theNodeIdTransporters points to a Multi_Transporter
112    * object that has implemented a hash algorithm for
113    * get_send_transporter based on sending thread and receiving
114    * thread.
115    */
get_send_transporter(Uint32 recBlock,Uint32 sendBlock)116   virtual Transporter* get_send_transporter(Uint32 recBlock, Uint32 sendBlock)
117   {
118     (void)recBlock;
119     (void)sendBlock;
120     return this;
121   }
122 
123   /**
124    * None blocking
125    *    Use isConnected() to check status
126    */
127   virtual bool connect_client();
128   bool connect_client(NDB_SOCKET_TYPE sockfd);
129   bool connect_server(NDB_SOCKET_TYPE socket, BaseString& errormsg);
130 
131   /**
132    * Returns socket used (sockets are used for all transporters to ensure
133    * we can wake up also shared memory transporters and other types of
134    * transporters in consistent manner.
135    */
136   NDB_SOCKET_TYPE getSocket() const;
137 
138   /**
139    * Blocking
140    */
141   void doDisconnect();
142 
143   /**
144    * Are we currently connected
145    */
146   bool isConnected() const;
147 
148   /**
149    * Remote Node Id
150    */
151   NodeId getRemoteNodeId() const;
152 
153   /**
154    * Index into allTransporters array.
155    */
156   TrpId getTransporterIndex() const;
157   void setTransporterIndex(TrpId);
158   /**
159    * Local (own) Node Id
160    */
161   NodeId getLocalNodeId() const;
162 
163   /**
164    * Get port we're connecting to (signed)
165    */
get_s_port() const166   int get_s_port() const {
167     return m_s_port;
168   }
169 
170   /**
171    * Set port to connect to (signed)
172    */
set_s_port(int port)173   void set_s_port(int port) {
174     m_s_port = port;
175   }
176 
update_status_overloaded(Uint32 used)177   void update_status_overloaded(Uint32 used)
178   {
179     m_transporter_registry.set_status_overloaded(remoteNodeId,
180                                                  used >= m_overload_limit);
181     m_transporter_registry.set_status_slowdown(remoteNodeId,
182                                                used >= m_slowdown_limit);
183   }
184 
185   virtual bool doSend(bool need_wakeup = true) = 0;
186 
187   /* Get the configured maximum send buffer usage. */
get_max_send_buffer()188   Uint32 get_max_send_buffer() { return m_max_send_buffer; }
189 
get_connect_count()190   Uint32 get_connect_count() { return m_connect_count; }
191 
inc_overload_count()192   void inc_overload_count() { m_overload_count++; }
get_overload_count()193   Uint32 get_overload_count() { return m_overload_count; }
inc_slowdown_count()194   void inc_slowdown_count() { m_slowdown_count++; }
get_slowdown_count()195   Uint32 get_slowdown_count() { return m_slowdown_count; }
set_recv_thread_idx(Uint32 recv_thread_idx)196   void set_recv_thread_idx (Uint32 recv_thread_idx)
197   {
198     m_recv_thread_idx = recv_thread_idx;
199   }
set_transporter_active(bool active)200   void set_transporter_active(bool active)
201   {
202     m_is_active = active;
203   }
get_recv_thread_idx()204   Uint32 get_recv_thread_idx() { return m_recv_thread_idx; }
205 
206   TransporterType getTransporterType() const;
207 
208   /**
209    * Only applies to TCP transporter, abort on any other object.
210    * Used as part of shutting down transporter when switching to
211    * multi socket setup.
212    * Shut down only for writes when all data have been sent.
213    */
shutdown()214   virtual void shutdown() { abort();}
215 
216 protected:
217   Transporter(TransporterRegistry &,
218               TrpId transporter_index,
219 	      TransporterType,
220 	      const char *lHostName,
221 	      const char *rHostName,
222 	      int s_port,
223 	      bool isMgmConnection,
224 	      NodeId lNodeId,
225 	      NodeId rNodeId,
226 	      NodeId serverNodeId,
227 	      int byteorder,
228 	      bool compression,
229 	      bool checksum,
230 	      bool signalId,
231               Uint32 max_send_buffer,
232               bool _presend_checksum,
233               Uint32 spintime);
234 
235   virtual bool configure(const TransporterConfiguration* conf);
236   virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
237 
238   /**
239    * Blocking, for max timeOut milli seconds
240    *   Returns true if connect succeded
241    */
242   virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
243   virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
pre_connect_options(NDB_SOCKET_TYPE sockfd)244   virtual int pre_connect_options(NDB_SOCKET_TYPE sockfd) { return 0;}
245 
246   /**
247    * Blocking
248    */
249   virtual void disconnectImpl() = 0;
250 
251   /**
252    * Remote host name/and address
253    */
254   char remoteHostName[256];
255   char localHostName[256];
256 
257   int m_s_port;
258 
259   Uint32 m_spintime;
get_spintime()260   Uint32 get_spintime()
261   {
262     return m_spintime;
263   }
264   const NodeId remoteNodeId;
265   const NodeId localNodeId;
266 
267   TrpId m_transporter_index;
268   const bool isServer;
269 
270   int byteOrder;
271   bool compressionUsed;
272   bool checksumUsed;
273   bool check_send_checksum;
274   bool signalIdUsed;
275   Packer m_packer;
276   Uint32 m_max_send_buffer;
277   /* Overload limit, as configured with the OverloadLimit config parameter. */
278   Uint32 m_overload_limit;
279   Uint32 m_slowdown_limit;
280   void resetCounters();
281   Uint64 m_bytes_sent;
282   Uint64 m_bytes_received;
283   Uint32 m_connect_count;
284   Uint32 m_overload_count;
285   Uint32 m_slowdown_count;
286 
287   // Sending/Receiving socket used by both client and server
288   NDB_SOCKET_TYPE theSocket;
289 private:
290   SocketClient *m_socket_client;
291   struct in_addr m_connect_address;
292 
293   virtual bool send_is_possible(int timeout_millisec) const = 0;
294   virtual bool send_limit_reached(int bufsize) = 0;
295 
296   void update_connect_state(bool connected);
297 
298 protected:
299   /**
300    * means that we transform an MGM connection into
301    * a transporter connection
302    */
303   bool isMgmConnection;
304 
305   Uint32 m_multi_transporter_instance;
306   Uint32 m_recv_thread_idx;
307   bool m_is_active;
308 
309   Uint32 m_os_max_iovec;
310   Uint32 m_timeOutMillis;
311   bool m_connected;     // Are we connected
312   TransporterType m_type;
313 
314   /**
315    * Statistics
316    */
317   Uint32 reportFreq;
318   Uint32 receiveCount;
319   Uint64 receiveSize;
320   Uint32 sendCount;
321   Uint64 sendSize;
322 
323   TransporterRegistry &m_transporter_registry;
get_callback_obj()324   TransporterCallback *get_callback_obj() { return m_transporter_registry.callbackObj; }
report_error(enum TransporterError err,const char * info=0)325   void report_error(enum TransporterError err, const char *info = 0)
326     { m_transporter_registry.report_error(remoteNodeId, err, info); }
327 
328   Uint32 fetch_send_iovec_data(struct iovec dst[], Uint32 cnt);
329   void iovec_data_sent(int nBytesSent);
330 
331   void set_get(NDB_SOCKET_TYPE fd,
332                int level,
333                int optval,
334                const char *optname,
335                int val);
336   /*
337    * Keep checksum state for Protocol6 messages over a byte stream.
338    */
339   class checksum_state {
340     enum cs_states
341     {
342       CS_INIT,
343       CS_MSG_CHECK,
344       CS_MSG_NOCHECK
345     };
346     cs_states state;
347     Uint32 chksum; // of already sent bytes, rotated so next byte to process matches first byte of chksum
348     Uint16 pending; // remaining bytes before state change
349   public:
350     bool computev(const struct iovec *iov, int iovcnt, size_t bytecnt = SIZE_T_MAX);
checksum_state()351     checksum_state(): state(CS_INIT), chksum(0), pending(4) {}
init()352     void init() { state = CS_INIT; chksum = 0; pending = 4; }
353   private:
354     bool compute(const void* bytes, size_t len);
355     static void static_asserts(); // container of static asserts, not to be called
356     void dumpBadChecksumInfo(Uint32 inputSum,
357                              Uint32 badSum,
358                              size_t offset,
359                              Uint32 remaining,
360                              const void* buf,
361                              size_t len) const;
362 
363   };
364   checksum_state send_checksum_state;
365 };
366 
367 inline
368 NDB_SOCKET_TYPE
getSocket() const369 Transporter::getSocket() const {
370   return theSocket;
371 }
372 
373 inline
374 TransporterType
getTransporterType() const375 Transporter::getTransporterType() const
376 {
377   return m_type;
378 }
379 
380 inline
381 bool
isConnected() const382 Transporter::isConnected() const {
383   return m_connected;
384 }
385 
386 inline
387 NodeId
getRemoteNodeId() const388 Transporter::getRemoteNodeId() const {
389   return remoteNodeId;
390 }
391 
392 inline
393 TrpId
getTransporterIndex() const394 Transporter::getTransporterIndex() const {
395   return m_transporter_index;
396 }
397 
398 inline
399 void
setTransporterIndex(TrpId val)400 Transporter::setTransporterIndex(TrpId val)
401 {
402   m_transporter_index = val;
403 }
404 
405 inline
406 NodeId
getLocalNodeId() const407 Transporter::getLocalNodeId() const {
408   return localNodeId;
409 }
410 
411 /**
412  * Get data to send (in addition to data possibly remaining from previous
413  * partial send).
414  */
415 inline
416 Uint32
fetch_send_iovec_data(struct iovec dst[],Uint32 cnt)417 Transporter::fetch_send_iovec_data(struct iovec dst[], Uint32 cnt)
418 {
419   return get_callback_obj()->get_bytes_to_send_iovec(remoteNodeId,
420                                                      m_transporter_index,
421                                                      dst,
422                                                      cnt);
423 }
424 
425 inline
426 void
iovec_data_sent(int nBytesSent)427 Transporter::iovec_data_sent(int nBytesSent)
428 {
429   Uint32 used_bytes = get_callback_obj()->bytes_sent(remoteNodeId,
430                                                      m_transporter_index,
431                                                      nBytesSent);
432   update_status_overloaded(used_bytes);
433 }
434 
435 inline
436 void
static_asserts()437 Transporter::checksum_state::static_asserts()
438 {
439   STATIC_ASSERT(MAX_SEND_MESSAGE_BYTESIZE == (Uint16)MAX_SEND_MESSAGE_BYTESIZE);
440   STATIC_ASSERT(SIZE_T_MAX == (size_t)SIZE_T_MAX);
441 }
442 
443 inline
444 bool
compute(const void * buf,size_t len)445 Transporter::checksum_state::compute(const void* buf, size_t len)
446 {
447   const Uint32 inputSum = chksum;
448   Uint32 off = 0;
449   unsigned char* psum = static_cast<unsigned char*>(static_cast<void*>(&chksum));
450   const unsigned char* bytes = static_cast<const unsigned char*>(buf);
451 
452   while (off < len)
453   {
454     const Uint32 available = len - off;
455     switch (state)
456     {
457     case CS_INIT:
458     {
459       assert(pending <= 4);
460       assert(chksum == 0 || pending < 4);
461       const Uint32 nb = MIN(pending, available);
462       memcpy(psum + (4-pending), bytes + off, nb);
463       off+= nb;
464       pending-= nb;
465 
466       if (pending == 0)
467       {
468         /* Msg header word 0 complete, parse to determine msg length */
469         assert(Protocol6::getMessageLength(chksum) <= (MAX_SEND_MESSAGE_BYTESIZE >> 2));
470         assert(Protocol6::getMessageLength(chksum) >= 2);
471         pending = (Protocol6::getMessageLength(chksum) * 4) - 4; /* Word 0 eaten */
472         state = (Protocol6::getCheckSumIncluded(chksum)? CS_MSG_CHECK : CS_MSG_NOCHECK);
473       }
474       break;
475     }
476     case CS_MSG_CHECK:
477     case CS_MSG_NOCHECK:
478     {
479       if (available < pending)
480       {
481         /* Only part of current msg body present */
482         if (state == CS_MSG_CHECK)
483         {
484           /* Add available content to the checksum */
485           chksum = computeXorChecksumBytes(bytes + off, available, chksum);
486         }
487         off += available;
488         pending -= available;
489       }
490       else
491       {
492         /* All of current msg body present, consume and check it */
493         if (state == CS_MSG_CHECK)
494         {
495           chksum = computeXorChecksumBytes(bytes + off, pending, chksum);
496           if (chksum != 0)
497           {
498             dumpBadChecksumInfo(inputSum,
499                                 chksum,
500                                 off,
501                                 pending,
502                                 buf,
503                                 len);
504             return false;
505           }
506         }
507         off += pending;
508 
509         /* Now we are ready for the next msg */
510         pending = 4;
511         state = CS_INIT;
512       }
513       break;
514     }
515     }
516   } // while (off < len)
517 
518   return true;
519 }
520 
521 
522 inline
523 bool
computev(const struct iovec * iov,int iovcnt,size_t bytecnt)524 Transporter::checksum_state::computev(const struct iovec *iov, int iovcnt, size_t bytecnt)
525 {
526   // bytecnt is SIZE_T_MAX implies use all iovec
527   size_t off = 0;
528   bool ok = true;
529   for(int iovi = 0; ok && bytecnt > off && iovi < iovcnt; iovi ++)
530   {
531     int nb = iov[iovi].iov_len;
532     if (bytecnt < off + nb)
533     {
534       nb = bytecnt - off;
535     }
536     if (!compute(iov[iovi].iov_base, nb))
537     {
538       fprintf(stderr,
539               "Transporter::checksum_state::computev() failed on IOV %u/%u "
540               "byteCount %llu off %llu nb %u\n",
541               iovi,
542               iovcnt,
543               Uint64(bytecnt),
544               Uint64(off),
545               nb);
546       /* TODO : Dump more IOV + bytecnt details */
547       return false;
548     }
549     off += nb;
550   }
551   if (bytecnt != SIZE_T_MAX && bytecnt != off)
552   {
553     fprintf(stderr,
554             "Transporter::checksum_state::computev() failed : "
555             "bytecnt %llu off %llu\n",
556             Uint64(bytecnt),
557             Uint64(off));
558     ok = false;
559   }
560   return ok;
561 }
562 
563 #endif // Define of Transporter_H
564