1 /*
2    Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 //****************************************************************************
26 //
27 //  NAME
28 //      TransporterRegistry
29 //
30 //  DESCRIPTION
//      TransporterRegistry (singleton) is the interface to the
32 //      transporter layer. It handles transporter states and
33 //      holds the transporter arrays.
34 //
35 //***************************************************************************/
36 #ifndef TransporterRegistry_H
37 #define TransporterRegistry_H
38 
39 #if defined(HAVE_EPOLL_CREATE)
40 #include <sys/epoll.h>
41 #endif
42 #include "TransporterDefinitions.hpp"
43 #include <SocketServer.hpp>
44 #include <SocketClient.hpp>
45 
46 #include <NdbTCP.h>
47 
48 #include <mgmapi/mgmapi.h>
49 
50 #include <NodeBitmask.hpp>
51 #include <NdbMutex.h>
52 
// A transporter is always in exactly one IOState.
// NoHalt is the initial state and remains in effect as long as there are
// no restrictions on sending or receiving.
// HaltIO is the combination of HaltInput and HaltOutput.
enum IOState {
  NoHalt     = 0,
  HaltInput  = 1,
  HaltOutput = 2,
  HaltIO     = HaltInput | HaltOutput
};
62 
63 
// Human readable description of each TransporterRegistry::PerformState,
// indexed by the numeric value of the state (0..3).
// Both the strings and the pointer slots are const, so no translation
// unit including this header can modify its copy of the table.
static const char * const performStateString[] =
  { "is connected",
    "is trying to connect",
    "does nothing",
    "is trying to disconnect" };
69 
// Forward declarations of the transporter implementations that are only
// referred to by pointer or reference in this header.
class Multi_Transporter;
class SHM_Transporter;
class TCP_Transporter;
class Transporter;

// Forward declarations needed by TransporterService.
class SocketAuthenticator;
class TransporterRegistry;
77 
78 class TransporterService : public SocketServer::Service {
79   SocketAuthenticator * m_auth;
80   TransporterRegistry * m_transporter_registry;
81 public:
TransporterService(SocketAuthenticator * auth=0)82   TransporterService(SocketAuthenticator *auth= 0)
83   {
84     m_auth= auth;
85     m_transporter_registry= 0;
86   }
setTransporterRegistry(TransporterRegistry * t)87   void setTransporterRegistry(TransporterRegistry *t)
88   {
89     m_transporter_registry= t;
90   }
91   SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
92 };
93 
94 /**
95  * TransporterReceiveData
96  *
97  *   State for pollReceive/performReceive
98  *   Moved into own class to enable multiple receive threads
99  */
100 struct TransporterReceiveData
101 {
102   TransporterReceiveData();
103   ~TransporterReceiveData();
104 
105   bool init (unsigned maxTransporters);
106 
107   /**
108    * Add a transporter to epoll_set
109    *   does nothing if epoll not active
110    */
111   bool epoll_add(Transporter*);
112 
113   /**
114    * Bitmask of transporters currently handled by this instance
115    */
116   TrpBitmask m_transporters;
117 
118   /**
119    * Bitmask of transporters having data awaiting to be received
120    * from its transporter.
121    */
122   TrpBitmask m_recv_transporters;
123 
124   /**
125    * Bitmask of transporters that has already received data buffered
126    * inside its transporter. Possibly "carried over" from last
127    * performReceive
128    */
129   TrpBitmask m_has_data_transporters;
130 
131   /**
132    * Subset of m_has_data_transporters which we completed handling
133    * of in previous ::performReceive before we was interrupted due
134    * to lack of job buffers. Will skip these when we later retry
135    * ::performReceive in order to avoid starvation of non-handled
136    * transporters.
137    */
138   TrpBitmask m_handled_transporters;
139 
140   /**
141    * Bitmask of transporters having received corrupted or unsupported
142    * message. No more unpacking and delivery of messages allowed.
143    */
144   NodeBitmask m_bad_data_transporters;
145 
146   /**
147    * Last node received from if unable to complete all transporters
148    * in previous ::performReceive(). Next ::performReceive will
149    * resume from first transporter after this.
150    */
151   Uint32 m_last_trp_id;
152 
153   /**
154    * Spintime calculated as maximum of currently connected transporters.
155    * Only applies to shared memory transporters.
156    */
157   Uint32 m_spintime;
158 
159   /**
160    * Total spintime
161    */
162   Uint32 m_total_spintime;
163 
164 #if defined(HAVE_EPOLL_CREATE)
165   int m_epoll_fd;
166   struct epoll_event *m_epoll_events;
167   bool change_epoll(TCP_Transporter *t, bool add);
168 #endif
169 
170   /**
171    * Used in polling if exists TCP_Transporter
172    */
173   ndb_socket_poller m_socket_poller;
174 };
175 
176 #include "TransporterCallback.hpp"
177 
178 /**
179  * @class TransporterRegistry
180  * @brief ...
181  */
182 class TransporterRegistry
183 {
184   friend class SHM_Transporter;
185   friend class SHM_Writer;
186   friend class Transporter;
187   friend class TransporterService;
188 public:
189  /**
190   * Constructor
191   */
192   TransporterRegistry(TransporterCallback *callback,
193                       TransporterReceiveHandle * receiveHandle,
194 		      unsigned maxTransporters = MAX_NTRANSPORTERS);
195 
196   /**
197    * this handle will be used in the client connect thread
198    * to fetch information on dynamic ports.  The old handle
199    * (if set) is destroyed, and this is destroyed by the destructor
200    */
201   void set_mgm_handle(NdbMgmHandle h);
get_mgm_handle(void)202   NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; }
203 
204   bool init(NodeId localNodeId);
205 
206   /**
207    * Iff using non-default TransporterReceiveHandle's
208    *   they need to get initalized
209    */
210   bool init(TransporterReceiveHandle&);
211 
212   /**
213      Perform handshaking of a client connection to accept it
214      as transporter.
215 
216      @note Connection should be closed by caller if function
217      returns false
218 
219      @param sockfd           the socket to handshake
220      @param msg              error message describing why handshake failed,
221                              to be filled in when function return
222      @param close_with_reset allows the function to indicate to the caller
223                              how the socket should be closed when function
224                              returns false
225      @param log_failure      whether a failure to connect is log-worthy
226 
227      @returns false on failure and true on success
228   */
229   bool connect_server(NDB_SOCKET_TYPE sockfd,
230                       BaseString& msg,
231                       bool& close_with_reset,
232                       bool& log_failure);
233 
234   bool connect_client(NdbMgmHandle *h);
235 
236   /**
237    * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
238    * and returns the socket.
239    */
240   NDB_SOCKET_TYPE connect_ndb_mgmd(const char* server_name,
241                                    unsigned short server_port);
242 
243   /**
244    * Given a connected NdbMgmHandle, turns it into a transporter
245    * and returns the socket.
246    */
247   NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
248 
249   /**
250    * Manage allTransporters and theNodeIdTransporters when using
251    * Multi_Transporter changes. There is a mutex protecting changes
252    * to those data structures.
253    */
254   void lockMultiTransporters();
255   void unlockMultiTransporters();
256   void insert_allTransporters(Transporter*);
257   void remove_allTransporters(Transporter*);
258   void insert_node_transporter(NodeId, Transporter*);
259   bool isMultiTransporter(Transporter*);
260   void switch_active_trp(Multi_Transporter*);
261   Uint32 get_num_active_transporters(Multi_Transporter*);
262 private:
263 
264   NdbMutex *theMultiTransporterMutex;
265   /**
266    * Report the dynamically allocated ports to ndb_mgmd so that clients
267    * which want to connect to ndbd can ask ndb_mgmd which port to use.
268    */
269   bool report_dynamic_ports(NdbMgmHandle h) const;
270 
271   /**
272    * Remove all transporters
273    */
274   void removeAll();
275 
276   /**
277    * Disconnect all transporters
278    */
279   void disconnectAll();
280 
281   /**
282    * Reset awake state on shared memory transporters before sleep.
283    */
284   int reset_shm_awake_state(TransporterReceiveHandle& recvdata,
285                             bool& sleep_state_set);
286 
287   /**
288    * Set awake state on shared memory transporters after sleep.
289    */
290   void set_shm_awake_state(TransporterReceiveHandle& recvdata);
291 
292 public:
293 
294   /**
295    * Stops the server, disconnects all the transporter
296    * and deletes them and remove it from the transporter arrays
297    */
298   virtual ~TransporterRegistry();
299 
300   bool start_service(SocketServer& server);
301   struct NdbThread* start_clients();
302   bool stop_clients();
303   void start_clients_thread();
304 
305   /**
306    * Start/Stop receiving
307    */
308   void startReceiving();
309   void stopReceiving();
310 
311   /**
312    * Start/Stop sending
313    */
314   void startSending();
315   void stopSending();
316 
317   // A transporter is always in a PerformState.
318   // PerformIO is used initially and as long as any of the events
319   // PerformConnect, ...
320   enum PerformState {
321     CONNECTED         = 0,
322     CONNECTING        = 1,
323     DISCONNECTED      = 2,
324     DISCONNECTING     = 3
325   };
getPerformStateString(NodeId nodeId) const326   const char *getPerformStateString(NodeId nodeId) const
327   { return performStateString[(unsigned)performStates[nodeId]]; }
328 
getPerformState(NodeId nodeId) const329   PerformState getPerformState(NodeId nodeId) const { return performStates[nodeId]; }
330 
331   /**
332    * Get and set methods for PerformState
333    */
334   void do_connect(NodeId node_id);
335   /**
336    * do_disconnect can be issued both from send and recv, it is possible to
337    * specify from where it is called in send_source parameter, this enables
338    * us to provide more detailed information for disconnects.
339    */
340   bool do_disconnect(NodeId node_id, int errnum = 0, bool send_source = true);
is_connected(NodeId node_id) const341   bool is_connected(NodeId node_id) const {
342     return performStates[node_id] == CONNECTED;
343   }
344 private:
345   void report_connect(TransporterReceiveHandle&, NodeId node_id);
346   void report_disconnect(TransporterReceiveHandle&, NodeId node_id, int errnum);
347   void report_error(NodeId nodeId, TransporterError errorCode,
348                     const char *errorInfo = 0);
349   void dump_and_report_bad_message(const char file[], unsigned line,
350                     TransporterReceiveHandle & recvHandle,
351                     Uint32 * readPtr,
352                     size_t sizeOfData,
353                     NodeId remoteNodeId,
354                     IOState state,
355                     TransporterError errorCode);
356 public:
357 
358   /**
359    * Get and set methods for IOState
360    */
361   IOState ioState(NodeId nodeId) const;
362   void setIOState(NodeId nodeId, IOState state);
363 
364   /**
365    * Methods to handle backoff of connection attempts when attempt fails
366    */
367 public:
368   void indicate_node_up(NodeId nodeId);
369   void set_connect_backoff_max_time_in_ms(Uint32 max_time_in_ms);
370 private:
371   Uint32 get_connect_backoff_max_time_in_laps() const;
372   bool get_and_clear_node_up_indicator(NodeId nodeId);
373   void backoff_reset_connecting_time(NodeId nodeId);
374   bool backoff_update_and_check_time_for_connect(NodeId nodeId);
375 
376 private:
377 
378   bool createTCPTransporter(TransporterConfiguration * config);
379   bool createSHMTransporter(TransporterConfiguration * config);
380 
381 public:
382   bool createMultiTransporter(Uint32 node_id, Uint32 num_trps);
383   /**
384    *   configureTransporter
385    *
386    *   Configure a transporter, ie. create new if it
387    *   does not exist otherwise try to reconfigure it
388    *
389    */
390   bool configureTransporter(TransporterConfiguration * config);
391 
392   /**
393    * Get sum of max send buffer over all transporters, to be used as a default
394    * for allocate_send_buffers eg.
395    *
396    * Must be called after creating all transporters for returned value to be
397    * correct.
398    */
get_total_max_send_buffer()399   Uint64 get_total_max_send_buffer() {
400     DBUG_ASSERT(m_total_max_send_buffer > 0);
401     return m_total_max_send_buffer;
402   }
403 
404   /**
405    * Get transporter's connect count
406    */
407   Uint32 get_connect_count(Uint32 nodeId);
408 
409   /**
410    * Set or clear overloaded bit.
411    * Query if any overloaded bit is set.
412    */
413   void set_status_overloaded(Uint32 nodeId, bool val);
414   const NodeBitmask& get_status_overloaded() const;
415 
416   /**
417    * Get transporter's overload count since connect
418    */
419   Uint32 get_overload_count(Uint32 nodeId);
420 
421   /**
422    * Set or clear slowdown bit.
423    * Query if any slowdown bit is set.
424    */
425   void set_status_slowdown(Uint32 nodeId, bool val);
426   const NodeBitmask& get_status_slowdown() const;
427 
428   /**
429    * Get transporter's slowdown count since connect
430    */
431   Uint32 get_slowdown_count(Uint32 nodeId);
432 
433   /**
434    * prepareSend
435    *
436    * When IOState is HaltOutput or HaltIO do not send or insert any
437    * signals in the SendBuffer, unless it is intended for the remote
438    * QMGR block (blockno 252)
439    * Perform prepareSend on the transporter.
440    *
441    * NOTE signalHeader->xxxBlockRef should contain block numbers and
442    *                                not references
443    */
444 
445 private:
446   template <typename AnySectionArg>
447   SendStatus prepareSendTemplate(
448                          TransporterSendBufferHandle *sendHandle,
449                          const SignalHeader *signalHeader,
450                          Uint8 prio,
451                          const Uint32 *signalData,
452                          NodeId nodeId,
453                          TrpId &trp_id,
454                          AnySectionArg section);
455 
456 
457 public:
458   SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
459                          const SignalHeader *signalHeader,
460                          Uint8 prio,
461                          const Uint32 *signalData,
462                          NodeId nodeId,
463                          TrpId &trp_id,
464                          const LinearSectionPtr ptr[3]);
465 
466   SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
467                          const SignalHeader *signalHeader,
468                          Uint8 prio,
469                          const Uint32 *signalData,
470                          NodeId nodeId,
471                          TrpId &trp_id,
472                          class SectionSegmentPool & pool,
473                          const SegmentedSectionPtr ptr[3]);
474 
475   SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
476                          const SignalHeader *signalHeader,
477                          Uint8 prio,
478                          const Uint32 *signalData,
479                          NodeId nodeId,
480                          const GenericSectionPtr ptr[3]);
481 
482   /* Send on a specific transporter */
483   bool performSend(TrpId id, bool need_wakeup = true);
484   /* performSendNode is only used from NDB API */
485   bool performSendNode(NodeId nodeId, bool need_wakeup = true);
486   void performSend();
487 
488   void printState();
489 
490   class Transporter_interface {
491   public:
492     NodeId m_remote_nodeId;
493     int m_s_service_port;			// signed port number
494     const char *m_interface;
495   };
496   Vector<Transporter_interface> m_transporter_interface;
497   void add_transporter_interface(NodeId remoteNodeId, const char *interf,
498 		  		 int s_port);	// signed port. <0 is dynamic
499 
500   int get_transporter_count() const;
501   Transporter* get_transporter(TrpId id) const;
502   Transporter* get_node_transporter(NodeId nodeId) const;
503   bool is_shm_transporter(NodeId nodeId);
504   struct in_addr get_connect_address(NodeId node_id) const;
505 
506   Uint64 get_bytes_sent(NodeId nodeId) const;
507   Uint64 get_bytes_received(NodeId nodeId) const;
508 
509   Uint32 get_num_multi_transporters();
510   Multi_Transporter* get_multi_transporter(Uint32 index);
511   Multi_Transporter* get_node_multi_transporter(NodeId node_id);
512 
513 private:
514   TransporterCallback *const callbackObj;
515   TransporterReceiveHandle *const receiveHandle;
516 
517   NdbMgmHandle m_mgm_handle;
518 
519   struct NdbThread   *m_start_clients_thread;
520   bool                m_run_start_clients_thread;
521 
522   int sendCounter;
523   NodeId localNodeId;
524   unsigned maxTransporters;
525   Uint32 nTransporters;
526   Uint32 nMultiTransporters;
527   Uint32 nTCPTransporters;
528   Uint32 nSHMTransporters;
529 
530 #ifdef ERROR_INSERT
531   NodeBitmask m_blocked;
532   TrpBitmask m_blocked_trp;
533   NodeBitmask m_blocked_disconnected;
534   int m_disconnect_errors[MAX_NTRANSPORTERS];
535 
536   NodeBitmask m_sendBlocked;
537 
538   Uint32 m_mixology_level;
539 #endif
540 
541   /**
542    * Arrays holding all transporters in the order they are created
543    */
544   Transporter**     allTransporters;
545   Multi_Transporter** theMultiTransporters;
546   TCP_Transporter** theTCPTransporters;
547   SHM_Transporter** theSHMTransporters;
548 
549   /**
550    * Array, indexed by nodeId, holding all transporters
551    */
552   TransporterType* theTransporterTypes;
553   Transporter**    theNodeIdTransporters;
554 
555   /**
556    * State arrays, index by host id
557    */
558   PerformState* performStates;
559   int*          m_disconnect_errnum;
560   Uint32*       m_disconnect_enomem_error;
561   IOState*      ioStates;
562   struct ErrorState {
563     TransporterError m_code;
564     const char *m_info;
565   };
566   struct ErrorState *m_error_states;
567 
568   /**
569    * peerUpIndicators[nodeId] is set by receiver thread
570    * to indicate that node is probable up.
571    * It is read and cleared by start clients thread.
572    */
573   volatile bool* peerUpIndicators;
574 
575   /**
576    * Count of how long time one have been attempting to
577    * connect to node nodeId, in units of 100ms.
578    */
579   Uint32*       connectingTime;
580 
581   /**
582    * The current maximal time between connection attempts to a
583    * node in units of 100ms.
584    * Updated by receive thread, read by start clients thread
585    */
586   volatile Uint32 connectBackoffMaxTime;
587 
588   /**
589    * Overloaded bits, for fast check.
590    * Similarly slowdown bits for fast check.
591    */
592   NodeBitmask m_status_overloaded;
593   NodeBitmask m_status_slowdown;
594 
595   /**
596    * Unpack signal data.
597    *
598    * Defined in Packer.cpp.
599    */
600   Uint32 unpack(TransporterReceiveHandle&,
601                 Uint32 * readPtr,
602                 Uint32 bufferSize,
603                 NodeId remoteNodeId,
604                 IOState state,
605 		bool & stopReceiving);
606 
607   Uint32 * unpack(TransporterReceiveHandle&,
608                   Uint32 * readPtr,
609                   Uint32 * eodPtr,
610                   Uint32 * endPtr,
611                   NodeId remoteNodeId,
612                   IOState state,
613 		  bool & stopReceiving);
614 
615   static Uint32 unpack_length_words(const Uint32 *readPtr,
616                                     Uint32 maxWords,
617                                     bool extra_signal);
618 
619   Uint32 poll_TCP(Uint32 timeOutMillis, TransporterReceiveHandle&);
620   Uint32 poll_SHM(TransporterReceiveHandle&, bool &any_connected);
621   Uint32 poll_SHM(TransporterReceiveHandle&,
622                   NDB_TICKS start_time,
623                   Uint32 micros_to_poll);
624   Uint32 check_TCP(TransporterReceiveHandle&, Uint32 timeoutMillis);
625   Uint32 spin_check_transporters(TransporterReceiveHandle&);
626 
627   int m_shm_own_pid;
628   Uint32 m_transp_count;
629 
630 public:
631   bool setup_wakeup_socket(TransporterReceiveHandle&);
632   void wakeup();
633 
setup_wakeup_socket()634   inline bool setup_wakeup_socket() {
635     assert(receiveHandle != 0);
636     return setup_wakeup_socket(* receiveHandle);
637   }
638 private:
639   bool m_has_extra_wakeup_socket;
640   NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
641   void consume_extra_sockets();
642 
643   Uint32 *getWritePtr(TransporterSendBufferHandle *handle,
644                       Transporter*,
645                       Uint32 trp_id,
646                       Uint32 lenBytes,
647                       Uint32 prio,
648                       SendStatus *error);
649   void updateWritePtr(TransporterSendBufferHandle *handle,
650                       Transporter*,
651                       Uint32 trp_id,
652                       Uint32 lenBytes,
653                       Uint32 prio);
654 
655 public:
656   /* Various internal */
657   void inc_overload_count(Uint32 nodeId);
658   void inc_slowdown_count(Uint32 nodeId);
659 
660   void get_trps_for_node(Uint32 nodeId,
661                          TrpId *trp_ids,
662                          Uint32 &num_trp_ids,
663                          Uint32 max_trp_ids);
664 
665   Uint32 get_num_trps();
666 private:
667   /**
668    * Sum of max transporter memory for each transporter.
669    * Used to compute default send buffer size.
670    */
671   Uint64 m_total_max_send_buffer;
672 
673 public:
674   /**
675    * Receiving
676    */
677   Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
678   Uint32 performReceive(TransporterReceiveHandle&, Uint32 receive_thread_idx);
679   Uint32 update_connections(TransporterReceiveHandle&,
680                           Uint32 max_spintime = UINT32_MAX);
681 
pollReceive(Uint32 timeOutMillis)682   inline Uint32 pollReceive(Uint32 timeOutMillis) {
683     assert(receiveHandle != 0);
684     return pollReceive(timeOutMillis, * receiveHandle);
685   }
686 
performReceive()687   inline Uint32 performReceive() {
688     assert(receiveHandle != 0);
689     return performReceive(* receiveHandle, 0);
690   }
691 
update_connections()692   inline void update_connections() {
693     assert(receiveHandle != 0);
694     update_connections(* receiveHandle);
695   }
get_total_spintime()696   inline Uint32 get_total_spintime()
697   {
698     assert(receiveHandle != 0);
699     return receiveHandle->m_total_spintime;
700   }
reset_total_spintime()701   inline void reset_total_spintime()
702   {
703     assert(receiveHandle != 0);
704     receiveHandle->m_total_spintime = 0;
705   }
706 
707   TrpId getTransporterIndex(Transporter* t);
708   void set_recv_thread_idx(Transporter* t, Uint32 recv_thread_idx);
709 
710 #ifdef ERROR_INSERT
711   /* Utils for testing latency issues */
712   bool isBlocked(NodeId nodeId);
713   void blockReceive(TransporterReceiveHandle&, NodeId nodeId);
714   void unblockReceive(TransporterReceiveHandle&, NodeId nodeId);
715   bool isSendBlocked(NodeId nodeId) const;
716   void blockSend(TransporterReceiveHandle& recvdata,
717                  NodeId nodeId);
718   void unblockSend(TransporterReceiveHandle& recvdata,
719                    NodeId nodeId);
720 
721   /* Testing interleaving of signal processing */
722   Uint32 getMixologyLevel() const;
723   void setMixologyLevel(Uint32 l);
724 #endif
725 };
726 
727 inline Uint32
get_num_trps()728 TransporterRegistry::get_num_trps()
729 {
730   return nTransporters;
731 }
732 
733 inline void
set_status_overloaded(Uint32 nodeId,bool val)734 TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
735 {
736   assert(nodeId < MAX_NODES);
737   if (val != m_status_overloaded.get(nodeId))
738   {
739     m_status_overloaded.set(nodeId, val);
740     if (val)
741       inc_overload_count(nodeId);
742   }
743   if (val)
744     set_status_slowdown(nodeId, val);
745 }
746 
747 inline const NodeBitmask&
get_status_overloaded() const748 TransporterRegistry::get_status_overloaded() const
749 {
750   return m_status_overloaded;
751 }
752 
753 inline void
set_status_slowdown(Uint32 nodeId,bool val)754 TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
755 {
756   assert(nodeId < MAX_NODES);
757   if (val != m_status_slowdown.get(nodeId))
758   {
759     m_status_slowdown.set(nodeId, val);
760     if (val)
761       inc_slowdown_count(nodeId);
762   }
763 }
764 
765 inline const NodeBitmask&
get_status_slowdown() const766 TransporterRegistry::get_status_slowdown() const
767 {
768   return m_status_slowdown;
769 }
770 
771 inline void
indicate_node_up(NodeId nodeId)772 TransporterRegistry::indicate_node_up(NodeId nodeId) // Called from receive thread
773 {
774   assert(nodeId < MAX_NODES);
775 
776   if (!peerUpIndicators[nodeId])
777   {
778     peerUpIndicators[nodeId] = true;
779   }
780 }
781 
782 inline bool
get_and_clear_node_up_indicator(NodeId nodeId)783 TransporterRegistry::get_and_clear_node_up_indicator(NodeId nodeId) // Called from start client thread
784 {
785   assert(nodeId < MAX_NODES);
786 
787   bool indicator = peerUpIndicators[nodeId];
788   if (indicator)
789   {
790     peerUpIndicators[nodeId] = false;
791   }
792   return indicator;
793 }
794 
795 inline Uint32
get_connect_backoff_max_time_in_laps() const796 TransporterRegistry::get_connect_backoff_max_time_in_laps() const
797 { /* one lap, 100 ms */
798   return connectBackoffMaxTime;
799 }
800 
801 inline void
set_connect_backoff_max_time_in_ms(Uint32 backoff_max_time_in_ms)802 TransporterRegistry::set_connect_backoff_max_time_in_ms(Uint32 backoff_max_time_in_ms)
803 {
804   /**
805    * Round up backoff_max_time to nearest higher 100ms, since that is lap time
806    * in start_client_threads using this function.
807    */
808   connectBackoffMaxTime = (backoff_max_time_in_ms + 99) / 100;
809 }
810 
811 inline void
backoff_reset_connecting_time(NodeId nodeId)812 TransporterRegistry::backoff_reset_connecting_time(NodeId nodeId)
813 {
814   assert(nodeId < MAX_NODES);
815 
816   connectingTime[nodeId] = 0;
817 }
818 
819 inline bool
backoff_update_and_check_time_for_connect(NodeId nodeId)820 TransporterRegistry::backoff_update_and_check_time_for_connect(NodeId nodeId)
821 {
822   assert(nodeId < MAX_NODES);
823 
824   Uint32 backoff_max_time = get_connect_backoff_max_time_in_laps();
825 
826   if (backoff_max_time == 0)
827   {
828     // Backoff disabled
829     return true;
830   }
831 
832   connectingTime[nodeId] ++;
833 
834   if (connectingTime[nodeId] >= backoff_max_time)
835   {
836     return (connectingTime[nodeId] % backoff_max_time == 0);
837   }
838 
839   /**
840    * Attempt moments from start of connecting.
841    * This function is called from start_clients_thread
842    * roughly every 100ms for each node it is connecting
843    * to.
844    */
845   static const Uint16 attempt_moments[] = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
846   static const int attempt_moments_count = sizeof(attempt_moments) / sizeof(attempt_moments[0]);
847   for(int i = 0; i < attempt_moments_count; i ++)
848   {
849     if (connectingTime[nodeId] == attempt_moments[i])
850     {
851       return true;
852     }
853     else if (connectingTime[nodeId] < attempt_moments[i])
854     {
855       return false;
856     }
857   }
858   return (connectingTime[nodeId] % attempt_moments[attempt_moments_count - 1] == 0);
859 }
860 
861 /**
862  * A function used to calculate a send buffer level given the size of the node
863  * send buffer and the total send buffer size for all nodes and the total send
864  * buffer used for all nodes. There is also a thread parameter that specifies
865  * the number of threads used (this is 0 except for ndbmtd).
866  */
867 void calculate_send_buffer_level(Uint64 node_send_buffer_size,
868                                  Uint64 total_send_buffer_size,
869                                  Uint64 total_used_send_buffer_size,
870                                  Uint32 num_threads,
871                                  SB_LevelType &level);
872 #endif // Define of TransporterRegistry_H
873