1 /*
2 Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
//****************************************************************************
//
//  NAME
//      TransporterRegistry
//
//  DESCRIPTION
//      TransporterRegistry (singleton) is the interface to the
//      transporter layer. It handles transporter states and
//      holds the transporter arrays.
//
//***************************************************************************/
36 #ifndef TransporterRegistry_H
37 #define TransporterRegistry_H
38
39 #if defined(HAVE_EPOLL_CREATE)
40 #include <sys/epoll.h>
41 #endif
42 #include "TransporterDefinitions.hpp"
43 #include <SocketServer.hpp>
44 #include <SocketClient.hpp>
45
46 #include <NdbTCP.h>
47
48 #include <mgmapi/mgmapi.h>
49
50 #include <NodeBitmask.hpp>
51 #include <NdbMutex.h>
52
/**
 * I/O restriction state of a transporter.
 *
 * A transporter is always in exactly one IOState. NoHalt is the initial
 * state and is kept for as long as there are no restrictions on sending
 * or receiving.
 */
enum IOState {
  NoHalt     = 0,
  HaltInput  = 1,
  HaltOutput = 2,
  HaltIO     = 3   // numerically HaltInput | HaltOutput
};
62
63
/**
 * Human readable descriptions of a transporter's connection state.
 *
 * The array is indexed with the numeric value of
 * TransporterRegistry::PerformState, so the order of the entries must
 * follow the order of that enum.
 */
static const char *performStateString[] = {
  "is connected",            // 0
  "is trying to connect",    // 1
  "does nothing",            // 2
  "is trying to disconnect"  // 3
};
69
70 class Transporter;
71 class TCP_Transporter;
72 class SHM_Transporter;
73 class Multi_Transporter;
74
75 class TransporterRegistry;
76 class SocketAuthenticator;
77
78 class TransporterService : public SocketServer::Service {
79 SocketAuthenticator * m_auth;
80 TransporterRegistry * m_transporter_registry;
81 public:
TransporterService(SocketAuthenticator * auth=0)82 TransporterService(SocketAuthenticator *auth= 0)
83 {
84 m_auth= auth;
85 m_transporter_registry= 0;
86 }
setTransporterRegistry(TransporterRegistry * t)87 void setTransporterRegistry(TransporterRegistry *t)
88 {
89 m_transporter_registry= t;
90 }
91 SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
92 };
93
94 /**
95 * TransporterReceiveData
96 *
97 * State for pollReceive/performReceive
98 * Moved into own class to enable multiple receive threads
99 */
100 struct TransporterReceiveData
101 {
102 TransporterReceiveData();
103 ~TransporterReceiveData();
104
105 bool init (unsigned maxTransporters);
106
107 /**
108 * Add a transporter to epoll_set
109 * does nothing if epoll not active
110 */
111 bool epoll_add(Transporter*);
112
113 /**
114 * Bitmask of transporters currently handled by this instance
115 */
116 TrpBitmask m_transporters;
117
118 /**
119 * Bitmask of transporters having data awaiting to be received
120 * from its transporter.
121 */
122 TrpBitmask m_recv_transporters;
123
124 /**
125 * Bitmask of transporters that has already received data buffered
126 * inside its transporter. Possibly "carried over" from last
127 * performReceive
128 */
129 TrpBitmask m_has_data_transporters;
130
131 /**
132 * Subset of m_has_data_transporters which we completed handling
133 * of in previous ::performReceive before we was interrupted due
134 * to lack of job buffers. Will skip these when we later retry
135 * ::performReceive in order to avoid starvation of non-handled
136 * transporters.
137 */
138 TrpBitmask m_handled_transporters;
139
140 /**
141 * Bitmask of transporters having received corrupted or unsupported
142 * message. No more unpacking and delivery of messages allowed.
143 */
144 NodeBitmask m_bad_data_transporters;
145
146 /**
147 * Last node received from if unable to complete all transporters
148 * in previous ::performReceive(). Next ::performReceive will
149 * resume from first transporter after this.
150 */
151 Uint32 m_last_trp_id;
152
153 /**
154 * Spintime calculated as maximum of currently connected transporters.
155 * Only applies to shared memory transporters.
156 */
157 Uint32 m_spintime;
158
159 /**
160 * Total spintime
161 */
162 Uint32 m_total_spintime;
163
164 #if defined(HAVE_EPOLL_CREATE)
165 int m_epoll_fd;
166 struct epoll_event *m_epoll_events;
167 bool change_epoll(TCP_Transporter *t, bool add);
168 #endif
169
170 /**
171 * Used in polling if exists TCP_Transporter
172 */
173 ndb_socket_poller m_socket_poller;
174 };
175
176 #include "TransporterCallback.hpp"
177
178 /**
179 * @class TransporterRegistry
180 * @brief ...
181 */
182 class TransporterRegistry
183 {
184 friend class SHM_Transporter;
185 friend class SHM_Writer;
186 friend class Transporter;
187 friend class TransporterService;
188 public:
189 /**
190 * Constructor
191 */
192 TransporterRegistry(TransporterCallback *callback,
193 TransporterReceiveHandle * receiveHandle,
194 unsigned maxTransporters = MAX_NTRANSPORTERS);
195
196 /**
197 * this handle will be used in the client connect thread
198 * to fetch information on dynamic ports. The old handle
199 * (if set) is destroyed, and this is destroyed by the destructor
200 */
201 void set_mgm_handle(NdbMgmHandle h);
get_mgm_handle(void)202 NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; }
203
204 bool init(NodeId localNodeId);
205
206 /**
207 * Iff using non-default TransporterReceiveHandle's
208 * they need to get initalized
209 */
210 bool init(TransporterReceiveHandle&);
211
212 /**
213 Perform handshaking of a client connection to accept it
214 as transporter.
215
216 @note Connection should be closed by caller if function
217 returns false
218
219 @param sockfd the socket to handshake
220 @param msg error message describing why handshake failed,
221 to be filled in when function return
222 @param close_with_reset allows the function to indicate to the caller
223 how the socket should be closed when function
224 returns false
225 @param log_failure whether a failure to connect is log-worthy
226
227 @returns false on failure and true on success
228 */
229 bool connect_server(NDB_SOCKET_TYPE sockfd,
230 BaseString& msg,
231 bool& close_with_reset,
232 bool& log_failure);
233
234 bool connect_client(NdbMgmHandle *h);
235
236 /**
237 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
238 * and returns the socket.
239 */
240 NDB_SOCKET_TYPE connect_ndb_mgmd(const char* server_name,
241 unsigned short server_port);
242
243 /**
244 * Given a connected NdbMgmHandle, turns it into a transporter
245 * and returns the socket.
246 */
247 NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
248
249 /**
250 * Manage allTransporters and theNodeIdTransporters when using
251 * Multi_Transporter changes. There is a mutex protecting changes
252 * to those data structures.
253 */
254 void lockMultiTransporters();
255 void unlockMultiTransporters();
256 void insert_allTransporters(Transporter*);
257 void remove_allTransporters(Transporter*);
258 void insert_node_transporter(NodeId, Transporter*);
259 bool isMultiTransporter(Transporter*);
260 void switch_active_trp(Multi_Transporter*);
261 Uint32 get_num_active_transporters(Multi_Transporter*);
262 private:
263
264 NdbMutex *theMultiTransporterMutex;
265 /**
266 * Report the dynamically allocated ports to ndb_mgmd so that clients
267 * which want to connect to ndbd can ask ndb_mgmd which port to use.
268 */
269 bool report_dynamic_ports(NdbMgmHandle h) const;
270
271 /**
272 * Remove all transporters
273 */
274 void removeAll();
275
276 /**
277 * Disconnect all transporters
278 */
279 void disconnectAll();
280
281 /**
282 * Reset awake state on shared memory transporters before sleep.
283 */
284 int reset_shm_awake_state(TransporterReceiveHandle& recvdata,
285 bool& sleep_state_set);
286
287 /**
288 * Set awake state on shared memory transporters after sleep.
289 */
290 void set_shm_awake_state(TransporterReceiveHandle& recvdata);
291
292 public:
293
294 /**
295 * Stops the server, disconnects all the transporter
296 * and deletes them and remove it from the transporter arrays
297 */
298 virtual ~TransporterRegistry();
299
300 bool start_service(SocketServer& server);
301 struct NdbThread* start_clients();
302 bool stop_clients();
303 void start_clients_thread();
304
305 /**
306 * Start/Stop receiving
307 */
308 void startReceiving();
309 void stopReceiving();
310
311 /**
312 * Start/Stop sending
313 */
314 void startSending();
315 void stopSending();
316
317 // A transporter is always in a PerformState.
318 // PerformIO is used initially and as long as any of the events
319 // PerformConnect, ...
320 enum PerformState {
321 CONNECTED = 0,
322 CONNECTING = 1,
323 DISCONNECTED = 2,
324 DISCONNECTING = 3
325 };
getPerformStateString(NodeId nodeId) const326 const char *getPerformStateString(NodeId nodeId) const
327 { return performStateString[(unsigned)performStates[nodeId]]; }
328
getPerformState(NodeId nodeId) const329 PerformState getPerformState(NodeId nodeId) const { return performStates[nodeId]; }
330
331 /**
332 * Get and set methods for PerformState
333 */
334 void do_connect(NodeId node_id);
335 /**
336 * do_disconnect can be issued both from send and recv, it is possible to
337 * specify from where it is called in send_source parameter, this enables
338 * us to provide more detailed information for disconnects.
339 */
340 bool do_disconnect(NodeId node_id, int errnum = 0, bool send_source = true);
is_connected(NodeId node_id) const341 bool is_connected(NodeId node_id) const {
342 return performStates[node_id] == CONNECTED;
343 }
344 private:
345 void report_connect(TransporterReceiveHandle&, NodeId node_id);
346 void report_disconnect(TransporterReceiveHandle&, NodeId node_id, int errnum);
347 void report_error(NodeId nodeId, TransporterError errorCode,
348 const char *errorInfo = 0);
349 void dump_and_report_bad_message(const char file[], unsigned line,
350 TransporterReceiveHandle & recvHandle,
351 Uint32 * readPtr,
352 size_t sizeOfData,
353 NodeId remoteNodeId,
354 IOState state,
355 TransporterError errorCode);
356 public:
357
358 /**
359 * Get and set methods for IOState
360 */
361 IOState ioState(NodeId nodeId) const;
362 void setIOState(NodeId nodeId, IOState state);
363
364 /**
365 * Methods to handle backoff of connection attempts when attempt fails
366 */
367 public:
368 void indicate_node_up(NodeId nodeId);
369 void set_connect_backoff_max_time_in_ms(Uint32 max_time_in_ms);
370 private:
371 Uint32 get_connect_backoff_max_time_in_laps() const;
372 bool get_and_clear_node_up_indicator(NodeId nodeId);
373 void backoff_reset_connecting_time(NodeId nodeId);
374 bool backoff_update_and_check_time_for_connect(NodeId nodeId);
375
376 private:
377
378 bool createTCPTransporter(TransporterConfiguration * config);
379 bool createSHMTransporter(TransporterConfiguration * config);
380
381 public:
382 bool createMultiTransporter(Uint32 node_id, Uint32 num_trps);
383 /**
384 * configureTransporter
385 *
386 * Configure a transporter, ie. create new if it
387 * does not exist otherwise try to reconfigure it
388 *
389 */
390 bool configureTransporter(TransporterConfiguration * config);
391
392 /**
393 * Get sum of max send buffer over all transporters, to be used as a default
394 * for allocate_send_buffers eg.
395 *
396 * Must be called after creating all transporters for returned value to be
397 * correct.
398 */
get_total_max_send_buffer()399 Uint64 get_total_max_send_buffer() {
400 DBUG_ASSERT(m_total_max_send_buffer > 0);
401 return m_total_max_send_buffer;
402 }
403
404 /**
405 * Get transporter's connect count
406 */
407 Uint32 get_connect_count(Uint32 nodeId);
408
409 /**
410 * Set or clear overloaded bit.
411 * Query if any overloaded bit is set.
412 */
413 void set_status_overloaded(Uint32 nodeId, bool val);
414 const NodeBitmask& get_status_overloaded() const;
415
416 /**
417 * Get transporter's overload count since connect
418 */
419 Uint32 get_overload_count(Uint32 nodeId);
420
421 /**
422 * Set or clear slowdown bit.
423 * Query if any slowdown bit is set.
424 */
425 void set_status_slowdown(Uint32 nodeId, bool val);
426 const NodeBitmask& get_status_slowdown() const;
427
428 /**
429 * Get transporter's slowdown count since connect
430 */
431 Uint32 get_slowdown_count(Uint32 nodeId);
432
433 /**
434 * prepareSend
435 *
436 * When IOState is HaltOutput or HaltIO do not send or insert any
437 * signals in the SendBuffer, unless it is intended for the remote
438 * QMGR block (blockno 252)
439 * Perform prepareSend on the transporter.
440 *
441 * NOTE signalHeader->xxxBlockRef should contain block numbers and
442 * not references
443 */
444
445 private:
446 template <typename AnySectionArg>
447 SendStatus prepareSendTemplate(
448 TransporterSendBufferHandle *sendHandle,
449 const SignalHeader *signalHeader,
450 Uint8 prio,
451 const Uint32 *signalData,
452 NodeId nodeId,
453 TrpId &trp_id,
454 AnySectionArg section);
455
456
457 public:
458 SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
459 const SignalHeader *signalHeader,
460 Uint8 prio,
461 const Uint32 *signalData,
462 NodeId nodeId,
463 TrpId &trp_id,
464 const LinearSectionPtr ptr[3]);
465
466 SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
467 const SignalHeader *signalHeader,
468 Uint8 prio,
469 const Uint32 *signalData,
470 NodeId nodeId,
471 TrpId &trp_id,
472 class SectionSegmentPool & pool,
473 const SegmentedSectionPtr ptr[3]);
474
475 SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
476 const SignalHeader *signalHeader,
477 Uint8 prio,
478 const Uint32 *signalData,
479 NodeId nodeId,
480 const GenericSectionPtr ptr[3]);
481
482 /* Send on a specific transporter */
483 bool performSend(TrpId id, bool need_wakeup = true);
484 /* performSendNode is only used from NDB API */
485 bool performSendNode(NodeId nodeId, bool need_wakeup = true);
486 void performSend();
487
488 void printState();
489
490 class Transporter_interface {
491 public:
492 NodeId m_remote_nodeId;
493 int m_s_service_port; // signed port number
494 const char *m_interface;
495 };
496 Vector<Transporter_interface> m_transporter_interface;
497 void add_transporter_interface(NodeId remoteNodeId, const char *interf,
498 int s_port); // signed port. <0 is dynamic
499
500 int get_transporter_count() const;
501 Transporter* get_transporter(TrpId id) const;
502 Transporter* get_node_transporter(NodeId nodeId) const;
503 bool is_shm_transporter(NodeId nodeId);
504 struct in_addr get_connect_address(NodeId node_id) const;
505
506 Uint64 get_bytes_sent(NodeId nodeId) const;
507 Uint64 get_bytes_received(NodeId nodeId) const;
508
509 Uint32 get_num_multi_transporters();
510 Multi_Transporter* get_multi_transporter(Uint32 index);
511 Multi_Transporter* get_node_multi_transporter(NodeId node_id);
512
513 private:
514 TransporterCallback *const callbackObj;
515 TransporterReceiveHandle *const receiveHandle;
516
517 NdbMgmHandle m_mgm_handle;
518
519 struct NdbThread *m_start_clients_thread;
520 bool m_run_start_clients_thread;
521
522 int sendCounter;
523 NodeId localNodeId;
524 unsigned maxTransporters;
525 Uint32 nTransporters;
526 Uint32 nMultiTransporters;
527 Uint32 nTCPTransporters;
528 Uint32 nSHMTransporters;
529
530 #ifdef ERROR_INSERT
531 NodeBitmask m_blocked;
532 TrpBitmask m_blocked_trp;
533 NodeBitmask m_blocked_disconnected;
534 int m_disconnect_errors[MAX_NTRANSPORTERS];
535
536 NodeBitmask m_sendBlocked;
537
538 Uint32 m_mixology_level;
539 #endif
540
541 /**
542 * Arrays holding all transporters in the order they are created
543 */
544 Transporter** allTransporters;
545 Multi_Transporter** theMultiTransporters;
546 TCP_Transporter** theTCPTransporters;
547 SHM_Transporter** theSHMTransporters;
548
549 /**
550 * Array, indexed by nodeId, holding all transporters
551 */
552 TransporterType* theTransporterTypes;
553 Transporter** theNodeIdTransporters;
554
555 /**
556 * State arrays, index by host id
557 */
558 PerformState* performStates;
559 int* m_disconnect_errnum;
560 Uint32* m_disconnect_enomem_error;
561 IOState* ioStates;
562 struct ErrorState {
563 TransporterError m_code;
564 const char *m_info;
565 };
566 struct ErrorState *m_error_states;
567
568 /**
569 * peerUpIndicators[nodeId] is set by receiver thread
570 * to indicate that node is probable up.
571 * It is read and cleared by start clients thread.
572 */
573 volatile bool* peerUpIndicators;
574
575 /**
576 * Count of how long time one have been attempting to
577 * connect to node nodeId, in units of 100ms.
578 */
579 Uint32* connectingTime;
580
581 /**
582 * The current maximal time between connection attempts to a
583 * node in units of 100ms.
584 * Updated by receive thread, read by start clients thread
585 */
586 volatile Uint32 connectBackoffMaxTime;
587
588 /**
589 * Overloaded bits, for fast check.
590 * Similarly slowdown bits for fast check.
591 */
592 NodeBitmask m_status_overloaded;
593 NodeBitmask m_status_slowdown;
594
595 /**
596 * Unpack signal data.
597 *
598 * Defined in Packer.cpp.
599 */
600 Uint32 unpack(TransporterReceiveHandle&,
601 Uint32 * readPtr,
602 Uint32 bufferSize,
603 NodeId remoteNodeId,
604 IOState state,
605 bool & stopReceiving);
606
607 Uint32 * unpack(TransporterReceiveHandle&,
608 Uint32 * readPtr,
609 Uint32 * eodPtr,
610 Uint32 * endPtr,
611 NodeId remoteNodeId,
612 IOState state,
613 bool & stopReceiving);
614
615 static Uint32 unpack_length_words(const Uint32 *readPtr,
616 Uint32 maxWords,
617 bool extra_signal);
618
619 Uint32 poll_TCP(Uint32 timeOutMillis, TransporterReceiveHandle&);
620 Uint32 poll_SHM(TransporterReceiveHandle&, bool &any_connected);
621 Uint32 poll_SHM(TransporterReceiveHandle&,
622 NDB_TICKS start_time,
623 Uint32 micros_to_poll);
624 Uint32 check_TCP(TransporterReceiveHandle&, Uint32 timeoutMillis);
625 Uint32 spin_check_transporters(TransporterReceiveHandle&);
626
627 int m_shm_own_pid;
628 Uint32 m_transp_count;
629
630 public:
631 bool setup_wakeup_socket(TransporterReceiveHandle&);
632 void wakeup();
633
setup_wakeup_socket()634 inline bool setup_wakeup_socket() {
635 assert(receiveHandle != 0);
636 return setup_wakeup_socket(* receiveHandle);
637 }
638 private:
639 bool m_has_extra_wakeup_socket;
640 NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
641 void consume_extra_sockets();
642
643 Uint32 *getWritePtr(TransporterSendBufferHandle *handle,
644 Transporter*,
645 Uint32 trp_id,
646 Uint32 lenBytes,
647 Uint32 prio,
648 SendStatus *error);
649 void updateWritePtr(TransporterSendBufferHandle *handle,
650 Transporter*,
651 Uint32 trp_id,
652 Uint32 lenBytes,
653 Uint32 prio);
654
655 public:
656 /* Various internal */
657 void inc_overload_count(Uint32 nodeId);
658 void inc_slowdown_count(Uint32 nodeId);
659
660 void get_trps_for_node(Uint32 nodeId,
661 TrpId *trp_ids,
662 Uint32 &num_trp_ids,
663 Uint32 max_trp_ids);
664
665 Uint32 get_num_trps();
666 private:
667 /**
668 * Sum of max transporter memory for each transporter.
669 * Used to compute default send buffer size.
670 */
671 Uint64 m_total_max_send_buffer;
672
673 public:
674 /**
675 * Receiving
676 */
677 Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
678 Uint32 performReceive(TransporterReceiveHandle&, Uint32 receive_thread_idx);
679 Uint32 update_connections(TransporterReceiveHandle&,
680 Uint32 max_spintime = UINT32_MAX);
681
pollReceive(Uint32 timeOutMillis)682 inline Uint32 pollReceive(Uint32 timeOutMillis) {
683 assert(receiveHandle != 0);
684 return pollReceive(timeOutMillis, * receiveHandle);
685 }
686
performReceive()687 inline Uint32 performReceive() {
688 assert(receiveHandle != 0);
689 return performReceive(* receiveHandle, 0);
690 }
691
update_connections()692 inline void update_connections() {
693 assert(receiveHandle != 0);
694 update_connections(* receiveHandle);
695 }
get_total_spintime()696 inline Uint32 get_total_spintime()
697 {
698 assert(receiveHandle != 0);
699 return receiveHandle->m_total_spintime;
700 }
reset_total_spintime()701 inline void reset_total_spintime()
702 {
703 assert(receiveHandle != 0);
704 receiveHandle->m_total_spintime = 0;
705 }
706
707 TrpId getTransporterIndex(Transporter* t);
708 void set_recv_thread_idx(Transporter* t, Uint32 recv_thread_idx);
709
710 #ifdef ERROR_INSERT
711 /* Utils for testing latency issues */
712 bool isBlocked(NodeId nodeId);
713 void blockReceive(TransporterReceiveHandle&, NodeId nodeId);
714 void unblockReceive(TransporterReceiveHandle&, NodeId nodeId);
715 bool isSendBlocked(NodeId nodeId) const;
716 void blockSend(TransporterReceiveHandle& recvdata,
717 NodeId nodeId);
718 void unblockSend(TransporterReceiveHandle& recvdata,
719 NodeId nodeId);
720
721 /* Testing interleaving of signal processing */
722 Uint32 getMixologyLevel() const;
723 void setMixologyLevel(Uint32 l);
724 #endif
725 };
726
727 inline Uint32
get_num_trps()728 TransporterRegistry::get_num_trps()
729 {
730 return nTransporters;
731 }
732
733 inline void
set_status_overloaded(Uint32 nodeId,bool val)734 TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
735 {
736 assert(nodeId < MAX_NODES);
737 if (val != m_status_overloaded.get(nodeId))
738 {
739 m_status_overloaded.set(nodeId, val);
740 if (val)
741 inc_overload_count(nodeId);
742 }
743 if (val)
744 set_status_slowdown(nodeId, val);
745 }
746
747 inline const NodeBitmask&
get_status_overloaded() const748 TransporterRegistry::get_status_overloaded() const
749 {
750 return m_status_overloaded;
751 }
752
753 inline void
set_status_slowdown(Uint32 nodeId,bool val)754 TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
755 {
756 assert(nodeId < MAX_NODES);
757 if (val != m_status_slowdown.get(nodeId))
758 {
759 m_status_slowdown.set(nodeId, val);
760 if (val)
761 inc_slowdown_count(nodeId);
762 }
763 }
764
765 inline const NodeBitmask&
get_status_slowdown() const766 TransporterRegistry::get_status_slowdown() const
767 {
768 return m_status_slowdown;
769 }
770
771 inline void
indicate_node_up(NodeId nodeId)772 TransporterRegistry::indicate_node_up(NodeId nodeId) // Called from receive thread
773 {
774 assert(nodeId < MAX_NODES);
775
776 if (!peerUpIndicators[nodeId])
777 {
778 peerUpIndicators[nodeId] = true;
779 }
780 }
781
782 inline bool
get_and_clear_node_up_indicator(NodeId nodeId)783 TransporterRegistry::get_and_clear_node_up_indicator(NodeId nodeId) // Called from start client thread
784 {
785 assert(nodeId < MAX_NODES);
786
787 bool indicator = peerUpIndicators[nodeId];
788 if (indicator)
789 {
790 peerUpIndicators[nodeId] = false;
791 }
792 return indicator;
793 }
794
795 inline Uint32
get_connect_backoff_max_time_in_laps() const796 TransporterRegistry::get_connect_backoff_max_time_in_laps() const
797 { /* one lap, 100 ms */
798 return connectBackoffMaxTime;
799 }
800
801 inline void
set_connect_backoff_max_time_in_ms(Uint32 backoff_max_time_in_ms)802 TransporterRegistry::set_connect_backoff_max_time_in_ms(Uint32 backoff_max_time_in_ms)
803 {
804 /**
805 * Round up backoff_max_time to nearest higher 100ms, since that is lap time
806 * in start_client_threads using this function.
807 */
808 connectBackoffMaxTime = (backoff_max_time_in_ms + 99) / 100;
809 }
810
811 inline void
backoff_reset_connecting_time(NodeId nodeId)812 TransporterRegistry::backoff_reset_connecting_time(NodeId nodeId)
813 {
814 assert(nodeId < MAX_NODES);
815
816 connectingTime[nodeId] = 0;
817 }
818
819 inline bool
backoff_update_and_check_time_for_connect(NodeId nodeId)820 TransporterRegistry::backoff_update_and_check_time_for_connect(NodeId nodeId)
821 {
822 assert(nodeId < MAX_NODES);
823
824 Uint32 backoff_max_time = get_connect_backoff_max_time_in_laps();
825
826 if (backoff_max_time == 0)
827 {
828 // Backoff disabled
829 return true;
830 }
831
832 connectingTime[nodeId] ++;
833
834 if (connectingTime[nodeId] >= backoff_max_time)
835 {
836 return (connectingTime[nodeId] % backoff_max_time == 0);
837 }
838
839 /**
840 * Attempt moments from start of connecting.
841 * This function is called from start_clients_thread
842 * roughly every 100ms for each node it is connecting
843 * to.
844 */
845 static const Uint16 attempt_moments[] = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
846 static const int attempt_moments_count = sizeof(attempt_moments) / sizeof(attempt_moments[0]);
847 for(int i = 0; i < attempt_moments_count; i ++)
848 {
849 if (connectingTime[nodeId] == attempt_moments[i])
850 {
851 return true;
852 }
853 else if (connectingTime[nodeId] < attempt_moments[i])
854 {
855 return false;
856 }
857 }
858 return (connectingTime[nodeId] % attempt_moments[attempt_moments_count - 1] == 0);
859 }
860
861 /**
862 * A function used to calculate a send buffer level given the size of the node
863 * send buffer and the total send buffer size for all nodes and the total send
864 * buffer used for all nodes. There is also a thread parameter that specifies
865 * the number of threads used (this is 0 except for ndbmtd).
866 */
867 void calculate_send_buffer_level(Uint64 node_send_buffer_size,
868 Uint64 total_send_buffer_size,
869 Uint64 total_used_send_buffer_size,
870 Uint32 num_threads,
871 SB_LevelType &level);
872 #endif // Define of TransporterRegistry_H
873