1 /*
2    Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
#include <ndb_global.h>

#include <new>

#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"

#include "Transporter.hpp"
#include <SocketAuthenticator.hpp>
#include "BlockNumbers.h"

#include "TCP_Transporter.hpp"
#include "Multi_Transporter.hpp"
#include "Loopback_Transporter.hpp"

#ifndef WIN32
#include "SHM_Transporter.hpp"
#endif

#include "NdbOut.hpp"
#include <NdbSleep.h>
#include <NdbMutex.h>
#include <NdbSpin.h>
#include <InputStream.hpp>
#include <OutputStream.hpp>
#include <socket_io.h>

#include <mgmapi/mgmapi.h>
#include <mgmapi_internal.h>
#include <mgmapi/mgmapi_debug.h>
53 
54 #include <EventLogger.hpp>
55 extern EventLogger * g_eventLogger;
56 
57 #if 0
58 #define DEBUG_FPRINTF(arglist) do { fprintf arglist ; } while (0)
59 #else
60 #define DEBUG_FPRINTF(a)
61 #endif
62 
63 /**
64  * There is a requirement in the Transporter design that
65  * ::performReceive() and ::update_connections()
66  * on the same 'TransporterReceiveHandle' should not be
67  * run concurrently. class TransporterReceiveWatchdog provides a
68  * simple mechanism to assert that this rule is followed.
69  * Does nothing if NDEBUG is defined (in production code)
70  */
71 class TransporterReceiveWatchdog
72 {
73 public:
74 #ifdef NDEBUG
TransporterReceiveWatchdog(TransporterReceiveHandle & recvdata)75   TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
76   {}
77 
78 #else
79   TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
80     : m_recvdata(recvdata)
81   {
82     assert(m_recvdata.m_active == false);
83     m_recvdata.m_active = true;
84   }
85 
86   ~TransporterReceiveWatchdog()
87   {
88     assert(m_recvdata.m_active == true);
89     m_recvdata.m_active = false;
90   }
91 
92 private:
93   TransporterReceiveHandle& m_recvdata;
94 #endif
95 };
96 
97 
98 struct in_addr
get_connect_address(NodeId node_id) const99 TransporterRegistry::get_connect_address(NodeId node_id) const
100 {
101   return theNodeIdTransporters[node_id]->m_connect_address;
102 }
103 
104 Uint64
get_bytes_sent(NodeId node_id) const105 TransporterRegistry::get_bytes_sent(NodeId node_id) const
106 {
107   return theNodeIdTransporters[node_id]->get_bytes_sent();
108 }
109 
110 Uint64
get_bytes_received(NodeId node_id) const111 TransporterRegistry::get_bytes_received(NodeId node_id) const
112 {
113   return theNodeIdTransporters[node_id]->get_bytes_received();
114 }
115 
newSession(NDB_SOCKET_TYPE sockfd)116 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
117 {
118   DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
119   DEBUG_FPRINTF((stderr, "New session created\n"));
120   if (m_auth && !m_auth->server_authenticate(sockfd))
121   {
122     DEBUG_FPRINTF((stderr, "Failed to authenticate new session\n"));
123     ndb_socket_close_with_reset(sockfd, true); // Close with reset
124     DBUG_RETURN(0);
125   }
126 
127   BaseString msg;
128   bool close_with_reset = true;
129   bool log_failure = false;
130   if (!m_transporter_registry->connect_server(sockfd,
131                                               msg,
132                                               close_with_reset,
133                                               log_failure))
134   {
135     DEBUG_FPRINTF((stderr, "New session failed in connect_server\n"));
136     ndb_socket_close_with_reset(sockfd, close_with_reset);
137     if (log_failure)
138     {
139       g_eventLogger->warning("TR : %s", msg.c_str());
140     }
141     DBUG_RETURN(0);
142   }
143 
144   DBUG_RETURN(0);
145 }
146 
TransporterReceiveData()147 TransporterReceiveData::TransporterReceiveData()
148   : m_transporters(),
149     m_recv_transporters(),
150     m_has_data_transporters(),
151     m_handled_transporters(),
152     m_bad_data_transporters(),
153     m_last_trp_id(0)
154 {
155   /**
156    * With multi receiver threads
157    *   an interface to reassign these is needed...
158    */
159   m_transporters.set();            // Handle all
160   m_transporters.clear(Uint32(0)); // Except wakeup socket...
161 
162 #if defined(HAVE_EPOLL_CREATE)
163   m_epoll_fd = -1;
164   m_epoll_events = 0;
165 #endif
166 }
167 
168 bool
init(unsigned maxTransporters)169 TransporterReceiveData::init(unsigned maxTransporters)
170 {
171   maxTransporters += 1; /* wakeup socket */
172   m_spintime = 0;
173   m_total_spintime = 0;
174 #if defined(HAVE_EPOLL_CREATE)
175   m_epoll_fd = epoll_create(maxTransporters);
176   if (m_epoll_fd == -1)
177   {
178     perror("epoll_create failed... falling back to select!");
179     goto fallback;
180   }
181   m_epoll_events = new struct epoll_event[maxTransporters];
182   if (m_epoll_events == 0)
183   {
184     perror("Failed to alloc epoll-array... falling back to select!");
185     close(m_epoll_fd);
186     m_epoll_fd = -1;
187     goto fallback;
188   }
189   memset(m_epoll_events, 0, maxTransporters * sizeof(struct epoll_event));
190   return true;
191 fallback:
192 #endif
193   return m_socket_poller.set_max_count(maxTransporters);
194 }
195 
196 bool
epoll_add(Transporter * t)197 TransporterReceiveData::epoll_add(Transporter *t)
198 {
199   assert(m_transporters.get(t->getTransporterIndex()));
200 #if defined(HAVE_EPOLL_CREATE)
201   if (m_epoll_fd != -1)
202   {
203     bool add = true;
204     struct epoll_event event_poll;
205     memset(&event_poll, 0, sizeof(event_poll));
206     NDB_SOCKET_TYPE sock_fd = t->getSocket();
207     int node_id = t->getRemoteNodeId();
208     int op = EPOLL_CTL_ADD;
209     int ret_val, error;
210 
211     if (!ndb_socket_valid(sock_fd))
212       return FALSE;
213 
214     event_poll.data.u32 = t->getTransporterIndex();
215     event_poll.events = EPOLLIN;
216     ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
217     if (!ret_val)
218       goto ok;
219     error= errno;
220     if (error == ENOENT && !add)
221     {
222       /*
223        * Could be that socket was closed premature to this call.
224        * Not a problem that this occurs.
225        */
226       goto ok;
227     }
228     if (!add || (add && (error != ENOMEM)))
229     {
230       /*
231        * Serious problems, we are either using wrong parameters,
232        * have permission problems or the socket doesn't support
233        * epoll!!
234        */
235       ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
236                " node %u to epoll-set,"
237                " errno: %u %s",
238                add ? "ADD" : "DEL",
239                m_epoll_fd,
240                MY_SOCKET_FORMAT_VALUE(sock_fd),
241                node_id,
242                error,
243                strerror(error));
244       abort();
245     }
246     ndbout << "We lacked memory to add the socket for node id ";
247     ndbout << node_id << endl;
248     return false;
249   }
250 
251 ok:
252 #endif
253   return true;
254 }
255 
~TransporterReceiveData()256 TransporterReceiveData::~TransporterReceiveData()
257 {
258 #if defined(HAVE_EPOLL_CREATE)
259   if (m_epoll_fd != -1)
260   {
261     close(m_epoll_fd);
262     m_epoll_fd = -1;
263   }
264 
265   if (m_epoll_events)
266   {
267     delete [] m_epoll_events;
268     m_epoll_events = 0;
269   }
270 #endif
271 }
272 
TransporterRegistry(TransporterCallback * callback,TransporterReceiveHandle * recvHandle,unsigned _maxTransporters)273 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
274                                          TransporterReceiveHandle *recvHandle,
275                                          unsigned _maxTransporters) :
276   callbackObj(callback),
277   receiveHandle(recvHandle),
278   m_mgm_handle(0),
279   sendCounter(1),
280   localNodeId(0),
281   maxTransporters(_maxTransporters),
282   nTransporters(0),
283   nMultiTransporters(0),
284   nTCPTransporters(0), nSHMTransporters(0),
285   connectBackoffMaxTime(0),
286   m_transp_count(1),
287   m_total_max_send_buffer(0)
288 {
289   if (receiveHandle != 0)
290   {
291     receiveHandle->nTCPTransporters = 0;
292     receiveHandle->nSHMTransporters = 0;
293   }
294   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
295 
296   allTransporters     = new Transporter*      [maxTransporters];
297   theTCPTransporters  = new TCP_Transporter * [maxTransporters];
298   theSHMTransporters  = new SHM_Transporter * [maxTransporters];
299   theTransporterTypes = new TransporterType   [MAX_NODES];
300   theNodeIdTransporters = new Transporter   * [MAX_NODES];
301   theMultiTransporters = new Multi_Transporter * [MAX_NODES];
302   performStates       = new PerformState      [MAX_NODES];
303   ioStates            = new IOState           [MAX_NODES];
304   peerUpIndicators    = new bool              [maxTransporters];
305   connectingTime      = new Uint32            [maxTransporters];
306   m_disconnect_errnum = new int               [maxTransporters];
307   m_disconnect_enomem_error = new Uint32      [maxTransporters];
308   m_error_states      = new ErrorState        [maxTransporters];
309 
310   m_has_extra_wakeup_socket = false;
311 
312 #ifdef ERROR_INSERT
313   m_blocked.clear();
314   m_blocked_trp.clear();
315   m_blocked_disconnected.clear();
316   m_sendBlocked.clear();
317 
318   m_mixology_level = 0;
319 #endif
320 
321   // Initialize the transporter arrays
322   ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
323   for (unsigned i = 0; i < MAX_NODES; i++)
324   {
325     theNodeIdTransporters[i] = NULL;
326     theMultiTransporters[i] = NULL;
327     performStates[i]      = DISCONNECTED;
328     ioStates[i]           = NoHalt;
329     peerUpIndicators[i]   = true; // Assume all nodes are up, will be
330                                   // cleared at first connect attempt
331     connectingTime[i]     = 0;
332     m_disconnect_errnum[i]= 0;
333     m_disconnect_enomem_error[i] = 0;
334     m_error_states[i]     = default_error_state;
335   }
336   for (unsigned i = 0; i < maxTransporters; i++)
337   {
338     allTransporters[i]    = NULL;
339     theTCPTransporters[i] = NULL;
340     theSHMTransporters[i] = NULL;
341   }
342   theMultiTransporterMutex = NdbMutex_Create();
343   DBUG_VOID_RETURN;
344 }
345 
set_mgm_handle(NdbMgmHandle h)346 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
347 {
348   DBUG_ENTER("TransporterRegistry::set_mgm_handle");
349   if (m_mgm_handle)
350     ndb_mgm_destroy_handle(&m_mgm_handle);
351   m_mgm_handle= h;
352   ndb_mgm_set_timeout(m_mgm_handle, 5000);
353 #ifndef DBUG_OFF
354   if (h)
355   {
356     char buf[256];
357     DBUG_PRINT("info",("handle set with connectstring: %s",
358 		       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
359   }
360   else
361   {
362     DBUG_PRINT("info",("handle set to NULL"));
363   }
364 #endif
365   DBUG_VOID_RETURN;
366 }
367 
~TransporterRegistry()368 TransporterRegistry::~TransporterRegistry()
369 {
370   DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
371 
372   disconnectAll();
373   removeAll();
374 
375   delete[] allTransporters;
376   delete[] theTCPTransporters;
377   delete[] theSHMTransporters;
378   delete[] theTransporterTypes;
379   delete[] theMultiTransporters;
380   delete[] theNodeIdTransporters;
381   delete[] performStates;
382   delete[] ioStates;
383   delete[] peerUpIndicators;
384   delete[] connectingTime;
385   delete[] m_disconnect_errnum;
386   delete[] m_disconnect_enomem_error;
387   delete[] m_error_states;
388 
389   if (m_mgm_handle)
390     ndb_mgm_destroy_handle(&m_mgm_handle);
391 
392   if (m_has_extra_wakeup_socket)
393   {
394     ndb_socket_close(m_extra_wakeup_sockets[0]);
395     ndb_socket_close(m_extra_wakeup_sockets[1]);
396   }
397   NdbMutex_Destroy(theMultiTransporterMutex);
398   DBUG_VOID_RETURN;
399 }
400 
401 void
removeAll()402 TransporterRegistry::removeAll()
403 {
404   for (Uint32 i = 0; i < nTCPTransporters; i++)
405   {
406     delete theTCPTransporters[i];
407   }
408   for (Uint32 i = 0; i < nSHMTransporters; i++)
409   {
410     delete theSHMTransporters[i];
411   }
412   for (Uint32 i = 0; i < nMultiTransporters; i++)
413   {
414     delete theMultiTransporters[i];
415   }
416   nTransporters = 0;
417   nTCPTransporters = 0;
418   nSHMTransporters = 0;
419   nMultiTransporters = 0;
420 }
421 
422 void
disconnectAll()423 TransporterRegistry::disconnectAll(){
424   DEBUG_FPRINTF((stderr, "(%u)doDisconnect(all), line: %d\n",
425                localNodeId, __LINE__));
426   for (Uint32 i = 0; i < nTCPTransporters; i++)
427   {
428     theTCPTransporters[i]->doDisconnect();
429   }
430 #ifndef WIN32
431   for (Uint32 i = 0; i < nSHMTransporters; i++)
432   {
433     theSHMTransporters[i]->doDisconnect();
434   }
435 #endif
436 }
437 
438 bool
init(NodeId nodeId)439 TransporterRegistry::init(NodeId nodeId) {
440   DBUG_ENTER("TransporterRegistry::init");
441   assert(localNodeId == 0 ||
442          localNodeId == nodeId);
443 
444   localNodeId = nodeId;
445 
446   DEBUG("TransporterRegistry started node: " << localNodeId);
447 
448   if (receiveHandle)
449   {
450     if (!init(* receiveHandle))
451       DBUG_RETURN(false);
452   }
453 
454   DBUG_RETURN(true);
455 }
456 
457 bool
init(TransporterReceiveHandle & recvhandle)458 TransporterRegistry::init(TransporterReceiveHandle& recvhandle)
459 {
460   recvhandle.nTCPTransporters = nTCPTransporters;
461   recvhandle.nSHMTransporters = nSHMTransporters;
462   return recvhandle.init(maxTransporters);
463 }
464 
465 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg,bool & close_with_reset,bool & log_failure)466 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
467                                     BaseString & msg,
468                                     bool& close_with_reset,
469                                     bool& log_failure)
470 {
471   DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
472 
473   log_failure = true;
474 
475   // Read "hello" that consists of node id and other info
476   // from client
477   SocketInputStream s_input(sockfd);
478   char buf[256]; // <int> <int> <int> <int> <..expansion..>
479   if (s_input.gets(buf, sizeof(buf)) == 0) {
480     /* Could be spurious connection, need not log */
481     log_failure = false;
482     msg.assfmt("Ignored connection attempt as failed to "
483                "read 'hello' from client");
484     DBUG_PRINT("error", ("%s", msg.c_str()));
485     DEBUG_FPRINTF((stderr, "%s", msg.c_str()));
486     DBUG_RETURN(false);
487   }
488 
489   int nodeId;
490   int remote_transporter_type;
491   int serverNodeId = -1;
492   int multi_transporter_instance = -1;
493   int r= sscanf(buf, "%d %d %d %d",
494                 &nodeId,
495                 &remote_transporter_type,
496                 &serverNodeId,
497                 &multi_transporter_instance);
498   switch (r) {
499   case 4:
500     /* Latest version client */
501     break;
502   case 3:
503     /* Older client, sending just nodeid, transporter type, serverNodeId */
504     break;
505   case 2:
506     /* Older client, sending just nodeid and transporter type */
507     break;
508   default:
509     /* Could be spurious connection, need not log */
510     log_failure = false;
511     msg.assfmt("Ignored connection attempt as failed to "
512                "parse 'hello' from client.  >%s<", buf);
513     DBUG_PRINT("error", ("%s", msg.c_str()));
514     DEBUG_FPRINTF((stderr, "%s", msg.c_str()));
515     DBUG_RETURN(false);
516   }
517 
518   DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d "
519                       "server nodeid %d instance %d",
520 		      nodeId,
521                       remote_transporter_type,
522                       serverNodeId,
523                       multi_transporter_instance));
524   /*
525   DEBUG_FPRINTF((stderr, "Client hello, nodeId: %d transporter type: %d "
526                          "server nodeid %d instance %d",
527 		         nodeId,
528                          remote_transporter_type,
529                          serverNodeId,
530                          multi_transporter_instace));
531   */
532 
533   // Check that nodeid is in range before accessing the arrays
534   if (nodeId < 0 ||
535       nodeId > (int)MAX_NODES)
536   {
537     /* Strange, log it */
538     msg.assfmt("Ignored connection attempt as client "
539                "nodeid %u out of range", nodeId);
540     DBUG_PRINT("error", ("%s", msg.c_str()));
541     DBUG_RETURN(false);
542   }
543 
544   lockMultiTransporters();
545   // Check that transporter is allocated
546   Transporter *t= theNodeIdTransporters[nodeId];
547   if (t == 0)
548   {
549     unlockMultiTransporters();
550     /* Strange, log it */
551     msg.assfmt("Ignored connection attempt as client "
552                "nodeid %u is undefined.",
553                nodeId);
554     DBUG_PRINT("error", ("%s", msg.c_str()));
555     DBUG_RETURN(false);
556   }
557 
558   // Check transporter type
559   if (remote_transporter_type != t->m_type &&
560       t->m_type != tt_Multi_TRANSPORTER) // Checked later
561   {
562     unlockMultiTransporters();
563     /* Strange, log it */
564     msg.assfmt("Connection attempt from client node %u failed as transporter "
565                "type %u is not as expected %u.",
566                nodeId,
567                remote_transporter_type,
568                t->m_type);
569     DBUG_RETURN(false);
570   }
571 
572   // Check that the serverNodeId is correct
573   if (serverNodeId != -1)
574   {
575     /* Check that incoming connection was meant for us */
576     if (serverNodeId != t->getLocalNodeId())
577     {
578       unlockMultiTransporters();
579       /* Strange, log it */
580       msg.assfmt("Ignored connection attempt as client "
581                  "node %u attempting to connect to node %u, "
582                  "but this is node %u.",
583                  nodeId,
584                  serverNodeId,
585                  t->getLocalNodeId());
586       DBUG_PRINT("error", ("%s", msg.c_str()));
587       DBUG_RETURN(false);
588     }
589   }
590 
591   bool correct_state = false;
592   Multi_Transporter *multi_trp =
593     get_node_multi_transporter(nodeId);
594   if (multi_trp &&
595       multi_transporter_instance > 0)
596   {
597     /* Specific path for non-zero multi transporter instances */
598     DEBUG_FPRINTF((stderr, "connect_server multi trp, node %u instance %u\n",
599                    t->getRemoteNodeId(),
600                    multi_transporter_instance));
601 
602     if (performStates[nodeId] == TransporterRegistry::CONNECTED)
603     {
604       /**
605        * The connection is ok if the following is true:
606        * 1) The transporter is a Multi_Transporter object
607        * 2) The instance exists as inactive transporter
608        * 3) The inactive instance referred to isn't already
609        *    connected
610        *
611        * If this all are true we will replace the multi transporter
612        * instance with the transporter instance to connect in this
613        * instance.
614        */
615       if ((multi_transporter_instance - 1) <
616            int(multi_trp->get_num_inactive_transporters()))
617       {
618         Transporter *inst_trp =
619           multi_trp->get_inactive_transporter(multi_transporter_instance - 1);
620 
621         /* Check type now that we have the actual instance */
622         if (remote_transporter_type == inst_trp->m_type)
623         {
624           if (!inst_trp->isConnected())
625           {
626             /**
627              * Continue connection setup with
628              * multi-transporter specific instance
629              */
630             correct_state = true;
631             t = inst_trp;
632           }
633           else
634           {
635             /* Strange, log it */
636             msg.assfmt("Ignored connection attempt from node %u as multi "
637                        "transporter instance %u already connected.",
638                        nodeId,
639                        multi_transporter_instance);
640           }
641         }
642         else
643         {
644           /* Strange, log it */
645           msg.assfmt("Ignored multi transporter connection attempt "
646                      "from node %u instance %u as transporter "
647                      "type %u is not as expected %u",
648                      nodeId,
649                      multi_transporter_instance,
650                      remote_transporter_type,
651                      inst_trp->m_type);
652         }
653       }
654       else
655       {
656         /* Strange, log it */
657         msg.assfmt("Ignored connection attempt from node %u as multi "
658                    "transporter instance %u is not in range.",
659                    nodeId,
660                    multi_transporter_instance);
661       }
662     }
663   }
664   else
665   {
666     /**
667      * Normal connection setup
668      * Sub cases :
669      *   Normal setup for non-multi trp        : multi = 0, instance = 0
670      *   Normal setup from non-multi trp from
671      *     old version                         : multi = 0, instance = -1
672      *   Normal setup for multi trp instance 0 : multi = 1, instance = 0
673      *   Normal setup from for multi trp
674      *     from old version                    : multi = 1, instance = -1
675      *
676      * Not supported :
677      *   multi = 0, instance > 0               : Invalid
678      */
679     if (!multi_trp)
680     {
681       if (multi_transporter_instance > 0)
682       {
683         unlockMultiTransporters();
684         /* Strange, log it */
685         msg.assfmt("Ignored connection attempt from node %u as multi "
686                    "transporter instance %d specified for non multi-transporter",
687                    nodeId, multi_transporter_instance);
688         DBUG_RETURN(false);
689       }
690     }
691     else
692     {
693       /* multi_trp */
694       require(multi_transporter_instance <= 0);
695 
696       /* Continue connection setup with specific instance 0 */
697       require(multi_trp->get_num_active_transporters() == 1);
698       t = multi_trp->get_active_transporter(0);
699     }
700 
701     /**
702      * Normal connection setup requires state to be in CONNECTING
703      */
704     if (performStates[nodeId] == TransporterRegistry::CONNECTING)
705     {
706       correct_state = true;
707     }
708     else
709     {
710       msg.assfmt("Ignored connection attempt as this node "
711                  "is not expecting a connection from node %u. "
712                  "State %u",
713                  nodeId,
714                  performStates[nodeId]);
715 
716       /**
717        * This is expected if the transporter state is DISCONNECTED,
718        * otherwise it's a bit strange
719        */
720       log_failure = (performStates[nodeId] != TransporterRegistry::DISCONNECTED);
721 
722       DEBUG_FPRINTF((stderr, "%s", msg.c_str()));
723     }
724   }
725   unlockMultiTransporters();
726 
727   // Check that the transporter should be connecting
728   if (!correct_state)
729   {
730     DBUG_PRINT("error", ("Transporter for node id %d in wrong state %s",
731                          nodeId, msg.c_str()));
732     /*
733     DEBUG_FPRINTF((stderr, "Transporter for node id %d in wrong state %s\n",
734                            nodeId, msg.c_str()));
735     */
736 
737     // Avoid TIME_WAIT on server by requesting client to close connection
738     SocketOutputStream s_output(sockfd);
739     if (s_output.println("BYE") < 0)
740     {
741       // Failed to request client close
742       DBUG_PRINT("error", ("Failed to send client BYE"));
743       DBUG_RETURN(false);
744     }
745 
746     // Wait for to close connection by reading EOF(i.e read returns 0)
747     const int read_eof_timeout = 1000; // Fairly short timeout
748     if (read_socket(sockfd, read_eof_timeout,
749                     buf, sizeof(buf)) == 0)
750     {
751       // Client gracefully closed connection, turn off close_with_reset
752       close_with_reset = false;
753       DBUG_RETURN(false);
754     }
755 
756     // Failed to request client close
757     DBUG_RETURN(false);
758   }
759 
760   // Send reply to client
761   SocketOutputStream s_output(sockfd);
762   if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
763   {
764     /* Strange, log it */
765     msg.assfmt("Connection attempt failed due to error sending "
766                "reply to client node %u",
767                nodeId);
768     DBUG_PRINT("error", ("%s", msg.c_str()));
769     DBUG_RETURN(false);
770   }
771 
772   // Setup transporter (transporter responsible for closing sockfd)
773   DEBUG_FPRINTF((stderr, "connect_server for trp_id %u\n",
774                  t->getTransporterIndex()));
775   DBUG_RETURN(t->connect_server(sockfd, msg));
776 }
777 
778 void
insert_allTransporters(Transporter * t)779 TransporterRegistry::insert_allTransporters(Transporter *t)
780 {
781   TrpId trp_id = t->getTransporterIndex();
782   if (trp_id == 0)
783   {
784     nTransporters++;
785     require(allTransporters[nTransporters] == 0);
786     allTransporters[nTransporters] = t;
787     t->setTransporterIndex(nTransporters);
788   }
789   else
790   {
791     require(allTransporters[trp_id] == 0);
792     allTransporters[trp_id] = t;
793   }
794 }
795 
796 void
remove_allTransporters(Transporter * t)797 TransporterRegistry::remove_allTransporters(Transporter *t)
798 {
799   TrpId trp_id = t->getTransporterIndex();
800   if (trp_id == 0)
801   {
802     return;
803   }
804   else if (t == allTransporters[trp_id])
805   {
806     DEBUG_FPRINTF((stderr,
807                    "remove trp_id %u for node %u from allTransporters\n",
808                    trp_id,
809                    t->getRemoteNodeId()));
810     allTransporters[trp_id] = 0;
811   }
812 }
813 
814 void
insert_node_transporter(NodeId node_id,Transporter * t)815 TransporterRegistry::insert_node_transporter(NodeId node_id, Transporter *t)
816 {
817   theNodeIdTransporters[node_id] = t;
818 }
819 
820 void
lockMultiTransporters()821 TransporterRegistry::lockMultiTransporters()
822 {
823   NdbMutex_Lock(theMultiTransporterMutex);
824 }
825 
826 void
unlockMultiTransporters()827 TransporterRegistry::unlockMultiTransporters()
828 {
829   NdbMutex_Unlock(theMultiTransporterMutex);
830 }
831 
832 Uint32
get_num_multi_transporters()833 TransporterRegistry::get_num_multi_transporters()
834 {
835   return nMultiTransporters;
836 }
837 
838 Multi_Transporter*
get_multi_transporter(Uint32 index)839 TransporterRegistry::get_multi_transporter(Uint32 index)
840 {
841   require(index < nMultiTransporters);
842   return theMultiTransporters[index];
843 }
844 
845 bool
configureTransporter(TransporterConfiguration * config)846 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
847 {
848   NodeId remoteNodeId = config->remoteNodeId;
849 
850   assert(localNodeId);
851   assert(config->localNodeId == localNodeId);
852 
853   if (remoteNodeId > MAX_NODES)
854     return false;
855 
856   Transporter* t = theNodeIdTransporters[remoteNodeId];
857   if(t != NULL)
858   {
859     // Transporter already exist, try to reconfigure it
860     require(!t->isMultiTransporter());
861     require(!t->isPartOfMultiTransporter());
862     return t->configure(config);
863   }
864 
865   DEBUG("Configuring transporter from " << localNodeId
866 	<< " to " << remoteNodeId);
867 
868   switch (config->type){
869   case tt_TCP_TRANSPORTER:
870     return createTCPTransporter(config);
871   case tt_SHM_TRANSPORTER:
872     return createSHMTransporter(config);
873   default:
874     abort();
875     break;
876   }
877   return false;
878 }
879 
880 bool
createMultiTransporter(Uint32 node_id,Uint32 num_trps)881 TransporterRegistry::createMultiTransporter(Uint32 node_id, Uint32 num_trps)
882 {
883   Multi_Transporter *multi_trp = 0;
884   lockMultiTransporters();
885   Transporter *base_trp = theNodeIdTransporters[node_id];
886   require(!base_trp->isMultiTransporter());
887   require(!base_trp->isPartOfMultiTransporter());
888   multi_trp = new Multi_Transporter(*this, base_trp);
889   theMultiTransporters[nMultiTransporters] = multi_trp;
890   nMultiTransporters++;
891   Uint32 nodeId = base_trp->getRemoteNodeId();
892   TransporterType type = theTransporterTypes[nodeId];
893   for (Uint32 i = 0; i < num_trps; i++)
894   {
895     if (type == tt_TCP_TRANSPORTER)
896     {
897       TCP_Transporter *tcp_trp = (TCP_Transporter*)base_trp;
898       TCP_Transporter *new_trp = new TCP_Transporter(*this, tcp_trp);
899       require(new_trp->initTransporter());
900       multi_trp->add_not_used_trp(new_trp);
901       new_trp->set_multi_transporter_instance(i + 1);
902       theTCPTransporters[nTCPTransporters++] = new_trp;
903     }
904 #ifndef WIN32
905     else if (type == tt_SHM_TRANSPORTER)
906     {
907       SHM_Transporter *shm_trp = (SHM_Transporter*)base_trp;
908       SHM_Transporter *new_trp = new SHM_Transporter(*this, shm_trp);
909       require(new_trp->initTransporter());
910       multi_trp->add_not_used_trp(new_trp);
911       new_trp->set_multi_transporter_instance(i + 1);
912       theSHMTransporters[nSHMTransporters++] = new_trp;
913     }
914 #endif
915     else
916     {
917       require(false);
918     }
919   }
920   multi_trp->add_active_trp(base_trp);
921   unlockMultiTransporters();
922   return true;
923 }
924 
925 bool
createTCPTransporter(TransporterConfiguration * config)926 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
927 
928   TCP_Transporter * t = 0;
929   /* Don't use index 0, special use case for extra  transporters */
930   config->transporterIndex = nTransporters + 1;
931   if (config->remoteNodeId == config->localNodeId)
932   {
933     t = new Loopback_Transporter(* this, config);
934   }
935   else
936   {
937     t = new TCP_Transporter(*this, config);
938   }
939 
940   if (t == NULL)
941     return false;
942   else if (!t->initTransporter())
943   {
944     delete t;
945     return false;
946   }
947 
948   // Put the transporter in the transporter arrays
949   nTransporters++;
950   allTransporters[nTransporters]            = t;
951   theTCPTransporters[nTCPTransporters]      = t;
952   theNodeIdTransporters[t->getRemoteNodeId()] = t;
953   theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
954   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
955   nTCPTransporters++;
956   m_total_max_send_buffer += t->get_max_send_buffer();
957   return true;
958 }
959 
960 bool
createSHMTransporter(TransporterConfiguration * config)961 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config)
962 {
963 #ifndef WIN32
964   DBUG_ENTER("TransporterRegistry::createTransporter SHM");
965 
966   /* Don't use index 0, special use case for extra  transporters */
967   config->transporterIndex = nTransporters + 1;
968 
969   SHM_Transporter * t = new SHM_Transporter(*this,
970                                             config->transporterIndex,
971 					    config->localHostName,
972 					    config->remoteHostName,
973 					    config->s_port,
974 					    config->isMgmConnection,
975 					    localNodeId,
976 					    config->remoteNodeId,
977 					    config->serverNodeId,
978 					    config->checksum,
979 					    config->signalId,
980 					    config->shm.shmKey,
981 					    config->shm.shmSize,
982 					    config->preSendChecksum,
983                                             config->shm.shmSpintime,
984                                             config->shm.sendBufferSize);
985   if (t == NULL)
986     return false;
987 
988   // Put the transporter in the transporter arrays
989   nTransporters++;
990   allTransporters[nTransporters]            = t;
991   theSHMTransporters[nSHMTransporters]      = t;
992   theNodeIdTransporters[t->getRemoteNodeId()]     = t;
993   theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
994   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
995 
996   nSHMTransporters++;
997   m_total_max_send_buffer += t->get_max_send_buffer();
998 
999   DBUG_RETURN(true);
1000 #else
1001   return false;
1002 #endif
1003 }
1004 
1005 /**
1006  * prepareSend() - queue a signal for later asynchronous sending.
1007  *
1008  * A successfull prepareSend() only guarantee that the signal has been
1009  * stored in some send buffers. Normally it will later be sent, but could
1010  * also be discarded if the transporter *later* disconnects.
1011  *
1012  * Signal memory is allocated with the implementation dependent
1013  * ::getWritePtr(). On multithreaded implementations, allocation may
1014  * take place in thread-local buffer pools which is later 'flushed'
1015  * to a global send buffer.
1016  *
1017  * Asynchronous to prepareSend() there may be Transporters
1018  * (dis)connecting which are signaled to the upper layers by calling
1019  * disable_/enable_send_buffers().
1020  *
1021  * The 'sendHandle' interface has the method ::isSendEnabled() which
1022  * provides us with a way to check whether communication with a node
1023  * is possible. Depending on the sendHandle implementation,
1024  * isSendEnabled() may either have a 'synchronized' or 'optimistic'
1025  * implementation:
1026  *  - A (thread-) 'synchronized' implementation guarantee that the
1027  *    send buffers really are enabled, both thread local and global,
1028  *    at the time of send buffer allocation. (May disconnect later though)
1029  *  - An 'optimistic' implementation does not really know whether the
1030  *    send buffers are (globally) enabled. Send buffers may always be
1031  *    allocated and possibly silently discarded later.
1032  *    (SEND_DISCONNECTED will never be returned)
1033  *
1034  * The trp_client implementation is 'synchronized', while the mt-/non-mt
1035  * data node implementation is not. Note that a 'SEND_DISCONNECTED'
1036  * and 'SEND_BLOCKED' return has always been handled as an 'OK' on
1037  * the data nodes. So not being able to detect 'SEND_DISCONNECTED'
1038  * should not matter.
1039  *
1040  * Note that sending behaves differently wrt disconnect / reconnect
1041  * synching compared to 'receive'. Receiver side *is* synchroinized with
1042  * the receiver transporter disconnect / reconnect by both requiring the
1043  * 'poll-right'. Thus receiver logic may check Transporter::isConnected()
1044  * directly.
1045  *
1046  * See further comments as part of ::performReceive().
1047  */
1048 template <typename AnySectionArg>
1049 SendStatus
prepareSendTemplate(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,TrpId & trp_id,AnySectionArg section)1050 TransporterRegistry::prepareSendTemplate(
1051                                  TransporterSendBufferHandle *sendHandle,
1052                                  const SignalHeader * signalHeader,
1053                                  Uint8 prio,
1054                                  const Uint32 * signalData,
1055                                  NodeId nodeId,
1056                                  TrpId &trp_id,
1057                                  AnySectionArg section)
1058 {
1059   Transporter *node_trp = theNodeIdTransporters[nodeId];
1060   if (unlikely(node_trp == NULL))
1061   {
1062     DEBUG("Discarding message to unknown node: " << nodeId);
1063     return SEND_UNKNOWN_NODE;
1064   }
1065   assert(!node_trp->isPartOfMultiTransporter());
1066   Transporter *t;
1067   t = node_trp->get_send_transporter(signalHeader->theReceiversBlockNumber,
1068                                      signalHeader->theSendersBlockRef);
1069   assert(!t->isMultiTransporter());
1070   trp_id = t->getTransporterIndex();
1071   if (unlikely(trp_id == 0))
1072   {
1073     /**
1074      * Can happen in disconnect situations, node is disconnected, so send
1075      * to it is successful since the node won't be there to receive the
1076      * message.
1077      */
1078     DEBUG("Discarding message due to trp_id = 0");
1079     return SEND_OK;
1080   }
1081   if(
1082     likely((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
1083            (signalHeader->theReceiversBlockNumber == QMGR) ||
1084            (signalHeader->theReceiversBlockNumber == API_CLUSTERMGR))
1085   {
1086     if (likely(sendHandle->isSendEnabled(nodeId)))
1087     {
1088       const Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, section.m_ptr);
1089       if (likely(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE))
1090       {
1091         SendStatus error = SEND_OK;
1092 	Uint32 *insertPtr = getWritePtr(sendHandle,
1093                                         t,
1094                                         trp_id,
1095                                         lenBytes,
1096                                         prio,
1097                                         &error);
1098         if (likely(insertPtr != nullptr))
1099 	{
1100 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, section);
1101 	  updateWritePtr(sendHandle, t, trp_id, lenBytes, prio);
1102 	  return SEND_OK;
1103 	}
1104         if (unlikely(error == SEND_MESSAGE_TOO_BIG))
1105         {
1106           g_eventLogger->info("Send message too big");
1107 	  return SEND_MESSAGE_TOO_BIG;
1108         }
1109         set_status_overloaded(nodeId, true);
1110         const int sleepTime = 2;
1111 
1112 	/**
1113 	 * @note: on linux/i386 the granularity is 10ms
1114 	 *        so sleepTime = 2 generates a 10 ms sleep.
1115 	 */
1116 	for (int i = 0; i < 100; i++)
1117 	{
1118 	  NdbSleep_MilliSleep(sleepTime);
1119           /* FC : Consider counting sleeps here */
1120 	  insertPtr = getWritePtr(sendHandle,
1121                                   t,
1122                                   trp_id,
1123                                   lenBytes,
1124                                   prio,
1125                                   &error);
1126 	  if (likely(insertPtr != nullptr))
1127 	  {
1128 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, section);
1129 	    updateWritePtr(sendHandle, t, trp_id, lenBytes, prio);
1130 	    DEBUG_FPRINTF((stderr, "TE_SEND_BUFFER_FULL\n"));
1131 	    /**
1132 	     * Send buffer full, but resend works
1133 	     */
1134 	    report_error(nodeId, TE_SEND_BUFFER_FULL);
1135 	    return SEND_OK;
1136 	  }
1137           if (unlikely(error == SEND_MESSAGE_TOO_BIG))
1138           {
1139             g_eventLogger->info("Send message too big");
1140 	    return SEND_MESSAGE_TOO_BIG;
1141           }
1142 	}
1143 
1144 	WARNING("Signal to " << nodeId << " lost(buffer)");
1145 	DEBUG_FPRINTF((stderr, "TE_SIGNAL_LOST_SEND_BUFFER_FULL\n"));
1146 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
1147 	return SEND_BUFFER_FULL;
1148       }
1149       else
1150       {
1151         g_eventLogger->info("Send message too big: length %u", lenBytes);
1152 	return SEND_MESSAGE_TOO_BIG;
1153       }
1154     }
1155     else
1156     {
1157 #ifdef ERROR_INSERT
1158       if (m_blocked.get(nodeId))
1159       {
1160         /* Looks like it disconnected while blocked.  We'll pretend
1161          * not to notice for now
1162          */
1163         WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
1164         return SEND_OK;
1165       }
1166 #endif
1167       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
1168       return SEND_DISCONNECTED;
1169     }
1170   }
1171   else
1172   {
1173     DEBUG("Discarding message to block: "
1174 	  << signalHeader->theReceiversBlockNumber
1175 	  << " node: " << nodeId);
1176 
1177     return SEND_BLOCKED;
1178   }
1179 }
1180 
1181 
1182 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,TrpId & trp_id,const LinearSectionPtr ptr[3])1183 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1184                                  const SignalHeader *signalHeader,
1185                                  Uint8 prio,
1186                                  const Uint32 *signalData,
1187                                  NodeId nodeId,
1188                                  TrpId &trp_id,
1189                                  const LinearSectionPtr ptr[3])
1190 {
1191   const Packer::LinearSectionArg section(ptr);
1192   return prepareSendTemplate(sendHandle,
1193                              signalHeader,
1194                              prio,
1195                              signalData,
1196                              nodeId,
1197                              trp_id,
1198                              section);
1199 }
1200 
1201 
1202 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,TrpId & trp_id,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])1203 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1204                                  const SignalHeader *signalHeader,
1205                                  Uint8 prio,
1206                                  const Uint32 *signalData,
1207                                  NodeId nodeId,
1208                                  TrpId &trp_id,
1209                                  class SectionSegmentPool &thePool,
1210                                  const SegmentedSectionPtr ptr[3])
1211 {
1212   const Packer::SegmentedSectionArg section(thePool,ptr);
1213   return prepareSendTemplate(sendHandle,
1214                              signalHeader,
1215                              prio,
1216                              signalData,
1217                              nodeId,
1218                              trp_id,
1219                              section);
1220 }
1221 
1222 
1223 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,const GenericSectionPtr ptr[3])1224 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1225                                  const SignalHeader *signalHeader,
1226                                  Uint8 prio,
1227                                  const Uint32 *signalData,
1228                                  NodeId nodeId,
1229                                  const GenericSectionPtr ptr[3])
1230 {
1231   TrpId trp_id = 0;
1232   const Packer::GenericSectionArg section(ptr);
1233   return prepareSendTemplate(sendHandle,
1234                              signalHeader,
1235                              prio,
1236                              signalData,
1237                              nodeId,
1238                              trp_id,
1239                              section);
1240 }
1241 
1242 bool
setup_wakeup_socket(TransporterReceiveHandle & recvdata)1243 TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
1244 {
1245   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1246 
1247   if (m_has_extra_wakeup_socket)
1248   {
1249     return true;
1250   }
1251 
1252   assert(!recvdata.m_transporters.get(0));
1253 
1254   if (ndb_socketpair(m_extra_wakeup_sockets))
1255   {
1256     perror("socketpair failed!");
1257     return false;
1258   }
1259 
1260   if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
1261       !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
1262   {
1263     goto err;
1264   }
1265 
1266 #if defined(HAVE_EPOLL_CREATE)
1267   if (recvdata.m_epoll_fd != -1)
1268   {
1269     int sock = m_extra_wakeup_sockets[0].fd;
1270     struct epoll_event event_poll;
1271     memset(&event_poll, 0, sizeof(event_poll));
1272     event_poll.data.u32 = 0;
1273     event_poll.events = EPOLLIN;
1274     int ret_val = epoll_ctl(recvdata.m_epoll_fd, EPOLL_CTL_ADD, sock,
1275                             &event_poll);
1276     if (ret_val != 0)
1277     {
1278       int error= errno;
1279       fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
1280               sock, error);
1281       fflush(stderr);
1282       goto err;
1283     }
1284   }
1285 #endif
1286   m_has_extra_wakeup_socket = true;
1287   recvdata.m_transporters.set(Uint32(0));
1288   return true;
1289 
1290 err:
1291   ndb_socket_close(m_extra_wakeup_sockets[0]);
1292   ndb_socket_close(m_extra_wakeup_sockets[1]);
1293   ndb_socket_invalidate(m_extra_wakeup_sockets+0);
1294   ndb_socket_invalidate(m_extra_wakeup_sockets+1);
1295   return false;
1296 }
1297 
1298 void
wakeup()1299 TransporterRegistry::wakeup()
1300 {
1301   if (m_has_extra_wakeup_socket)
1302   {
1303     static char c = 37;
1304     ndb_send(m_extra_wakeup_sockets[1], &c, 1, 0);
1305   }
1306 }
1307 
1308 Uint32
check_TCP(TransporterReceiveHandle & recvdata,Uint32 timeOutMillis)1309 TransporterRegistry::check_TCP(TransporterReceiveHandle& recvdata,
1310                                Uint32 timeOutMillis)
1311 {
1312   Uint32 retVal = 0;
1313 #if defined(HAVE_EPOLL_CREATE)
1314   if (likely(recvdata.m_epoll_fd != -1))
1315   {
1316     int tcpReadSelectReply = 0;
1317     Uint32 num_trps = nTCPTransporters + nSHMTransporters +
1318                       (m_has_extra_wakeup_socket ? 1 : 0);
1319 
1320     if (num_trps)
1321     {
1322       tcpReadSelectReply = epoll_wait(recvdata.m_epoll_fd,
1323                                       recvdata.m_epoll_events,
1324                                       num_trps, timeOutMillis);
1325       retVal = tcpReadSelectReply;
1326     }
1327 
1328     int num_socket_events = tcpReadSelectReply;
1329     if (num_socket_events > 0)
1330     {
1331       for (int i = 0; i < num_socket_events; i++)
1332       {
1333         const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
1334         /**
1335          * check that it's assigned to "us"
1336          */
1337         assert(recvdata.m_transporters.get(trpid));
1338 
1339         recvdata.m_recv_transporters.set(trpid);
1340       }
1341     }
1342     else if (num_socket_events < 0)
1343     {
1344       assert(errno == EINTR);
1345     }
1346   }
1347   else
1348 #endif
1349   {
1350     retVal = poll_TCP(timeOutMillis, recvdata);
1351   }
1352   return retVal;
1353 }
1354 
1355 Uint32
poll_SHM(TransporterReceiveHandle & recvdata,NDB_TICKS start_poll,Uint32 micros_to_poll)1356 TransporterRegistry::poll_SHM(TransporterReceiveHandle& recvdata,
1357                               NDB_TICKS start_poll,
1358                               Uint32 micros_to_poll)
1359 {
1360   Uint32 res;
1361   Uint64 micros_passed;
1362   do
1363   {
1364     bool any_connected = false;
1365     res = poll_SHM(recvdata, any_connected);
1366     if (res || !any_connected)
1367     {
1368       /**
1369        * If data found or no SHM transporter connected there is no
1370        * reason to continue spinning.
1371        */
1372       break;
1373     }
1374     NDB_TICKS now = NdbTick_getCurrentTicks();
1375     micros_passed =
1376       NdbTick_Elapsed(start_poll, now).microSec();
1377   } while (micros_passed < Uint64(micros_to_poll));
1378   return res;
1379 }
1380 
1381 Uint32
poll_SHM(TransporterReceiveHandle & recvdata,bool & any_connected)1382 TransporterRegistry::poll_SHM(TransporterReceiveHandle& recvdata,
1383                               bool &any_connected)
1384 {
1385   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1386 
1387   Uint32 retVal = 0;
1388   any_connected = false;
1389 #ifndef WIN32
1390   for (Uint32 i = 0; i < recvdata.nSHMTransporters; i++)
1391   {
1392     SHM_Transporter * t = theSHMTransporters[i];
1393     Uint32 node_id = t->getRemoteNodeId();
1394     Uint32 trp_id = t->getTransporterIndex();
1395 
1396     if (!recvdata.m_transporters.get(trp_id))
1397       continue;
1398 
1399     if (t->isConnected() && is_connected(node_id))
1400     {
1401       any_connected = true;
1402       if (t->hasDataToRead())
1403       {
1404         recvdata.m_has_data_transporters.set(trp_id);
1405         retVal = 1;
1406       }
1407     }
1408   }
1409 #endif
1410   return retVal;
1411 }
1412 
1413 Uint32
spin_check_transporters(TransporterReceiveHandle & recvdata)1414 TransporterRegistry::spin_check_transporters(
1415                           TransporterReceiveHandle& recvdata)
1416 {
1417   Uint32 res = 0;
1418 #ifndef WIN32
1419   Uint64 micros_passed = 0;
1420   bool any_connected = false;
1421   Uint64 spintime = Uint64(recvdata.m_spintime);
1422 
1423   if (spintime == 0)
1424   {
1425     return res;
1426   }
1427   NDB_TICKS start = NdbTick_getCurrentTicks();
1428   do
1429   {
1430     {
1431       res = poll_SHM(recvdata, any_connected);
1432       if (res || !any_connected)
1433         break;
1434     }
1435     if (res || !any_connected)
1436       break;
1437     res = check_TCP(recvdata, 0);
1438     if (res)
1439       break;
1440     NdbSpin();
1441     NDB_TICKS now = NdbTick_getCurrentTicks();
1442     micros_passed =
1443       NdbTick_Elapsed(start, now).microSec();
1444   } while (micros_passed < Uint64(recvdata.m_spintime));
1445   recvdata.m_total_spintime += micros_passed;
1446 #endif
1447   return res;
1448 }
1449 
1450 Uint32
pollReceive(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1451 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
1452                                  TransporterReceiveHandle& recvdata)
1453 {
1454   bool sleep_state_set = false;
1455   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1456 
1457   Uint32 retVal = 0;
1458   recvdata.m_recv_transporters.clear();
1459 
1460   /**
1461    * If any transporters have left-over data that was not fully executed in
1462    * last loop, don't wait and return 'data available' even if nothing new
1463    */
1464   if (!recvdata.m_has_data_transporters.isclear())
1465   {
1466     timeOutMillis = 0;
1467     retVal = 1;
1468   }
1469 #ifndef WIN32
1470   if (recvdata.nSHMTransporters > 0)
1471   {
1472     /**
1473      * We start by checking shared memory transporters without
1474      * any mutexes or other protection. If we find something to
1475      * read we will set timeout to 0 and check the TCP transporters
1476      * before returning.
1477      */
1478     bool any_connected = false;
1479     Uint32 res = poll_SHM(recvdata, any_connected);
1480     if(res)
1481     {
1482       retVal |= res;
1483       timeOutMillis = 0;
1484     }
1485     else if (timeOutMillis > 0 && any_connected)
1486     {
1487       /**
1488        * We are preparing to wait for socket events. We will start by
1489        * polling for a configurable amount of microseconds before we
1490        * go to sleep. We will check both shared memory transporters and
1491        * TCP transporters in this period. We will check shared memory
1492        * transporter four times and then check TCP transporters in a
1493        * loop.
1494        *
1495        * After this polling period, if we are still waiting for data
1496        * we will prepare to go to sleep by informing the other side
1497        * that we are going to sleep.
1498        *
1499        * To do this we first grab the mutex used by the sender
1500        * to check for sleep/awake state, next we poll the shared
1501        * memory holding this mutex. If this check also returns
1502        * without finding any data we set the state to sleep,
1503        * release the mutex and go to sleep (on an epoll/poll)
1504        * that can be woken up by incoming data or a wakeup byte
1505        * sent to SHM transporter.
1506        */
1507       res = spin_check_transporters(recvdata);
1508       if (res)
1509       {
1510         retVal |= res;
1511         timeOutMillis = 0;
1512       }
1513       else
1514       {
1515         int res = reset_shm_awake_state(recvdata, sleep_state_set);
1516         if (res || !sleep_state_set)
1517         {
1518           /**
1519            * If sleep_state_set is false here it means that the
1520            * all SHM transporters were disconnected. Alternatively
1521            * there was data available on all the connected SHM
1522            * transporters. Read the data from TCP transporters and
1523            * return.
1524            */
1525           retVal |= 1;
1526           timeOutMillis = 0;
1527         }
1528       }
1529     }
1530   }
1531 #endif
1532   retVal |= check_TCP(recvdata, timeOutMillis);
1533 #ifndef WIN32
1534   if (recvdata.nSHMTransporters > 0)
1535   {
1536     /**
1537      * If any SHM transporter was put to sleep above we will
1538      * set all connected SHM transporters to awake now.
1539      */
1540     if (sleep_state_set)
1541     {
1542       set_shm_awake_state(recvdata);
1543     }
1544     bool any_connected = false;
1545     int res = poll_SHM(recvdata, any_connected);
1546     retVal |= res;
1547   }
1548 #endif
1549   return retVal;
1550 }
1551 
1552 /**
1553  * Every time a SHM transporter is sending data and wants the other side
1554  * to wake up to handle the data, it follows this procedure.
1555  * 1) Write the data in shared memory
1556  * 2) Acquire the mutex protecting the awake flag on the receive side.
1557  * 3) Read flag
1558  * 4) Release mutex
1559  * 5.1) If flag says that receiver is awake we're done
1560  * 5.2) If flag says that receiver is asleep we will send a byte on the
1561  *      transporter socket for the SHM transporter to wake up the other
1562  *      side.
1563  *
1564  * The reset_shm_awake_state is called right before we are going to go
1565  * to sleep. To ensure that we don't miss any signals from the other
1566  * side we will first check if there is data available on shared memory.
1567  * We first grab the mutex before checking this. If no data is available
1568  * we can proceed to go to sleep after setting the flag to indicate that
1569  * we are asleep. The above procedure used when sending means that we
1570  * know that the sender will always know the correct state. The only
1571  * error in this is that the sender might think that we are asleep when
1572  * we actually is still on our way to go to sleep. In this case no harm
1573  * has been done since the only thing that have happened is that one byte
1574  * is sent on the SHM socket that wasn't absolutely needed.
1575  */
1576 int
reset_shm_awake_state(TransporterReceiveHandle & recvdata,bool & sleep_state_set)1577 TransporterRegistry::reset_shm_awake_state(TransporterReceiveHandle& recvdata,
1578                                        bool& sleep_state_set)
1579 {
1580   int res = 0;
1581 #ifndef WIN32
1582   for (Uint32 i = 0; i < recvdata.nSHMTransporters; i++)
1583   {
1584     SHM_Transporter * t = theSHMTransporters[i];
1585     Uint32 node_id = t->getRemoteNodeId();
1586     Uint32 trp_id = t->getTransporterIndex();
1587 
1588     if (!recvdata.m_transporters.get(trp_id))
1589       continue;
1590 
1591     if (t->isConnected())
1592     {
1593       t->lock_mutex();
1594       if (is_connected(node_id))
1595       {
1596         if (t->hasDataToRead())
1597         {
1598           recvdata.m_has_data_transporters.set(trp_id);
1599           res = 1;
1600         }
1601         else
1602         {
1603           sleep_state_set = true;
1604           t->set_awake_state(0);
1605         }
1606       }
1607       t->unlock_mutex();
1608     }
1609   }
1610 #endif
1611   return res;
1612 }
1613 
1614 /**
1615  * We have been sleeping for a while, before proceeding we need to set
1616  * the flag to awake in the shared memory. This will flag to all other
1617  * nodes using shared memory to communicate that we're awake and don't
1618  * need any socket communication to wake up.
1619  */
1620 void
set_shm_awake_state(TransporterReceiveHandle & recvdata)1621 TransporterRegistry::set_shm_awake_state(TransporterReceiveHandle& recvdata)
1622 {
1623 #ifndef WIN32
1624   for (Uint32 i = 0; i < recvdata.nSHMTransporters; i++)
1625   {
1626     SHM_Transporter * t = theSHMTransporters[i];
1627     Uint32 id = t->getTransporterIndex();
1628 
1629     if (!recvdata.m_transporters.get(id))
1630       continue;
1631     if (t->isConnected())
1632     {
1633       t->lock_mutex();
1634       t->set_awake_state(1);
1635       t->unlock_mutex();
1636     }
1637   }
1638 #endif
1639 }
1640 
1641 /**
1642  * We do not want to hold any transporter locks during select(), so there
1643  * is no protection against a disconnect closing the socket during this call.
1644  *
1645  * That does not matter, at most we will get a spurious wakeup on the wrong
1646  * socket, which will be handled correctly in performReceive() (which _is_
1647  * protected by transporter locks on upper layer).
1648  */
1649 Uint32
poll_TCP(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1650 TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
1651                               TransporterReceiveHandle& recvdata)
1652 {
1653   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1654 
1655   recvdata.m_socket_poller.clear();
1656 
1657   const bool extra_socket = m_has_extra_wakeup_socket;
1658   if (extra_socket && recvdata.m_transporters.get(0))
1659   {
1660     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
1661     assert(&recvdata == receiveHandle); // not used by ndbmtd...
1662 
1663     // Poll the wakup-socket for read
1664     recvdata.m_socket_poller.add(socket, true, false, false);
1665   }
1666 
1667   Uint16 idx[MAX_NTRANSPORTERS];
1668   Uint32 i = 0;
1669   for (; i < recvdata.nTCPTransporters; i++)
1670   {
1671     TCP_Transporter * t = theTCPTransporters[i];
1672     const NDB_SOCKET_TYPE socket = t->getSocket();
1673     Uint32 node_id = t->getRemoteNodeId();
1674     Uint32 trp_id = t->getTransporterIndex();
1675 
1676     idx[i] = maxTransporters + 1;
1677     if (!recvdata.m_transporters.get(trp_id))
1678       continue;
1679 
1680     if (is_connected(node_id) && t->isConnected() && ndb_socket_valid(socket))
1681     {
1682       idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
1683     }
1684   }
1685 
1686 #ifndef WIN32
1687   for (Uint32 j = 0; j < recvdata.nSHMTransporters; j++)
1688   {
1689     /**
1690      * We need to listen to socket also for shared memory transporters.
1691      * These sockets are used as a wakeup mechanism, so we're not sending
1692      * any data in it. But we need a socket to be able to wake up things
1693      * when the receiver is not awake and we've only sent data on shared
1694      * memory transporter.
1695      */
1696     SHM_Transporter * t = theSHMTransporters[j];
1697     const NDB_SOCKET_TYPE socket = t->getSocket();
1698     Uint32 node_id = t->getRemoteNodeId();
1699     Uint32 trp_id = t->getTransporterIndex();
1700     idx[i] = maxTransporters + 1;
1701     if (!recvdata.m_transporters.get(trp_id))
1702     {
1703       i++;
1704       continue;
1705     }
1706     if (is_connected(node_id) && t->isConnected() && ndb_socket_valid(socket))
1707     {
1708       idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
1709     }
1710     i++;
1711   }
1712 #endif
1713   int tcpReadSelectReply = recvdata.m_socket_poller.poll_unsafe(timeOutMillis);
1714 
1715   if (tcpReadSelectReply > 0)
1716   {
1717     if (extra_socket)
1718     {
1719       if (recvdata.m_socket_poller.has_read(0))
1720       {
1721         assert(recvdata.m_transporters.get(0));
1722         recvdata.m_recv_transporters.set((Uint32)0);
1723       }
1724     }
1725     i = 0;
1726     for (; i < recvdata.nTCPTransporters; i++)
1727     {
1728       TCP_Transporter * t = theTCPTransporters[i];
1729       if (idx[i] != maxTransporters + 1)
1730       {
1731         Uint32 trp_id = t->getTransporterIndex();
1732         if (recvdata.m_socket_poller.has_read(idx[i]))
1733         {
1734           recvdata.m_recv_transporters.set(trp_id);
1735         }
1736       }
1737     }
1738 #ifndef WIN32
1739     for (Uint32 j = 0; j < recvdata.nSHMTransporters; j++)
1740     {
1741       /**
1742        * If a shared memory transporter have data on its socket we
1743        * will get it now, the data is only an indication for us to
1744        * wake up, so we're not interested in the data as such.
1745        * But to integrate it with epoll handling we will read it
1746        * in performReceive still.
1747        */
1748       SHM_Transporter * t = theSHMTransporters[j];
1749       if (idx[i] != maxTransporters + 1)
1750       {
1751         Uint32 trp_id = t->getTransporterIndex();
1752         if (recvdata.m_socket_poller.has_read(idx[i]))
1753           recvdata.m_recv_transporters.set(trp_id);
1754       }
1755       i++;
1756     }
1757 #endif
1758   }
1759 
1760   return tcpReadSelectReply;
1761 }
1762 
1763 void
set_recv_thread_idx(Transporter * t,Uint32 recv_thread_idx)1764 TransporterRegistry::set_recv_thread_idx(Transporter *t,
1765                                          Uint32 recv_thread_idx)
1766 {
1767   t->set_recv_thread_idx(recv_thread_idx);
1768 }
1769 
1770 /**
 * Receive from the set of transporters in the bitmask
 * 'recvdata.m_transporters'. These have been polled by
 * ::pollReceive(), which recorded the transporters with
 * available data in the subset 'recvdata.m_recv_transporters'.
1775  *
 * In multi-threaded data nodes, there might be multiple
 * receiver threads, each serving a disjoint set of
 * 'm_transporters'.
 *
 * Single-threaded data nodes do all ::performReceive
 * from the scheduler main-loop, and thus handle
 * all 'm_transporters'.
 *
 * Clients have to acquire a 'poll right' (see TransporterFacade),
 * which gives them the right to temporarily act as a receive
 * thread with the right to poll *all* transporters.
1787  *
 * Reception takes place on a set of transporters known to be in a
 * 'CONNECTED' state. Transporters can (asynchronously) become
 * 'DISCONNECTING' while we performReceive(). There is *no* mutex lock
 * preventing 'disconnecting' from being started while we are in the
 * receive-loop! However, the contents of the buffers etc. should still
 * be in a consistent state, such that the current receive can complete
 * without failures.
1795  *
 * At regular intervals we have to ::update_connections()
 * in order to bring DISCONNECTING transporters into
 * a DISCONNECTED state. Only from this point on may resources
 * used by performReceive() be reset or released.
 * A transporter should be brought to the DISCONNECTED state
 * before it can reconnect again. (Note: there is a break of
 * this rule in ::do_connect; see the note there.)
 *
 * To not interfere with ::poll- or ::performReceive(),
 * ::update_connections() has to be synchronized with these
 * methods: either by running within the same
 * receive thread (data nodes), or by being protected by the 'poll rights'.
 *
 * Returns 0 when receive succeeded from all Transporters having data,
 * else 1 (e.g. when we were unable to receive due to job buffers
 * being full).
1812  */
1813 Uint32
performReceive(TransporterReceiveHandle & recvdata,Uint32 recv_thread_idx)1814 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata,
1815                                     Uint32 recv_thread_idx)
1816 {
1817   (void)recv_thread_idx;
1818   TransporterReceiveWatchdog guard(recvdata);
1819   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1820   bool stopReceiving = false;
1821 
1822   if (recvdata.m_recv_transporters.get(0))
1823   {
1824     assert(recvdata.m_transporters.get(0));
1825     assert(&recvdata == receiveHandle); // not used by ndbmtd
1826     recvdata.m_recv_transporters.clear(Uint32(0));
1827     consume_extra_sockets();
1828   }
1829 
1830 #ifdef ERROR_INSERT
1831   if (!m_blocked_trp.isclear())
1832   {
1833     /* Exclude receive from blocked sockets. */
1834     recvdata.m_recv_transporters.bitANDC(m_blocked_trp);
1835 
1836     if (recvdata.m_recv_transporters.isclear()  &&
1837         recvdata.m_has_data_transporters.isclear())
1838     {
1839         /* poll sees data, but we want to ignore for now
1840          * sleep a little to avoid busy loop
1841          */
1842       NdbSleep_MilliSleep(1);
1843     }
1844   }
1845 #endif
1846 
1847   /**
1848    * m_recv_transporters set indicates that there might be data
1849    * available on the socket used by the transporter. The
1850    * doReceive call will read the socket. For TCP transporters
1851    * the doReceive call will return an indication if there is
1852    * data to receive on socket. This will set m_has_data_transporters.
1853    * For SHM transporter the socket is only used to send wakeup
1854    * bytes. The m_has_data_transporters bitmap was set already in
1855    * pollReceive for SHM transporters.
1856    */
1857   for(Uint32 trp_id = recvdata.m_recv_transporters.find_first();
1858       trp_id != BitmaskImpl::NotFound;
1859       trp_id = recvdata.m_recv_transporters.find_next(trp_id + 1))
1860   {
1861     Transporter *transp = allTransporters[trp_id];
1862     NodeId node_id = transp->getRemoteNodeId();
1863     if (transp->getTransporterType() == tt_TCP_TRANSPORTER)
1864     {
1865       TCP_Transporter * t = (TCP_Transporter*)transp;
1866       assert(recvdata.m_transporters.get(trp_id));
1867       assert(recv_thread_idx == transp->get_recv_thread_idx());
1868 
1869       /**
1870        * First check connection 'is CONNECTED.
1871        * A connection can only be set into, or taken out of, is_connected'
1872        * state by ::update_connections(). See comment there about
1873        * synchronication between ::update_connections() and
1874        * performReceive()
1875        *
1876        * Transporter::isConnected() state my change asynch.
1877        * A mismatch between the TransporterRegistry::is_connected(),
1878        * and Transporter::isConnected() state is possible, and indicate
1879        * that a change is underway. (Completed by update_connections())
1880        */
1881       if (is_connected(node_id))
1882       {
1883         if (t->isConnected())
1884         {
1885           int nBytes = t->doReceive(recvdata);
1886           if (nBytes > 0)
1887           {
1888             recvdata.transporter_recv_from(node_id);
1889             recvdata.m_has_data_transporters.set(trp_id);
1890           }
1891         }
1892       }
1893     }
1894     else
1895     {
1896 #ifndef WIN32
1897       require(transp->getTransporterType() == tt_SHM_TRANSPORTER);
1898       SHM_Transporter * t = (SHM_Transporter*)transp;
1899       assert(recvdata.m_transporters.get(trp_id));
1900       if (is_connected(node_id) && t->isConnected())
1901       {
1902         t->doReceive();
1903         /**
1904          * Ignore any data we read, the data wasn't collected by the
1905          * shared memory transporter, it was simply read and thrown
1906          * away, it is only a wakeup call to send data over the socket
1907          * for shared memory transporters.
1908          */
1909       }
1910 #else
1911       require(false);
1912 #endif
1913     }
1914   }
1915   recvdata.m_recv_transporters.clear();
1916 
1917   /**
1918    * Unpack data either received above or pending from prev rounds.
1919    * For the Shared memory transporter m_has_data_transporters can
1920    * be set in pollReceive as well.
1921    *
1922    * TCP Transporter
1923    * ---------------
1924    * Data to be processed at this stage is in the Transporter
1925    * receivebuffer. The data *is received*, and will stay in
1926    * the  receiveBuffer even if a disconnect is started during
1927    * unpack.
1928    * When ::update_connection() finaly completes the disconnect,
1929    * (synced with ::performReceive()), 'm_has_data_transporters'
1930    * will be cleared, which will terminate further unpacking.
1931    *
1932    * NOTE:
1933    *  Without reading inconsistent date, we could have removed
1934    *  the 'connected' checks below, However, there is a requirement
1935    *  in the CLOSE_COMREQ/CONF protocol between TRPMAN and QMGR
1936    *  that no signals arrives from disconnecting nodes after a
1937    *  CLOSE_COMCONF was sent. For the moment the risk of taking
1938    *  advantage of this small optimization is not worth the risk.
1939    */
1940   Uint32 trp_id = recvdata.m_last_trp_id;
1941   while ((trp_id = recvdata.m_has_data_transporters.find_next(trp_id + 1)) !=
1942 	 BitmaskImpl::NotFound)
1943   {
1944     bool hasdata = false;
1945     Transporter * t = (Transporter*)allTransporters[trp_id];
1946     NodeId node_id = t->getRemoteNodeId();
1947 
1948     assert(recvdata.m_transporters.get(trp_id));
1949 
1950     if (is_connected(node_id))
1951     {
1952       if (t->isConnected())
1953       {
1954         if (unlikely(recvdata.checkJobBuffer()))
1955         {
1956           return 1;     // Full, can't unpack more
1957         }
1958         if (unlikely(recvdata.m_handled_transporters.get(trp_id)))
1959           continue;     // Skip now to avoid starvation
1960         if (t->getTransporterType() == tt_TCP_TRANSPORTER)
1961         {
1962           TCP_Transporter *t_tcp = (TCP_Transporter*)t;
1963           Uint32 * ptr;
1964           Uint32 sz = t_tcp->getReceiveData(&ptr);
1965           Uint32 szUsed = unpack(recvdata,
1966                                  ptr,
1967                                  sz,
1968                                  node_id,
1969                                  ioStates[node_id],
1970                                  stopReceiving);
1971           if (likely(szUsed))
1972           {
1973             assert(recv_thread_idx == t_tcp->get_recv_thread_idx());
1974             t_tcp->updateReceiveDataPtr(szUsed);
1975             hasdata = t_tcp->hasReceiveData();
1976           }
1977         }
1978         else
1979         {
1980 #ifndef WIN32
1981           require(t->getTransporterType() == tt_SHM_TRANSPORTER);
1982           SHM_Transporter *t_shm = (SHM_Transporter*)t;
1983           Uint32 * readPtr, * eodPtr, * endPtr;
1984           t_shm->getReceivePtr(&readPtr, &eodPtr, &endPtr);
1985           recvdata.transporter_recv_from(node_id);
1986           Uint32 *newPtr = unpack(recvdata,
1987                                   readPtr,
1988                                   eodPtr,
1989                                   endPtr,
1990                                   node_id,
1991                                   ioStates[node_id],
1992 				  stopReceiving);
1993           t_shm->updateReceivePtr(recvdata, newPtr);
1994           /**
1995            * Set hasdata dependent on if data is still available in
1996            * transporter to ensure we follow rules about setting
1997            * m_has_data_transporters and m_handled_transporters
1998            * when returning from performReceive.
1999            */
2000           hasdata = t_shm->hasDataToRead();
2001 #else
2002           require(false);
2003 #endif
2004         }
2005         // else, we didn't unpack anything:
2006         //   Avail ReceiveData to short to be useful, need to
2007         //   receive more before we can resume this transporter.
2008       }
2009     }
2010     // If transporter still have data, make sure that it's remember to next time
2011     recvdata.m_has_data_transporters.set(trp_id, hasdata);
2012     recvdata.m_handled_transporters.set(trp_id, hasdata);
2013 
2014     if (unlikely(stopReceiving))
2015     {
2016       recvdata.m_last_trp_id = trp_id;  //Resume from node after 'last_node'
2017       return 1;
2018     }
2019   }
2020   recvdata.m_handled_transporters.clear();
2021   recvdata.m_last_trp_id = 0;
2022   return 0;
2023 }
2024 
2025 void
consume_extra_sockets()2026 TransporterRegistry::consume_extra_sockets()
2027 {
2028   char buf[4096];
2029   ssize_t ret;
2030   int err;
2031   NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
2032   do
2033   {
2034     ret = ndb_recv(sock, buf, sizeof(buf), 0);
2035     err = ndb_socket_errno();
2036   } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
2037 
2038   /* Notify upper layer of explicit wakeup */
2039   callbackObj->reportWakeup();
2040 }
2041 
2042 /**
2043  * performSend() - Call physical transporters to 'doSend'
2044  * of previously prepareSend() signals.
2045  *
2046  * The doSend() implementations will call
2047  * TransporterCallback::get_bytes_to_send_iovec() to fetch
2048  * any available data from the send buffer.
2049  *
2050  * *This* ^^ is the synch point where we under mutex protection
2051  * may check for specific nodes being disconnected/disabled.
2052  * For disabled nodes we may drain the send buffers instead of
2053  * returning anything from get_bytes_to_send_iovec().
2054  * Also see comments for prepareSend() above.
2055  *
2056  * Note that since disconnection may happen asynch from other
2057  * threads, we can not reliably check the 'connected' state
2058  * before doSend(). Instead we must require that the
2059  * TransporterCallback implementation provide necessary locking
2060  * of get_bytes_to_send() vs enable/disable of send buffers.
2061  *
2062  * Returns:
2063  *   true if anything still remains to be sent.
2064  *   Will require another ::performSend()
2065  *
2066  *   false: if nothing more remains, either due to
2067  *   the send buffers being empty, we succeeded
2068  *   sending everything, or we found the node to be
2069  *   disconnected and thus discarded the contents.
2070  */
2071 bool
performSendNode(NodeId nodeId,bool need_wakeup)2072 TransporterRegistry::performSendNode(NodeId nodeId, bool need_wakeup)
2073 {
2074   Transporter *t = get_node_transporter(nodeId);
2075   if (t != NULL)
2076   {
2077 #ifdef ERROR_INSERT
2078     if (m_sendBlocked.get(nodeId))
2079     {
2080       return true;
2081     }
2082 #endif
2083     return t->doSend(need_wakeup);
2084   }
2085   return false;
2086 }
2087 
2088 bool
performSend(TrpId trp_id,bool need_wakeup)2089 TransporterRegistry::performSend(TrpId trp_id, bool need_wakeup)
2090 {
2091   Transporter *t = get_transporter(trp_id);
2092   if (t != NULL)
2093   {
2094 #ifdef ERROR_INSERT
2095     NodeId nodeId = t->getRemoteNodeId();
2096     if (m_sendBlocked.get(nodeId))
2097     {
2098       return true;
2099     }
2100 #endif
2101     return t->doSend(need_wakeup);
2102   }
2103   return false;
2104 }
2105 
2106 void
performSend()2107 TransporterRegistry::performSend()
2108 {
2109   Uint32 i;
2110   sendCounter = 1;
2111 
2112   lockMultiTransporters();
2113   for (i = m_transp_count; i < (nTransporters + 1); i++)
2114   {
2115     Transporter *t = allTransporters[i];
2116     if (t != NULL
2117 #ifdef ERROR_INSERT
2118         && !m_sendBlocked.get(t->getRemoteNodeId())
2119 #endif
2120         )
2121     {
2122       t->doSend();
2123     }
2124   }
2125   for (i = 1; i < m_transp_count && i < (nTransporters + 1); i++)
2126   {
2127     Transporter *t = allTransporters[i];
2128     if (t != NULL
2129 #ifdef ERROR_INSERT
2130         && !m_sendBlocked.get(t->getRemoteNodeId())
2131 #endif
2132         )
2133     {
2134       t->doSend();
2135     }
2136   }
2137   m_transp_count++;
2138   if (m_transp_count == (nTransporters + 1))
2139     m_transp_count = 1;
2140   unlockMultiTransporters();
2141 }
2142 
2143 #ifdef DEBUG_TRANSPORTER
2144 void
printState()2145 TransporterRegistry::printState()
2146 {
2147   ndbout << "-- TransporterRegistry -- " << endl << endl
2148          << "Transporters = " << nTransporters << endl;
2149   for(Uint32 i = 0; i < maxTransporters; i++)
2150   {
2151     if (allTransporters[i] != NULL)
2152     {
2153       const NodeId remoteNodeId = allTransporters[i]->getRemoteNodeId();
2154       ndbout << "Transporter: " << remoteNodeId
2155              << " PerformState: " << performStates[remoteNodeId]
2156              << " IOState: " << ioStates[remoteNodeId] << endl;
2157     }
2158   }
2159 }
2160 #else
2161 void
printState()2162 TransporterRegistry::printState()
2163 {
2164 }
2165 #endif
2166 
2167 #ifdef ERROR_INSERT
2168 bool
isBlocked(NodeId nodeId)2169 TransporterRegistry::isBlocked(NodeId nodeId)
2170 {
2171   return m_blocked.get(nodeId);
2172 }
2173 
2174 void
blockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)2175 TransporterRegistry::blockReceive(TransporterReceiveHandle& recvdata,
2176                                   NodeId nodeId)
2177 {
2178   TrpId ids[MAX_NODE_GROUP_TRANSPORTERS];
2179   Uint32 num_ids;
2180   lockMultiTransporters();
2181   get_trps_for_node(nodeId, ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2182 
2183   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2184   for (Uint32 i = 0; i < num_ids; i++)
2185   {
2186     Uint32 trp_id = ids[i];
2187     if (recvdata.m_transporters.get(trp_id))
2188       m_blocked_trp.set(trp_id);
2189   }
2190   /* Check that node is not already blocked?
2191    * Stop pulling from its socket (but track received data etc)
2192    */
2193   /* Shouldn't already be blocked with data */
2194   bool last_call = true;
2195   for (Uint32 i = 0; i < num_ids; i++)
2196   {
2197     Uint32 trp_id = ids[i];
2198     if (!m_blocked_trp.get(trp_id))
2199       last_call = false;
2200   }
2201   if (last_call)
2202     m_blocked.set(nodeId);
2203   unlockMultiTransporters();
2204 }
2205 
/**
 * ERROR_INSERT testing aid: undo blockReceive() for the transporters of
 * 'nodeId' that are served by 'recvdata'. Once none of the node's
 * transporters is blocked any more ('last_call'), the node is removed
 * from m_blocked.
 *
 * If a disconnect was reported while the node was blocked
 * (m_blocked_disconnected), report_disconnect() is called now with the
 * error saved at that time. m_blocked_disconnected is only cleared by
 * the call that unblocks the last transporter of the node.
 */
void
TransporterRegistry::unblockReceive(TransporterReceiveHandle& recvdata,
                                    NodeId nodeId)
{
  TrpId ids[MAX_NODE_GROUP_TRANSPORTERS];
  Uint32 num_ids;
  lockMultiTransporters();
  get_trps_for_node(nodeId, ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);

  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
  // Unblock the transporters this receive handle is responsible for.
  // They must have been blocked, and no unpacked data may be pending.
  for (Uint32 i = 0; i < num_ids; i++)
  {
    Uint32 trp_id = ids[i];
    if (recvdata.m_transporters.get(trp_id))
    {
      assert(m_blocked_trp.get(trp_id));
      assert(!recvdata.m_has_data_transporters.get(trp_id));
      m_blocked_trp.clear(trp_id);
    }
  }
  // last_call: true if no transporter of the node remains blocked.
  bool last_call = true;
  for (Uint32 i = 0; i < num_ids; i++)
  {
    Uint32 trp_id = ids[i];
    if (m_blocked_trp.get(trp_id))
    {
      last_call = false;
    }
  }

  /* Resume pulling from the node's sockets once all are unblocked.
   * Ensure a disconnect reported while blocked is processed now.
   */
  if (last_call)
    m_blocked.clear(nodeId);
  unlockMultiTransporters();
  if (m_blocked_disconnected.get(nodeId))
  {
    /* Process disconnect notification/handling now.
     * NOTE(review): when this is not the last call, m_blocked is still
     * set, so report_disconnect() presumably just defers again.
     */
    report_disconnect(recvdata, nodeId, m_disconnect_errors[nodeId]);
  }
  if (last_call)
  {
    lockMultiTransporters();
    m_blocked_disconnected.clear(nodeId);
    unlockMultiTransporters();
  }
}
2255 
2256 bool
isSendBlocked(NodeId nodeId) const2257 TransporterRegistry::isSendBlocked(NodeId nodeId) const
2258 {
2259   return m_sendBlocked.get(nodeId);
2260 }
2261 
2262 void
blockSend(TransporterReceiveHandle & recvdata,NodeId nodeId)2263 TransporterRegistry::blockSend(TransporterReceiveHandle& recvdata,
2264                                NodeId nodeId)
2265 {
2266 #ifdef VM_TRACE
2267   TrpId trp_ids[MAX_NODE_GROUP_TRANSPORTERS];
2268   Uint32 num_ids;
2269   lockMultiTransporters();
2270   get_trps_for_node(nodeId, trp_ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2271   unlockMultiTransporters();
2272 #endif
2273   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2274   m_sendBlocked.set(nodeId);
2275 }
2276 
2277 void
unblockSend(TransporterReceiveHandle & recvdata,NodeId nodeId)2278 TransporterRegistry::unblockSend(TransporterReceiveHandle& recvdata,
2279                                  NodeId nodeId)
2280 {
2281 #ifdef VM_TRACE
2282   TrpId trp_ids[MAX_NODE_GROUP_TRANSPORTERS];
2283   Uint32 num_ids;
2284   lockMultiTransporters();
2285   get_trps_for_node(nodeId, trp_ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2286   unlockMultiTransporters();
2287 #endif
2288   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2289   m_sendBlocked.clear(nodeId);
2290 }
2291 
2292 #endif
2293 
2294 #ifdef ERROR_INSERT
2295 Uint32
getMixologyLevel() const2296 TransporterRegistry::getMixologyLevel() const
2297 {
2298   return m_mixology_level;
2299 }
2300 
2301 extern Uint32 MAX_RECEIVED_SIGNALS;  /* Packer.cpp */
2302 
2303 #define MIXOLOGY_MIX_INCOMING_SIGNALS 4
2304 
2305 void
setMixologyLevel(Uint32 l)2306 TransporterRegistry::setMixologyLevel(Uint32 l)
2307 {
2308   m_mixology_level = l;
2309 
2310   if (m_mixology_level & MIXOLOGY_MIX_INCOMING_SIGNALS)
2311   {
2312     ndbout_c("MIXOLOGY_MIX_INCOMING_SIGNALS on");
2313     /* Max one signal per transporter */
2314     MAX_RECEIVED_SIGNALS = 1;
2315   }
2316 
2317   /* TODO : Add mixing of Send from NdbApi / MGMD */
2318 }
2319 #endif
2320 
2321 IOState
ioState(NodeId nodeId) const2322 TransporterRegistry::ioState(NodeId nodeId) const {
2323   return ioStates[nodeId];
2324 }
2325 
2326 void
setIOState(NodeId nodeId,IOState state)2327 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
2328   if (ioStates[nodeId] == state)
2329     return;
2330 
2331   DEBUG("TransporterRegistry::setIOState("
2332         << nodeId << ", " << state << ")");
2333 
2334   ioStates[nodeId] = state;
2335 }
2336 
2337 extern "C" void *
run_start_clients_C(void * me)2338 run_start_clients_C(void * me)
2339 {
2340   ((TransporterRegistry*) me)->start_clients_thread();
2341   return 0;
2342 }
2343 
2344 /**
2345  * This method is used to initiate connection, called from the TRPMAN block.
2346  *
2347  * This works asynchronously, no actions are taken directly in the calling
2348  * thread.
2349  */
2350 void
do_connect(NodeId node_id)2351 TransporterRegistry::do_connect(NodeId node_id)
2352 {
2353   PerformState &curr_state = performStates[node_id];
2354   switch(curr_state){
2355   case DISCONNECTED:
2356     break;
2357   case CONNECTED:
2358     return;
2359   case CONNECTING:
2360     return;
2361   case DISCONNECTING:
2362     /**
2363      * NOTE (Need future work)
2364      * Going directly from DISCONNECTING to CONNECTING creates
2365      * a possible race with ::update_connections(): It will
2366      * see either of the *ING states, and bring the connection
2367      * into CONNECTED or *DISCONNECTED* state. Furthermore, the
2368      * state may be overwritten to CONNECTING by this method.
2369      * We should probably have waited for DISCONNECTED state,
2370      * before allowing reCONNECTING ....
2371      */
2372     assert(false);
2373     break;
2374   }
2375   DEBUG_FPRINTF((stderr, "(%u)REG:do_connect(%u)\n", localNodeId, node_id));
2376   DBUG_ENTER("TransporterRegistry::do_connect");
2377   DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
2378 
2379   Transporter * t = theNodeIdTransporters[node_id];
2380   if (t != NULL)
2381   {
2382     if (t->isMultiTransporter())
2383     {
2384       Multi_Transporter *multi_trp = (Multi_Transporter*)t;
2385       require(multi_trp->get_num_active_transporters() == 1);
2386       t = multi_trp->get_active_transporter(0);
2387     }
2388     require(!t->isPartOfMultiTransporter());
2389     require(!t->isMultiTransporter());
2390     DEBUG_FPRINTF((stderr, "(%u)REG:resetBuffers(%u)\n",
2391                            localNodeId, node_id));
2392     t->resetBuffers();
2393   }
2394 
2395   DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = CONNECTING\n",
2396                  localNodeId, node_id));
2397   curr_state= CONNECTING;
2398   DBUG_VOID_RETURN;
2399 }
2400 
2401 /**
2402  * This method is used to initiate disconnect from TRPMAN. It is also called
2403  * from the TCP/SHM transporter in case of an I/O error on the socket.
2404  *
2405  * This works asynchronously, similar to do_connect().
2406  */
2407 bool
do_disconnect(NodeId node_id,int errnum,bool send_source)2408 TransporterRegistry::do_disconnect(NodeId node_id,
2409                                    int errnum,
2410                                    bool send_source)
2411 {
2412   DEBUG_FPRINTF((stderr, "(%u)REG:do_disconnect(%u, %d)\n",
2413                          localNodeId, node_id, errnum));
2414   PerformState &curr_state = performStates[node_id];
2415   switch(curr_state){
2416   case DISCONNECTED:
2417   {
2418     return true;
2419   }
2420   case CONNECTED:
2421   {
2422     break;
2423   }
2424   case CONNECTING:
2425     /**
2426      * This is a correct transition. But it should only occur for nodes
2427      * that lack resources, e.g. lack of shared memory resources to
2428      * setup the transporter. Therefore we assert here to get a simple
2429      * handling of test failures such that we can fix the test config.
2430      */
2431     //DBUG_ASSERT(false);
2432     break;
2433   case DISCONNECTING:
2434   {
2435     return true;
2436   }
2437   }
2438   if (errnum == ENOENT)
2439   {
2440     m_disconnect_enomem_error[node_id]++;
2441     if (m_disconnect_enomem_error[node_id] < 10)
2442     {
2443       NdbSleep_MilliSleep(40);
2444       g_eventLogger->info("Socket error %d on nodeId: %u in state: %u",
2445                           errnum, node_id, (Uint32)curr_state);
2446       return false;
2447     }
2448   }
2449   if (errnum == 0)
2450   {
2451     g_eventLogger->info("Node %u disconnected in state: %d",
2452                         node_id, (int)curr_state);
2453   }
2454   else
2455   {
2456     g_eventLogger->info("Node %u disconnected in %s with errnum: %d"
2457                         " in state: %d",
2458                         node_id,
2459                         send_source ? "send" : "recv",
2460                         errnum,
2461                         (int)curr_state);
2462   }
2463   DBUG_ENTER("TransporterRegistry::do_disconnect");
2464   DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
2465   curr_state= DISCONNECTING;
2466   DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTING\n",
2467                  localNodeId, node_id));
2468   m_disconnect_errnum[node_id] = errnum;
2469   DBUG_RETURN(false);
2470 }
2471 
2472 /**
2473  * report_connect() / report_disconnect()
2474  *
2475  * Connect or disconnect the 'TransporterReceiveHandle' and
2476  * enable/disable the send buffers.
2477  *
2478  * To prevent races wrt poll/receive of data, these methods must
2479  * either be called from the same (receive-)thread as performReceive(),
2480  * or by the (API) client holding the poll-right.
2481  *
2482  * The send buffers needs similar protection against concurent
2483  * enable/disable of the same send buffers. Thus the sender
2484  * side is also handled here.
2485  */
2486 void
report_connect(TransporterReceiveHandle & recvdata,NodeId node_id)2487 TransporterRegistry::report_connect(TransporterReceiveHandle& recvdata,
2488                                     NodeId node_id)
2489 {
2490   Transporter *t = theNodeIdTransporters[node_id];
2491   if (t->isMultiTransporter())
2492   {
2493     Multi_Transporter *multi_trp = (Multi_Transporter*)t;
2494     require(multi_trp->get_num_active_transporters() == 1);
2495     t = multi_trp->get_active_transporter(0);
2496   }
2497   require(!t->isMultiTransporter());
2498   require(!t->isPartOfMultiTransporter());
2499   Uint32 id = t->getTransporterIndex();
2500   DEBUG_FPRINTF((stderr, "(%u)REG:report_connect(%u)\n",
2501                          localNodeId, node_id));
2502   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2503   assert(recvdata.m_transporters.get(id));
2504 
2505   DBUG_ENTER("TransporterRegistry::report_connect");
2506   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
2507 
2508   if (recvdata.epoll_add(t))
2509   {
2510     callbackObj->enable_send_buffer(node_id, id);
2511     DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = CONNECTED\n",
2512                    localNodeId, node_id));
2513     performStates[node_id] = CONNECTED;
2514     recvdata.reportConnect(node_id);
2515     DBUG_VOID_RETURN;
2516   }
2517 
2518   /**
2519    * Failed to add to epoll_set...
2520    *   disconnect it (this is really really bad)
2521    */
2522   DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTING\n",
2523                  localNodeId, node_id));
2524   performStates[node_id] = DISCONNECTING;
2525   DBUG_VOID_RETURN;
2526 }
2527 
2528 Multi_Transporter*
get_node_multi_transporter(NodeId node_id)2529 TransporterRegistry::get_node_multi_transporter(NodeId node_id)
2530 {
2531   Uint32 i;
2532   Uint32 num_multi_transporters = get_num_multi_transporters();
2533   for (i = 0; i < num_multi_transporters; i++)
2534   {
2535     Multi_Transporter *multi_trp = get_multi_transporter(i);
2536     if (multi_trp->getRemoteNodeId() == node_id)
2537     {
2538       return multi_trp;
2539     }
2540   }
2541   return (Multi_Transporter*)0;
2542 
2543 }
2544 
2545 void
report_disconnect(TransporterReceiveHandle & recvdata,NodeId node_id,int errnum)2546 TransporterRegistry::report_disconnect(TransporterReceiveHandle& recvdata,
2547                                        NodeId node_id, int errnum)
2548 {
2549   DEBUG_FPRINTF((stderr, "(%u)REG:report_disconnect(%u, %d)\n",
2550                          localNodeId, node_id, errnum));
2551   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2552 
2553   DBUG_ENTER("TransporterRegistry::report_disconnect");
2554   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
2555 
2556 #ifdef ERROR_INSERT
2557   if (m_blocked.get(node_id))
2558   {
2559     /* We are simulating real latency, so control events experience
2560      * it too
2561      */
2562     m_blocked_disconnected.set(node_id);
2563     m_disconnect_errors[node_id] = errnum;
2564     DBUG_VOID_RETURN;
2565   }
2566 #endif
2567 
2568   /**
2569    * No one else should be using the transporter now,
2570    * reset its send buffer and recvdata.
2571    *
2572    * Note that we may 'do_disconnect' due to transporter failure,
2573    * while trying to 'CONNECTING'. This cause a transition
2574    * from CONNECTING to DISCONNECTING without first being CONNECTED.
2575    * Thus there can be multiple reset & disable of the buffers (below)
2576    * without being 'enabled' inbetween.
2577    */
2578   TrpId trp_ids[MAX_NODE_GROUP_TRANSPORTERS];
2579   Uint32 num_ids;
2580   lockMultiTransporters();
2581   get_trps_for_node(node_id, trp_ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2582 
2583   bool ready_to_disconnect = true;
2584   Transporter *node_trp = theNodeIdTransporters[node_id];
2585   for (Uint32 i = 0; i < num_ids; i++)
2586   {
2587     Uint32 trp_id = trp_ids[i];
2588     DEBUG_FPRINTF((stderr, "trp_id = %u, node_id = %u\n", trp_id, node_id));
2589     if (recvdata.m_transporters.get(trp_id))
2590     {
2591       /**
2592        * disable_send_buffer ensures that no more signals will be sent
2593        * to the disconnected node. Every time we collect data for sending
2594        * we will check the send buffer enabled flag holding the m_send_lock
2595        * thereby ensuring that after the disable_send_buffer method is
2596        * called no more signals are sent.
2597        */
2598       callbackObj->disable_send_buffer(node_id, trp_id);
2599       recvdata.m_recv_transporters.clear(trp_id);
2600       recvdata.m_has_data_transporters.clear(trp_id);
2601       recvdata.m_handled_transporters.clear(trp_id);
2602     }
2603     else
2604     {
2605       /**
2606        * Transporter is handled by another receive thread. If the
2607        * transporter is still in the allTransporters array it means
2608        * that we haven't called report_disconnect in this receive
2609        * thread yet. Thus we are not yet ready to disconnect.
2610        */
2611       require(node_trp->isMultiTransporter());
2612       if (allTransporters[trp_id] != 0)
2613       {
2614         ready_to_disconnect = false;
2615         DEBUG_FPRINTF((stderr, "trp_id = %u, node_id = %u,"
2616                                " ready_to_disconnect = false\n",
2617                                trp_id, node_id));
2618       }
2619     }
2620   }
2621   if (node_trp->isMultiTransporter())
2622   {
2623     /**
2624      * Switch back to having only one active transporter which is the
2625      * original TCP or SHM transporter. We only handle the single
2626      * transporter case when setting up a new connection. We can
2627      * only move from single transporter to multi transporter when
2628      * the transporter is connected.
2629      */
2630     Multi_Transporter *multi_trp = (Multi_Transporter*)node_trp;
2631     for (Uint32 i = 0; i < num_ids; i++)
2632     {
2633       Uint32 trp_id = trp_ids[i];
2634       if (recvdata.m_transporters.get(trp_id))
2635       {
2636         /**
2637          * Remove the transporter from the set of transporters handled in
2638          * this receive thread.
2639          *
2640          * Removing it from this bitmask means that the receive thread
2641          * will ignore this transporter when receiving and polling.
2642          */
2643         DEBUG_FPRINTF((stderr, "trp_id = %u, node_id = %u,"
2644                                " multi remove_all\n",
2645                                trp_id, node_id));
2646         Transporter *t = multi_trp->get_active_transporter(i);
2647         t->doDisconnect();
2648         if (t->isPartOfMultiTransporter())
2649         {
2650           require(num_ids > 1);
2651           remove_allTransporters(t);
2652         }
2653         else
2654         {
2655           require(num_ids == 1);
2656           Uint32 num_multi_trps = multi_trp->get_num_inactive_transporters();
2657           for (Uint32 i = 0; i < num_multi_trps; i++)
2658           {
2659             Transporter *remove_trp = multi_trp->get_inactive_transporter(i);
2660             TrpId remove_trp_id = remove_trp->getTransporterIndex();
2661             if (remove_trp_id != 0)
2662             {
2663               NodeId remove_node_id = remove_trp->getRemoteNodeId();
2664               require(node_id == remove_node_id);
2665               callbackObj->disable_send_buffer(node_id, remove_trp_id);
2666               remove_trp->doDisconnect();
2667               remove_allTransporters(remove_trp);
2668             }
2669           }
2670         }
2671       }
2672     }
2673     if (ready_to_disconnect)
2674     {
2675       /**
2676        * All multi transporter objects for this node have been removed
2677        * from being handled by any receive thread. We set the single
2678        * transporter used for setup of connections to be handled by
2679        * this receive thread. This assignment means that we can get
2680        * an unbalanced load on receive threads. However normally the
2681        * transporters should return to using multiple transporters as
2682        * soon as the connection is setup again. So this imbalance should
2683        * be temporary.
2684        */
2685       DEBUG_FPRINTF((stderr, "node_id = %u, new_node_trp\n",
2686                      node_id));
2687       if (multi_trp->get_num_active_transporters() > 1)
2688       {
2689         /**
2690          * Multi socket setup is in use, thus switch to single transporter.
2691          * In rare situations we can still have data to send on base
2692          * transporter, so need to remove the send buffer before
2693          * disconnecting. Also ensure that connection is dropped here.
2694          */
2695         multi_trp->switch_active_trp();
2696         Transporter *base_trp = multi_trp->get_active_transporter(0);
2697         NodeId base_node_id = base_trp->getRemoteNodeId();
2698         TrpId base_trp_id = base_trp->getTransporterIndex();
2699         require(base_node_id == node_id);
2700         callbackObj->disable_send_buffer(node_id, base_trp_id);
2701         base_trp->doDisconnect();
2702       }
2703     }
2704   }
2705   else
2706   {
2707     Multi_Transporter *multi_trp = get_node_multi_transporter(node_id);
2708     (void)multi_trp;
2709     DEBUG_FPRINTF((stderr, "node_id = %u, multi_trp not %s\n",
2710                    node_id, multi_trp == 0 ? "existing" : "active"));
2711   }
2712   recvdata.m_bad_data_transporters.clear(node_id);
2713   recvdata.m_last_trp_id = 0;
2714   if (ready_to_disconnect)
2715   {
2716     DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTED\n",
2717                    localNodeId, node_id));
2718     performStates[node_id] = DISCONNECTED;
2719     recvdata.reportDisconnect(node_id, errnum);
2720   }
2721   unlockMultiTransporters();
2722   DBUG_VOID_RETURN;
2723 }
2724 
2725 /**
2726  * We only call TransporterCallback::reportError() from
2727  * TransporterRegistry::update_connections().
2728  *
2729  * In other places we call this method to enqueue the error that will later be
2730  * picked up by update_connections().
2731  */
2732 void
report_error(NodeId nodeId,TransporterError errorCode,const char * errorInfo)2733 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
2734                                   const char *errorInfo)
2735 {
2736   DEBUG_FPRINTF((stderr, "(%u)REG:report_error(%u, %d, %s\n",
2737                          localNodeId, nodeId, (int)errorCode, errorInfo));
2738   if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
2739       m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
2740   {
2741     m_error_states[nodeId].m_code = errorCode;
2742     m_error_states[nodeId].m_info = errorInfo;
2743   }
2744 }
2745 
2746 /**
2747  * Data node transporter model (ndbmtd)
2748  * ------------------------------------
2749  * The concurrency protection of transporters uses a number of mutexes
2750  * together with some things that can only happen in a single thread.
2751  * To explain this we will consider send and receive separately.
2752  * We also need to consider multi socket transporters distinctively since
2753  * they add another layer to the concurrency model.
2754  *
2755  * Each transporter is protected by two mutexes. One to protect send and
2756  * one to protect receives. Actually the send mutex is divided into two
2757  * mutexes as well.
2758  *
2759  * Starting with send when we send a signal we write it only in thread-local
2760  * buffers at first. Next at regular times we move them to buffers per
2761  * transporter. To move them from these thread-local buffers to the
2762  * transporter buffers requires acquiring the send buffer mutex. This
2763  * activity is normally called flush buffers.
2764  *
2765  * Next the transporter buffers are sent, this requires holding the
2766  * buffer mutex as well as the send mutex when moving the buffers to the
2767  * transporter. While sending only the send mutex is required. This mutex
2768  * is held also when performing the send call on the transporter.
2769  *
 * The selection of the transporter to send to next is handled by a
 * global send thread mutex. This mutex ensures that we send 50% of the
2772  * time to our neighbour nodes (data nodes within the same nodegroup) and
2773  * 50% to the rest of the nodes in the cluster. Thus we give much higher
2774  * priority to messages performing write transactions in the cluster as
2775  * well as local reads within the same nodegroup.
2776  *
2777  * For receive we use a model where a transporter is only handled by one
2778  * receive thread. This thread has its own mutex to protect against
2779  * interaction with other threads for transporter connect and disconnect.
2780  * This mutex is held while calling performReceive. performReceive
2781  * receives data from those transporters that signalled that data was
2782  * available in pollReceive. The mutex isn't held while calling pollReceive.
2783  *
 * Connecting a transporter requires little protection. Nothing
 * will be received until we add the transporter to the epoll set (even
 * shared memory transporters use epoll sets, implemented using poll on
 * non-Linux platforms). The epoll_add call happens in the receive thread
 * owning the transporter, thus no call to pollReceive will happen on the
 * transporter while we are calling update_connections, which is where
 * report_connect is called from when a connect has happened.
2791  *
2792  * After adding the transporter to the epoll set we enable the send
2793  * buffers, this uses both the send mutex and the send buffer mutex
2794  * on the transporter.
2795  *
 * Finally a signal is sent to TRPMAN in the receive thread (actually
 * the same thread that update_connections is called from). This signal
 * is simply forwarded to QMGR, and the connection event is logged.
2800  *
2801  * CONNECT_REP in QMGR updates the node information and initiates the
2802  * CM_REGREQ protocol for data nodes and API_REGREQ for API nodes to
2803  * include the node in an organised manner. At this point only QMGR
2804  * is allowed to receive signals from the node. QMGR will allow for
2805  * any block to communicate with the node when finishing the
2806  * registration.
2807  *
2808  * The actual connect logic happens in start_clients_thread that
2809  * executes in a single thread that handles all client connect setup.
2810  * This thread regularly wakes up and calls connect on transporters
2811  * not yet connected. It has backoff logic to ensure these connects
2812  * don't happen too often. It will only perform this connect client
2813  * attempts when the node is in the state CONNECTING. The QMGR code
2814  * will ensure that we set this state when we are ready to include
2815  * the node into the cluster again. After a node failure we have to
2816  * handle the node failure to completion before we allow the node
2817  * to reenter the cluster.
2818  *
2819  * Connecting a client requires both a connection to be setup, in
2820  * addition the client and the server must execute an authentication
2821  * protocol and a protocol where we signal what type of transporter
2822  * that is connecting.
2823  *
 * The server port the client connects to is either provided by the
 * configuration (recommended in any scenario where firewalls exist
 * in the network) or, if not configured, the server uses a dynamic port
 * and informs the management server about this port number.
2828  *
2829  * When the client has successfully connected and the authentication
2830  * protocol is successfully completed and the information about the
2831  * transporter type is sent, then we will set the transporter state
2832  * to connected. This will be picked up by update_connections that
2833  * will perform the logic above.
2834  *
2835  * When the connection has been successfully completed we set the
2836  * variable theSocket in the transporter. To ensure that we don't
2837  * call any socket actions on a socket we already closed we protect
2838  * this assignment with both the send mutex and the receive mutex.
2839  *
2840  * The actual network logic used by the start_clients_thread is
2841  * found in the SocketClient class.
2842  *
2843  * Similar logic exists also for transporters where the node is the
2844  * server part. As part of starting the data node we setup a socket
2845  * which we bind to the hostname and server port to use for this
2846  * data node. Next we will start to listen for connect attempts by
2847  * clients.
2848  *
2849  * When a client attempts to connect we will see this through the
2850  * accept call on the socket. The verification of this client
2851  * attempt is handled by the newSession call. For data nodes
2852  * this is the newSession here in TransporterRegistry.cpp.
2853  * This method will first handle the authentication protocol where
2854  * the client follows the NDB connect setup protocol to verify that
2855  * it is an NDB connection being setup. Next we call connect_server
2856  * in the TransporterRegistry.
2857  *
2858  * connect_server will receive information about node id, transporter type
2859  * that will enable it to verify that the connect is ok to perform at the
2860  * moment. It is very normal that there are many attempts to connect that
2861  * are unsuccessful since the API nodes are not allowed to connect until
2862  * the data node is started and they will regularly attempt to connect
2863  * while the data node is starting.
2864  *
2865  * When the connect has been successfully setup we will use the same logic
2866  * as for clients where we set theSocket and set the transporter state to
2867  * connected to ensure that update_connections will see the new
2868  * connection. So there is a lot of logic to setup the connections, but
2869  * really only the setting of theSocket requires mutex protection.
2870  *
2871  * Disconnects can be discovered by both the send logic as well as the
2872  * receive logic when calling socket send and socket recv. In both cases
2873  * the transporter will call do_disconnect. The only action here is to
2874  * set the state to DISCONNECTING (no action if already set or the state
2875  * is already set to DISCONNECTED). While calling this function we can
2876  * either hold the send mutex or the receive mutex. But none of these
 * are needed since we only set the DISCONNECTING state. This state
 * change will be picked up by start_clients_thread (note that it will
 * be picked up by this method regardless of whether it connected as a
 * client or as a server). This method will call doDisconnect on the
 * transporter.
2881  * This method will update the state and next it will call
2882  * disconnectImpl on the transporter that will close the socket and
2883  * ensure that the variable theSocket no longer points to a valid
2884  * socket. This is again protected by the send mutex and the receive
2885  * mutex to ensure that it doesn't happen while we are calling any
2886  * send or recv calls. Here we will also close the socket.
2887  *
2888  * Setting the state to not connected in doDisconnect will flag to
2889  * update_connections that it should call report_disconnect. This
2890  * call will disable the send buffer (holding the send mutex and
2891  * send buffer mutex), it will also clear bitmasks for the transporter
2892  * used by the receive thread. It requires no mutex to protect these
2893  * changes since they are performed in the receive thread that is
2894  * responsible for receiving on this transporter. The call can be
2895  * performed in all receive threads, but will be ignored by any
2896  * receive thread not responsible for it.
2897  *
2898  * Next it will call reportDisconnect that will send a signal
2899  * DISCONNECT_REP to our TRPMAN instance. This signal will be
2900  * sent to QMGR, the disconnect event will be logged and we will
2901  * send a signal to CMVMI to cancel subscriptions for the node.
2902  *
2903  * QMGR will update node state information and will initiate
2904  * node failure handling if not already started.
2905  *
2906  * QMGR will control when it is ok to communicate with the node
2907  * using CLOSE_COMREQ and OPEN_COMREQ calls to TRPMAN.
2908  *
2909  * Closing the communication to a node can also be initiated from
2910  * receiving a signal from another node that the node is dead.
2911  * In this case QMGR will send CLOSE_COMREQ to TRPMAN instances and
2912  * TRPMAN will set the IO state to HaltIO to ensure no more signals
2913  * are handled and it will call do_disconnect in TransporterRegistry
2914  * to ensure that the above close sequence is called although the
2915  * transporters are still functional.
2916  *
2917  * Introducing multi transporters
2918  * ------------------------------
2919  * To enable higher communication throughput between data nodes it is
2920  * possible to setup multiple transporters for communicating with another
2921  * data node. Currently this feature only allows for multiple transporters
2922  * when communicating within the same node group. There is no principal
2923  * problem with multiple transporters for other communication between
2924  * data nodes as well. For API nodes we already have the ability to
2925  * use multiple transporters from an application program by using
2926  * multiple node ids. Thus it is not necessary to add multiple
2927  * transporters to communicate with API nodes.
2928  *
2929  * The connect process cannot use multiple transporters. The reason is
2930  * that we cannot be certain that the connecting node supports multiple
2931  * transporters between two data nodes. We can only check this once the
2932  * connection is setup.
2933  *
2934  * This means that the change from a single socket to multiple sockets
2935  * is handled when the node is in a connected state and thus traffic
2936  * between the nodes is happening as we setup the multi socket setup.
2937  *
2938  * Given that multiple transporters can only be used between data nodes
2939  * we don't need to handle the Shared memory transporter as used for
2940  * multiple transporters since Shared memory transporter can only be
2941  * used to communicate between data nodes and API nodes.
2942  *
2943  * Thus multiple transporters is only handled by TCP transporters.
2944  *
2945  * Additionally to setup multiple transporters we need to know which
2946  * nodes are part of the same node group. This information is available
2947  * already in start phase 3 for initial starts and for cluster restarts.
2948  * For node restarts and initial node restarts we will wait until start
2949  * phase 4 to setup multiple transporters. In addition we have the case
2950  * where node groups are added while nodes are up and running, in this
2951  * case the nodes in the new node group will setup multiple transporters
2952  * when the new node group creation is committed.
2953  *
2954  * The setup logic for multiple transporters is described in detail in
2955  * QmgrMain.cpp. Here we will focus on how it changes the connect/disconnect
2956  * logic and what concurrency protection is required to handle the
2957  * multiple transporters.
2958  *
2959  * We introduce a new mutex that is locked by calling lockMultiTransporters
2960  * and unlocked through the call unlockMultiTransporters. This mutex should
2961  * always be taken before any other mutex is acquired to avoid deadlocks.
2962  *
2963  * During setup we communicate between the nodes in the same nodegroup to
2964  * decide on the number of multi sockets to use for one transporter. The
2965  * multiple transporters are handled also by a new Transporter class called
2966  * Multi_Transporter. When the change is performed to use multi transporters
2967  * this transporter is placed into the node transporter array. It is never
2968  * removed from this array even after a node failure.
2969  *
2970  * When we setup the multi transporters we assign them to different recv
2971  * threads. We also ensure that the port number is copied from the base
2972  * transporter to the new multi transporters to ensure that we use the
2973  * dynamic port number for a new node after a restart.
2974  *
2975  * The way to activate the connect of the multiple transporters happens by
2976  * inserting them into the allTransporters array. By inserting them into
2977  * this array they will be handled by the start_clients_thread.
2978  *
2979  * The protocol to setup a multi transporter is slightly different since we
2980  * need to know the node id, transporter type and additionally the instance
2981  * number of the transporter being setup in the connection. To accept a
2982  * connection we must be in the CONNECTED state for the node and the instance
2983  * number must exist in the multi transporter and must not be already
2984  * connected.
2985  *
 * The next step is the switch itself. Once we have agreed with the other
 * node to perform this action, we send a crucial signal,
 * ACTIVATE_TRP_REQ. This signal will be sent on the base transporter.
2989  * Immediately after sending this signal we will switch to using multi
2990  * transporters.
2991  *
 * Before sending this signal we ensure that the only thread active is the
 * main thread, and we lock all send mutexes for both the base transporter
 * and the new multi transporters. We achieve this with a new set of
 * signals that freeze threads.
2996  *
2997  * When receiving this signal in the other node we need to use
2998  * SYNC_THREAD_VIA_REQ. This ensures that all signals sent using the
2999  * base transporter have been executed before we start executing the
3000  * signals arriving on the new multi transporters. This ensures that
3001  * we always keep signal order even in the context of changing
3002  * the number of transporters for a node.
3003  *
3004  * After this we will send the activation request for each new transporter
3005  * to each of the TRPMAN instances. When this request arrives the new
3006  * transporter will be added to the epoll set of the receive thread. So
3007  * from here on the signals sent after the ACTIVATE_TRP_REQ signal can
3008  * be processed in the receiving node. When all ACTIVATE_TRP_REQ have
3009  * been processed we can send ACTIVATE_TRP_CONF to the requesting node.
3010  * If we have received ACTIVATE_TRP_CONF from the other node and we
3011  * have received ACTIVATE_TRP_REQ from the other node, then we are ready
3012  * to close the socket of the base transporter. While doing this we call
3013  * lockMultiTransporters and we lock both the send mutex and the receive
3014  * thread mutex to ensure that we don't interact with any socket calls
3015  * when performing this action. At this point we also disable the send
3016  * buffer of the base transporter.
3017  *
3018  * At this point the switch to the multi transporter setup is completed.
3019  * If we are performing a restart during this setup and the other node
3020  * crashes we will also crash since we are in a restart. So only when
3021  * we are switching while we are up will this be an issue. When switching
3022  * and being up we must handle crashes in all phases of the setup.
3023  * We have added a test case that both tests crashes in all phases as
3024  * well as long sleeps in all phases.
3025  *
3026  * Closing the base transporter will cause do_disconnect to be called
3027  * on this transporter while the node is still up. Thus we have to
3028  * handle this call separately for non-active connections. This should
3029  * only happen in the receive thread, so we don't take the send
3030  * mutex before changing theSocket here. Since the transporter isn't
3031  * active it should not perform any send activity here.
3032  *
3033  *
3034  * Disconnects when using multi transporters still happens through the
3035  * do_disconnect call, either activated by an error on any of the multi
3036  * sockets or activated by blocks discovering a failed node. This means
3037  * that performStates for the node is set to DISCONNECTING. This will
3038  * trigger call to doDisconnect from start_clients_thread for each of
3039  * the multi transporters and thus every multi transporter will have
3040  * the disconnectImpl method called on the transporter.
3041  *
3042  * This in turn will lead to that update_connections will call
3043  * report_disconnect for each of the multi transporters.
3044  *
3045  * The report_disconnect will discover that a multi transporter is
3046  * involved. It will disable send buffers and receive thread bitmasks
3047  * will be cleared. It will remove the multi transporter from the
3048  * allTransporters array. This is synchronized by acquiring the
3049  * lockMultiTransporters before these actions.
3050  *
3051  * It is possible to arrive here after the switch to multi transporter
3052  * still having data in the send buffer (still waiting to send the
3053  * ACTIVATE_TRP_REQ signal). This case must be handled by calling
3054  * disable send buffer as well as calling doDisconnect on the
3055  * base transporter that we already switched away from, otherwise these
3056  * send buffers will remain when starting the node up again and cause
3057  * a crash.
3058  *
3059  * As part of report_disconnect we will check if all multi transporters
3060  * have been handled by report_disconnect, we check this by looking into
3061  * allTransporters array to see if we removed the transporter from this
3062  * array for all multi transporters. If all have been removed we switch
3063  * back to the base transporter as the active transporter.
3064  *
3065  * If the failure happens before we switch to the multi transporters we will
3066  * disable send buffer and call doDisconnect also on the multi transporters
3067  * just in case.
3068  *
3069  * Finally when all multi transporters have been handled and we have
3070  * switched back to the base transporter we will send the DISCONNECT_REP
3071  * signal through the reportDisconnect call in the receive thread.
3072  *
 * After this we are ready to set up the multi transporter again
 * once node failure handling has completed.
3075  *
3076  * No special handling of epoll sets (and poll sets in non-Linux platforms)
3077  * is required since the sockets are removed from those sets when the
3078  * socket is closed.
3079  *
3080  * update_connections()
3081  * --------------------
3082  * update_connections(), together with the thread running in
3083  * start_clients_thread(), handle the state changes for transporters as they
3084  * connect and disconnect.
3085  *
 * update_connections on a specific set of recvdata *must not* be run
 * concurrently with performReceive() on the same recvdata. Thus,
 * it must either be called from the same (receive-)thread as
 * performReceive(), or be protected by acquiring the (client) poll rights.
3090  */
3091 Uint32
update_connections(TransporterReceiveHandle & recvdata,Uint32 max_spintime)3092 TransporterRegistry::update_connections(TransporterReceiveHandle& recvdata,
3093                                         Uint32 max_spintime)
3094 {
3095   Uint32 spintime = 0;
3096   TransporterReceiveWatchdog guard(recvdata);
3097   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
3098 
3099   for (Uint32 i = 1; i < (nTransporters + 1); i++)
3100   {
3101     require(i < MAX_NTRANSPORTERS);
3102     Transporter * t = allTransporters[i];
3103     if (!t)
3104       continue;
3105 
3106     const NodeId nodeId = t->getRemoteNodeId();
3107     if (!recvdata.m_transporters.get(i))
3108       continue;
3109 
3110     TransporterError code = m_error_states[nodeId].m_code;
3111     const char *info = m_error_states[nodeId].m_info;
3112     if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
3113     {
3114       if (performStates[nodeId] == CONNECTING)
3115       {
3116         fprintf(stderr,
3117                 "update_connections while CONNECTING, nodeId:%d, error:%d\n",
3118                 nodeId,
3119                 code);
3120         /* Failed during CONNECTING -> we are still DISCONNECTED */
3121         assert(!t->isConnected());
3122         assert(false);
3123         performStates[nodeId] = DISCONNECTED;
3124       }
3125 
3126       recvdata.reportError(nodeId, code, info);
3127       m_error_states[nodeId].m_code = TE_NO_ERROR;
3128       m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
3129     }
3130 
3131     switch(performStates[nodeId]){
3132     case CONNECTED:
3133 #ifndef WIN32
3134       if (t->getTransporterType() == tt_SHM_TRANSPORTER)
3135       {
3136         SHM_Transporter *shm_trp = (SHM_Transporter*)t;
3137         spintime = MAX(spintime, shm_trp->get_spintime());
3138       }
3139 #endif
3140       break;
3141     case DISCONNECTED:
3142       break;
3143     case CONNECTING:
3144       if(t->isConnected())
3145 	report_connect(recvdata, nodeId);
3146       break;
3147     case DISCONNECTING:
3148       if(!t->isConnected())
3149 	report_disconnect(recvdata, nodeId, m_disconnect_errnum[nodeId]);
3150       break;
3151     }
3152   }
3153   recvdata.nTCPTransporters = nTCPTransporters;
3154   recvdata.nSHMTransporters = nSHMTransporters;
3155   recvdata.m_spintime = MIN(spintime, max_spintime);
3156   return spintime; //Inform caller of spintime calculated on this level
3157 }
3158 
3159 /**
3160  * Run as own thread
3161  * Possible blocking parts of transporter connect and diconnect
3162  * is supposed to be handled here.
3163  */
3164 void
start_clients_thread()3165 TransporterRegistry::start_clients_thread()
3166 {
3167   int persist_mgm_count= 0;
3168   DBUG_ENTER("TransporterRegistry::start_clients_thread");
3169   while (m_run_start_clients_thread) {
3170     NdbSleep_MilliSleep(100);
3171     persist_mgm_count++;
3172     if(persist_mgm_count==50)
3173     {
3174       ndb_mgm_check_connection(m_mgm_handle);
3175       persist_mgm_count= 0;
3176     }
3177     lockMultiTransporters();
3178     for (Uint32 i = 1;
3179          i < (nTransporters +1) && m_run_start_clients_thread;
3180          i++)
3181     {
3182       require(i < MAX_NTRANSPORTERS);
3183       Transporter * t = allTransporters[i];
3184       if (!t)
3185 	continue;
3186 
3187       const NodeId nodeId = t->getRemoteNodeId();
3188       switch(performStates[nodeId]){
3189       case CONNECTING:
3190       {
3191         if (t->isPartOfMultiTransporter())
3192         {
3193           break;
3194         }
3195         if (!t->isConnected() && !t->isServer)
3196         {
3197           if (get_and_clear_node_up_indicator(nodeId))
3198           {
3199             // Other node have indicated that node nodeId is up, try connect
3200             // now and restart backoff sequence
3201             backoff_reset_connecting_time(nodeId);
3202           }
3203           if (!backoff_update_and_check_time_for_connect(nodeId))
3204           {
3205             // Skip connect this time
3206             continue;
3207           }
3208 
3209 	  bool connected= false;
3210 	  /**
3211 	   * First, we try to connect (if we have a port number).
3212 	   */
3213 
3214 	  if (t->get_s_port())
3215           {
3216             DBUG_PRINT("info", ("connecting to node %d using port %d",
3217                                 nodeId, t->get_s_port()));
3218             unlockMultiTransporters();
3219             connected= t->connect_client();
3220             lockMultiTransporters();
3221           }
3222 
3223 	  /**
3224 	   * If dynamic, get the port for connecting from the management server
3225 	   */
3226 	  if (!connected && t->get_s_port() <= 0) // Port is dynamic
3227           {
3228 	    int server_port= 0;
3229 	    struct ndb_mgm_reply mgm_reply;
3230             unlockMultiTransporters();
3231 
3232             DBUG_PRINT("info", ("connection to node %d should use "
3233                                 "dynamic port",
3234                                 nodeId));
3235 
3236 	    if(!ndb_mgm_is_connected(m_mgm_handle))
3237 	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
3238 
3239 	    if(ndb_mgm_is_connected(m_mgm_handle))
3240 	    {
3241               DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
3242                                   nodeId));
3243 
3244               const int res=
3245 		ndb_mgm_get_connection_int_parameter(m_mgm_handle,
3246 						     t->getRemoteNodeId(),
3247 						     t->getLocalNodeId(),
3248 						     CFG_CONNECTION_SERVER_PORT,
3249 						     &server_port,
3250 						     &mgm_reply);
3251 
3252 	      DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
3253 				 server_port,t->getRemoteNodeId(),
3254 				 t->getLocalNodeId(),res));
3255 	      if( res >= 0 )
3256 	      {
3257                 DBUG_PRINT("info", ("got port %d to use for connection to %d",
3258                                     server_port, nodeId));
3259 
3260 		if (server_port != 0)
3261                 {
3262                   if (t->get_s_port() != server_port)
3263                   {
3264                     // Got a different port number, reset backoff
3265                     backoff_reset_connecting_time(nodeId);
3266                   }
3267                   // Save the new port number
3268 		  t->set_s_port(server_port);
3269                 }
3270                 else
3271                 {
3272                   // Got port number 0, port is not known.  Keep the old.
3273                 }
3274 	      }
3275 	      else if(ndb_mgm_is_connected(m_mgm_handle))
3276 	      {
3277                 DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
3278                                     res));
3279                 g_eventLogger->info("Failed to get dynamic port, res: %d",
3280                                     res);
3281 		ndb_mgm_disconnect(m_mgm_handle);
3282 	      }
3283 	      else
3284 	      {
3285                 DBUG_PRINT("info", ("mgmd close connection early"));
3286                 g_eventLogger->info
3287                   ("Management server closed connection early. "
3288                    "It is probably being shut down (or has problems). "
3289                    "We will retry the connection. %d %s %s line: %d",
3290                    ndb_mgm_get_latest_error(m_mgm_handle),
3291                    ndb_mgm_get_latest_error_desc(m_mgm_handle),
3292                    ndb_mgm_get_latest_error_msg(m_mgm_handle),
3293                    ndb_mgm_get_latest_error_line(m_mgm_handle)
3294                    );
3295 	      }
3296 	    }
3297 	    /** else
3298 	     * We will not be able to get a new port unless
3299 	     * the m_mgm_handle is connected. Note that not
3300 	     * being connected is an ok state, just continue
3301 	     * until it is able to connect. Continue using the
3302 	     * old port until we can connect again and get a
3303 	     * new port.
3304 	     */
3305             lockMultiTransporters();
3306 	  }
3307 	}
3308 	break;
3309       }
3310       case DISCONNECTING:
3311 	if(t->isConnected())
3312         {
3313           DEBUG_FPRINTF((stderr, "(%u)doDisconnect(%u), line: %u\n",
3314                          localNodeId, t->getRemoteNodeId(), __LINE__));
3315 	  t->doDisconnect();
3316         }
3317 	break;
3318       case DISCONNECTED:
3319       {
3320         if (t->isConnected())
3321         {
3322           g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
3323                                  " while being connected, disconnecting!",
3324                                  t->getRemoteNodeId());
3325           DEBUG_FPRINTF((stderr, "(%u)doDisconnect(%u), line: %u\n",
3326                          localNodeId, t->getRemoteNodeId(), __LINE__));
3327           t->doDisconnect();
3328         }
3329         break;
3330       }
3331       case CONNECTED:
3332       {
3333         if (t->isPartOfMultiTransporter() &&
3334             !t->isConnected() &&
3335             !t->isServer)
3336         {
3337           require(t->get_s_port());
3338           DBUG_PRINT("info", ("connecting multi-transporter to node %d"
3339                      " using port %d",
3340                      nodeId,
3341                      t->get_s_port()));
3342           unlockMultiTransporters();
3343           t->connect_client();
3344           DEBUG_FPRINTF((stderr, "Connect client of trp id %u, res: %u\n",
3345                         t->getTransporterIndex(),
3346                         t->isConnected()));
3347           lockMultiTransporters();
3348         }
3349       }
3350       default:
3351 	break;
3352       }
3353     }
3354     unlockMultiTransporters();
3355   }
3356   DBUG_VOID_RETURN;
3357 }
3358 
3359 struct NdbThread*
start_clients()3360 TransporterRegistry::start_clients()
3361 {
3362   m_run_start_clients_thread= true;
3363   m_start_clients_thread= NdbThread_Create(run_start_clients_C,
3364 					   (void**)this,
3365                                            0, // default stack size
3366 					   "ndb_start_clients",
3367 					   NDB_THREAD_PRIO_LOW);
3368   if (m_start_clients_thread == 0)
3369   {
3370     m_run_start_clients_thread= false;
3371   }
3372   return m_start_clients_thread;
3373 }
3374 
3375 bool
stop_clients()3376 TransporterRegistry::stop_clients()
3377 {
3378   if (m_start_clients_thread) {
3379     m_run_start_clients_thread= false;
3380     void* status;
3381     NdbThread_WaitFor(m_start_clients_thread, &status);
3382     NdbThread_Destroy(&m_start_clients_thread);
3383   }
3384   return true;
3385 }
3386 
3387 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)3388 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
3389 					       const char *interf,
3390 					       int s_port)
3391 {
3392   DBUG_ENTER("TransporterRegistry::add_transporter_interface");
3393   DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
3394   if (interf && strlen(interf) == 0)
3395     interf= 0;
3396 
3397   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
3398   {
3399     Transporter_interface &tmp= m_transporter_interface[i];
3400     if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
3401       continue;
3402     if (interf != 0 && tmp.m_interface != 0 &&
3403 	strcmp(interf, tmp.m_interface) == 0)
3404     {
3405       DBUG_VOID_RETURN; // found match, no need to insert
3406     }
3407     if (interf == 0 && tmp.m_interface == 0)
3408     {
3409       DBUG_VOID_RETURN; // found match, no need to insert
3410     }
3411   }
3412   Transporter_interface t;
3413   t.m_remote_nodeId= remoteNodeId;
3414   t.m_s_service_port= s_port;
3415   t.m_interface= interf;
3416   m_transporter_interface.push_back(t);
3417   DBUG_PRINT("exit",("interface and port added"));
3418   DBUG_VOID_RETURN;
3419 }
3420 
3421 bool
start_service(SocketServer & socket_server)3422 TransporterRegistry::start_service(SocketServer& socket_server)
3423 {
3424   DBUG_ENTER("TransporterRegistry::start_service");
3425   if (m_transporter_interface.size() > 0 &&
3426       localNodeId == 0)
3427   {
3428     g_eventLogger->error("INTERNAL ERROR: not initialized");
3429     DBUG_RETURN(false);
3430   }
3431 
3432   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
3433   {
3434     Transporter_interface &t= m_transporter_interface[i];
3435 
3436     unsigned short port= (unsigned short)t.m_s_service_port;
3437     if(t.m_s_service_port<0)
3438       port= -t.m_s_service_port; // is a dynamic port
3439     TransporterService *transporter_service =
3440       new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
3441     if(!socket_server.setup(transporter_service,
3442 			    &port, t.m_interface))
3443     {
3444       DBUG_PRINT("info", ("Trying new port"));
3445       port= 0;
3446       if(t.m_s_service_port>0
3447 	 || !socket_server.setup(transporter_service,
3448 				 &port, t.m_interface))
3449       {
3450 	/*
3451 	 * If it wasn't a dynamically allocated port, or
3452 	 * our attempts at getting a new dynamic port failed
3453 	 */
3454         g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
3455                              "Please check if the port is already used,\n"
3456                              "(perhaps the node is already running)",
3457                              t.m_interface ? t.m_interface : "*", t.m_s_service_port);
3458 	delete transporter_service;
3459 	DBUG_RETURN(false);
3460       }
3461     }
3462     t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
3463     DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
3464     transporter_service->setTransporterRegistry(this);
3465   }
3466   DBUG_RETURN(true);
3467 }
3468 
3469 void
startReceiving()3470 TransporterRegistry::startReceiving()
3471 {
3472   DBUG_ENTER("TransporterRegistry::startReceiving");
3473 
3474 #ifndef WIN32
3475   m_shm_own_pid = getpid();
3476 #endif
3477   DBUG_VOID_RETURN;
3478 }
3479 
3480 void
stopReceiving()3481 TransporterRegistry::stopReceiving(){
3482 }
3483 
3484 void
startSending()3485 TransporterRegistry::startSending(){
3486 }
3487 
3488 void
stopSending()3489 TransporterRegistry::stopSending(){
3490 }
3491 
operator <<(NdbOut & out,SignalHeader & sh)3492 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
3493   out << "-- Signal Header --" << endl;
3494   out << "theLength:    " << sh.theLength << endl;
3495   out << "gsn:          " << sh.theVerId_signalNumber << endl;
3496   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
3497   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
3498   out << "sendersSig:   " << sh.theSendersSignalId << endl;
3499   out << "theSignalId:  " << sh.theSignalId << endl;
3500   out << "trace:        " << (int)sh.theTrace << endl;
3501   return out;
3502 }
3503 
3504 int
get_transporter_count() const3505 TransporterRegistry::get_transporter_count() const
3506 {
3507   assert(nTransporters > 0);
3508   return nTransporters;
3509 }
3510 
3511 bool
is_shm_transporter(TrpId trp_id)3512 TransporterRegistry::is_shm_transporter(TrpId trp_id)
3513 {
3514   assert(trp_id < maxTransporters);
3515   Transporter *trp = allTransporters[trp_id];
3516   if (trp->getTransporterType() == tt_SHM_TRANSPORTER)
3517     return true;
3518   else
3519     return false;
3520 }
3521 
3522 Transporter*
get_transporter(TrpId trp_id) const3523 TransporterRegistry::get_transporter(TrpId trp_id) const
3524 {
3525   assert(trp_id < MAX_NTRANSPORTERS);
3526   return allTransporters[trp_id];
3527 }
3528 
3529 Transporter*
get_node_transporter(NodeId nodeId) const3530 TransporterRegistry::get_node_transporter(NodeId nodeId) const
3531 {
3532   assert(nodeId <= MAX_NODES);
3533   return theNodeIdTransporters[nodeId];
3534 }
3535 
connect_client(NdbMgmHandle * h)3536 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
3537 {
3538   DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
3539 
3540   Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
3541 
3542   if(!mgm_nodeid)
3543   {
3544     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
3545     return false;
3546   }
3547   Transporter * t = theNodeIdTransporters[mgm_nodeid];
3548   if (!t)
3549   {
3550     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
3551     return false;
3552   }
3553 
3554   if (t->isMultiTransporter())
3555   {
3556     Multi_Transporter *multi_trp = (Multi_Transporter*)t;
3557     require(multi_trp->get_num_active_transporters() == 1);
3558     t = multi_trp->get_active_transporter(0);
3559   }
3560   require(!t->isMultiTransporter());
3561   require(!t->isPartOfMultiTransporter());
3562   bool res = t->connect_client(connect_ndb_mgmd(h));
3563   if (res == true)
3564   {
3565     DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTING,"
3566                    " connect_client\n", localNodeId, mgm_nodeid));
3567     performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
3568   }
3569   DBUG_RETURN(res);
3570 }
3571 
3572 
report_dynamic_ports(NdbMgmHandle h) const3573 bool TransporterRegistry::report_dynamic_ports(NdbMgmHandle h) const
3574 {
3575   // Fill array of nodeid/port pairs for those ports which are dynamic
3576   unsigned num_ports = 0;
3577   ndb_mgm_dynamic_port ports[MAX_NODES];
3578   for(unsigned i = 0; i < m_transporter_interface.size(); i++)
3579   {
3580     const Transporter_interface& ti = m_transporter_interface[i];
3581     if (ti.m_s_service_port >= 0)
3582       continue; // Not a dynamic port
3583 
3584     assert(num_ports < NDB_ARRAY_SIZE(ports));
3585     ports[num_ports].nodeid = ti.m_remote_nodeId;
3586     ports[num_ports].port = ti.m_s_service_port;
3587     num_ports++;
3588   }
3589 
3590   if (num_ports == 0)
3591   {
3592     // No dynamic ports in use, nothing to report
3593     return true;
3594   }
3595 
3596   // Send array of nodeid/port pairs to mgmd
3597   if (ndb_mgm_set_dynamic_ports(h, localNodeId,
3598                                 ports, num_ports) < 0)
3599   {
3600     g_eventLogger->error("Failed to register dynamic ports, error: %d  - '%s'",
3601                          ndb_mgm_get_latest_error(h),
3602                          ndb_mgm_get_latest_error_desc(h));
3603     return false;
3604   }
3605 
3606   return true;
3607 }
3608 
3609 
3610 /**
3611  * Given a connected NdbMgmHandle, turns it into a transporter
3612  * and returns the socket.
3613  */
connect_ndb_mgmd(NdbMgmHandle * h)3614 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
3615 {
3616   NDB_SOCKET_TYPE sockfd;
3617   ndb_socket_invalidate(&sockfd);
3618 
3619   DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
3620 
3621   if ( h==NULL || *h == NULL )
3622   {
3623     g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
3624     DBUG_RETURN(sockfd);
3625   }
3626 
3627   if (!report_dynamic_ports(*h))
3628   {
3629     ndb_mgm_destroy_handle(h);
3630     DBUG_RETURN(sockfd);
3631   }
3632 
3633   /**
3634    * convert_to_transporter also disposes of the handle (i.e. we don't leak
3635    * memory here.
3636    */
3637   DBUG_PRINT("info", ("Converting handle to transporter"));
3638   sockfd= ndb_mgm_convert_to_transporter(h);
3639   if (!ndb_socket_valid(sockfd))
3640   {
3641     g_eventLogger->error("Failed to convert to transporter (%s: %d)",
3642                          __FILE__, __LINE__);
3643     ndb_mgm_destroy_handle(h);
3644   }
3645   DBUG_RETURN(sockfd);
3646 }
3647 
3648 /**
3649  * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
3650  * and returns the socket.
3651  */
3652 NDB_SOCKET_TYPE
connect_ndb_mgmd(const char * server_name,unsigned short server_port)3653 TransporterRegistry::connect_ndb_mgmd(const char* server_name,
3654                                       unsigned short server_port)
3655 {
3656   NdbMgmHandle h= ndb_mgm_create_handle();
3657   NDB_SOCKET_TYPE s;
3658   ndb_socket_invalidate(&s);
3659 
3660   DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
3661 
3662   if ( h == NULL )
3663   {
3664     DBUG_RETURN(s);
3665   }
3666 
3667   /**
3668    * Set connectstring
3669    */
3670   {
3671     BaseString cs;
3672     cs.assfmt("%s:%u", server_name, server_port);
3673     ndb_mgm_set_connectstring(h, cs.c_str());
3674   }
3675 
3676   if(ndb_mgm_connect(h, 0, 0, 0)<0)
3677   {
3678     DBUG_PRINT("info", ("connection to mgmd failed"));
3679     ndb_mgm_destroy_handle(&h);
3680     DBUG_RETURN(s);
3681   }
3682 
3683   DBUG_RETURN(connect_ndb_mgmd(&h));
3684 }
3685 
3686 /**
3687  * The calls below are used by all implementations: NDB API, ndbd and
3688  * ndbmtd. The calls to handle->getWritePtr, handle->updateWritePtr
3689  * are handled by special implementations for NDB API, ndbd and
3690  * ndbmtd.
3691  */
3692 
3693 Uint32 *
getWritePtr(TransporterSendBufferHandle * handle,Transporter * t,Uint32 trp_id,Uint32 lenBytes,Uint32 prio,SendStatus * error)3694 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
3695                                  Transporter* t,
3696                                  Uint32 trp_id,
3697                                  Uint32 lenBytes,
3698                                  Uint32 prio,
3699                                  SendStatus *error)
3700 {
3701   NodeId nodeId = t->getRemoteNodeId();
3702   Uint32 *insertPtr = handle->getWritePtr(nodeId,
3703                                           trp_id,
3704                                           lenBytes,
3705                                           prio,
3706                                           t->get_max_send_buffer(),
3707                                           error);
3708 
3709   if (unlikely(insertPtr == nullptr && *error != SEND_MESSAGE_TOO_BIG))
3710   {
3711     //-------------------------------------------------
3712     // Buffer was completely full. We have severe problems.
3713     // We will attempt to wait for a small time
3714     //-------------------------------------------------
3715     if (t->send_is_possible(10))
3716     {
3717       //-------------------------------------------------
3718       // Send is possible after the small timeout.
3719       //-------------------------------------------------
3720       if (!handle->forceSend(nodeId, trp_id))
3721       {
3722 	return 0;
3723       }
3724       else
3725       {
3726 	//-------------------------------------------------
3727 	// Since send was successful we will make a renewed
3728 	// attempt at inserting the signal into the buffer.
3729 	//-------------------------------------------------
3730         insertPtr = handle->getWritePtr(nodeId,
3731                                         trp_id,
3732                                         lenBytes,
3733                                         prio,
3734                                         t->get_max_send_buffer(),
3735                                         error);
3736       }//if
3737     }
3738     else
3739     {
3740       return 0;
3741     }//if
3742   }
3743   return insertPtr;
3744 }
3745 
3746 void
updateWritePtr(TransporterSendBufferHandle * handle,Transporter * t,Uint32 trp_id,Uint32 lenBytes,Uint32 prio)3747 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
3748                                     Transporter* t,
3749                                     Uint32 trp_id,
3750                                     Uint32 lenBytes,
3751                                     Uint32 prio)
3752 {
3753   NodeId nodeId = t->getRemoteNodeId();
3754   Uint32 used = handle->updateWritePtr(nodeId, trp_id, lenBytes, prio);
3755   t->update_status_overloaded(used);
3756 
3757   if (t->send_limit_reached(used))
3758   {
3759     //-------------------------------------------------
3760     // Buffer is full and we are ready to send. We will
3761     // not wait since the signal is already in the buffer.
3762     // Force flag set has the same indication that we
3763     // should always send. If it is not possible to send
3764     // we will not worry since we will soon be back for
3765     // a renewed trial.
3766     //-------------------------------------------------
3767     if (t->send_is_possible(0))
3768     {
3769       //-------------------------------------------------
3770       // Send was possible, attempt at a send.
3771       //-------------------------------------------------
3772       handle->forceSend(nodeId, trp_id);
3773     }//if
3774   }
3775 }
3776 
3777 void
inc_overload_count(Uint32 nodeId)3778 TransporterRegistry::inc_overload_count(Uint32 nodeId)
3779 {
3780   assert(nodeId < MAX_NODES);
3781   assert(theNodeIdTransporters[nodeId] != NULL);
3782   theNodeIdTransporters[nodeId]->inc_overload_count();
3783 }
3784 
3785 void
inc_slowdown_count(Uint32 nodeId)3786 TransporterRegistry::inc_slowdown_count(Uint32 nodeId)
3787 {
3788   assert(nodeId < MAX_NODES);
3789   assert(theNodeIdTransporters[nodeId] != NULL);
3790   theNodeIdTransporters[nodeId]->inc_slowdown_count();
3791 }
3792 
3793 Uint32
get_overload_count(Uint32 nodeId)3794 TransporterRegistry::get_overload_count(Uint32 nodeId)
3795 {
3796   assert(nodeId < MAX_NODES);
3797   assert(theNodeIdTransporters[nodeId] != NULL);
3798   return theNodeIdTransporters[nodeId]->get_overload_count();
3799 }
3800 
3801 Uint32
get_slowdown_count(Uint32 nodeId)3802 TransporterRegistry::get_slowdown_count(Uint32 nodeId)
3803 {
3804   assert(nodeId < MAX_NODES);
3805   assert(theNodeIdTransporters[nodeId] != NULL);
3806   return theNodeIdTransporters[nodeId]->get_slowdown_count();
3807 }
3808 
3809 Uint32
get_connect_count(Uint32 nodeId)3810 TransporterRegistry::get_connect_count(Uint32 nodeId)
3811 {
3812   assert(nodeId < MAX_NODES);
3813   assert(theNodeIdTransporters[nodeId] != NULL);
3814   return theNodeIdTransporters[nodeId]->get_connect_count();
3815 }
3816 
3817 void
get_trps_for_node(Uint32 nodeId,TrpId * trp_ids,Uint32 & num_ids,Uint32 max_size)3818 TransporterRegistry::get_trps_for_node(Uint32 nodeId,
3819                                        TrpId *trp_ids,
3820                                        Uint32 &num_ids,
3821                                        Uint32 max_size)
3822 {
3823   Transporter *t = theNodeIdTransporters[nodeId];
3824   if (!t)
3825   {
3826     num_ids = 0;
3827   }
3828   else if (t->isMultiTransporter())
3829   {
3830     Multi_Transporter *multi_trp = (Multi_Transporter*)t;
3831     num_ids = multi_trp->get_num_active_transporters();
3832     num_ids = MIN(num_ids, max_size);
3833     for (Uint32 i = 0; i < num_ids; i++)
3834     {
3835       Transporter* tmp_trp = multi_trp->get_active_transporter(i);
3836       trp_ids[i] = tmp_trp->getTransporterIndex();
3837       require(trp_ids[i] != 0);
3838     }
3839   }
3840   else
3841   {
3842     num_ids = 1;
3843     trp_ids[0] = t->getTransporterIndex();
3844     require(trp_ids[0] != 0);
3845   }
3846   require(max_size >= 1);
3847 }
3848 
3849 TrpId
getTransporterIndex(Transporter * t)3850 TransporterRegistry::getTransporterIndex(Transporter* t)
3851 {
3852   return t->getTransporterIndex();
3853 }
3854 
3855 bool
isMultiTransporter(Transporter * t)3856 TransporterRegistry::isMultiTransporter(Transporter* t)
3857 {
3858   return t->isMultiTransporter();
3859 }
3860 
3861 void
switch_active_trp(Multi_Transporter * t)3862 TransporterRegistry::switch_active_trp(Multi_Transporter *t)
3863 {
3864   require(t->isMultiTransporter());
3865   t->switch_active_trp();
3866 }
3867 
3868 Uint32
get_num_active_transporters(Multi_Transporter * t)3869 TransporterRegistry::get_num_active_transporters(Multi_Transporter *t)
3870 {
3871   require(t->isMultiTransporter());
3872   {
3873     return t->get_num_active_transporters();
3874   }
3875 }
3876 
3877 /**
3878  * We calculate the risk level for a send buffer.
3879  * The primary instrument is the current size of
3880  * the node send buffer. However if the total
3881  * buffer for all send buffers is also close to
3882  * empty, then we will adjust the node send
3883  * buffer size for this. In this manner a very
3884  * contested total buffer will also slow down
3885  * the entire node operation.
3886  */
3887 void
calculate_send_buffer_level(Uint64 node_send_buffer_size,Uint64 total_send_buffer_size,Uint64 total_used_send_buffer_size,Uint32 num_threads,SB_LevelType & level)3888 calculate_send_buffer_level(Uint64 node_send_buffer_size,
3889                             Uint64 total_send_buffer_size,
3890                             Uint64 total_used_send_buffer_size,
3891                             Uint32 num_threads,
3892                             SB_LevelType &level)
3893 {
3894   Uint64 percentage =
3895     (total_used_send_buffer_size * 100) / total_send_buffer_size;
3896 
3897   if (percentage < 90)
3898   {
3899     ;
3900   }
3901   else if (percentage < 95)
3902   {
3903     node_send_buffer_size *= 2;
3904   }
3905   else if (percentage < 97)
3906   {
3907     node_send_buffer_size *= 4;
3908   }
3909   else if (percentage < 98)
3910   {
3911     node_send_buffer_size *= 8;
3912   }
3913   else if (percentage < 99)
3914   {
3915     node_send_buffer_size *= 16;
3916   }
3917   else
3918   {
3919     level = SB_CRITICAL_LEVEL;
3920     return;
3921   }
3922 
3923   if (node_send_buffer_size < 128 * 1024)
3924   {
3925     level = SB_NO_RISK_LEVEL;
3926     return;
3927   }
3928   else if (node_send_buffer_size < 256 * 1024)
3929   {
3930     level = SB_LOW_LEVEL;
3931     return;
3932   }
3933   else if (node_send_buffer_size < 384 * 1024)
3934   {
3935     level = SB_MEDIUM_LEVEL;
3936     return;
3937   }
3938   else if (node_send_buffer_size < 1024 * 1024)
3939   {
3940     level = SB_HIGH_LEVEL;
3941     return;
3942   }
3943   else if (node_send_buffer_size < 2 * 1024 * 1024)
3944   {
3945     level = SB_RISK_LEVEL;
3946     return;
3947   }
3948   else
3949   {
3950     level = SB_CRITICAL_LEVEL;
3951     return;
3952   }
3953 }
3954 
// Explicit instantiation of Vector for the element type of
// m_transporter_interface, which this file indexes and appends to.
template class Vector<TransporterRegistry::Transporter_interface>;
3956