1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
#include <ndb_global.h>

#include <new>
26 
27 #include <TransporterRegistry.hpp>
28 #include "TransporterInternalDefinitions.hpp"
29 
30 #include "Transporter.hpp"
31 #include <SocketAuthenticator.hpp>
32 
33 #ifdef NDB_TCP_TRANSPORTER
34 #include "TCP_Transporter.hpp"
35 #include "Loopback_Transporter.hpp"
36 #endif
37 
38 #ifdef NDB_SCI_TRANSPORTER
39 #include "SCI_Transporter.hpp"
40 #endif
41 
42 #ifdef NDB_SHM_TRANSPORTER
43 #include "SHM_Transporter.hpp"
44 extern int g_ndb_shm_signum;
45 #endif
46 
47 #include "NdbOut.hpp"
48 #include <NdbSleep.h>
49 #include <InputStream.hpp>
50 #include <OutputStream.hpp>
51 #include <socket_io.h>
52 
53 #include <mgmapi/mgmapi.h>
54 #include <mgmapi_internal.h>
55 #include <mgmapi/mgmapi_debug.h>
56 
57 #include <EventLogger.hpp>
58 extern EventLogger * g_eventLogger;
59 
60 /**
61  * There is a requirement in the Transporter design that
62  * ::performReceive() and ::update_connections()
63  * on the same 'TransporterReceiveHandle' should not be
64  * run concurrently. class TransporterReceiveWatchdog provides a
65  * simple mechanism to assert that this rule is followed.
66  * Does nothing if NDEBUG is defined (in production code)
67  */
68 class TransporterReceiveWatchdog
69 {
70 public:
71 #if NDEBUG
TransporterReceiveWatchdog(TransporterReceiveHandle & recvdata)72   TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
73   {}
74 
75 #else
76   TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
77     : m_recvdata(recvdata)
78   {
79     assert(m_recvdata.m_active == false);
80     m_recvdata.m_active = true;
81   }
82 
83   ~TransporterReceiveWatchdog()
84   {
85     assert(m_recvdata.m_active == true);
86     m_recvdata.m_active = false;
87   }
88 
89 private:
90   TransporterReceiveHandle& m_recvdata;
91 #endif
92 };
93 
94 
95 struct in_addr
get_connect_address(NodeId node_id) const96 TransporterRegistry::get_connect_address(NodeId node_id) const
97 {
98   return theTransporters[node_id]->m_connect_address;
99 }
100 
101 Uint64
get_bytes_sent(NodeId node_id) const102 TransporterRegistry::get_bytes_sent(NodeId node_id) const
103 {
104   return theTransporters[node_id]->m_bytes_sent;
105 }
106 
107 Uint64
get_bytes_received(NodeId node_id) const108 TransporterRegistry::get_bytes_received(NodeId node_id) const
109 {
110   return theTransporters[node_id]->m_bytes_received;
111 }
112 
newSession(NDB_SOCKET_TYPE sockfd)113 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
114 {
115   DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
116   if (m_auth && !m_auth->server_authenticate(sockfd))
117   {
118     ndb_socket_close(sockfd, true); // Close with reset
119     DBUG_RETURN(0);
120   }
121 
122   BaseString msg;
123   bool close_with_reset = true;
124   if (!m_transporter_registry->connect_server(sockfd, msg, close_with_reset))
125   {
126     ndb_socket_close(sockfd, close_with_reset);
127     DBUG_RETURN(0);
128   }
129 
130   DBUG_RETURN(0);
131 }
132 
TransporterReceiveData()133 TransporterReceiveData::TransporterReceiveData()
134 {
135   /**
136    * With multi receiver threads
137    *   an interface to reassign these is needed...
138    */
139   m_transporters.set();            // Handle all
140   m_transporters.clear(Uint32(0)); // Except wakeup socket...
141   m_handled_transporters.clear();
142 
143 #if defined(HAVE_EPOLL_CREATE)
144   m_epoll_fd = -1;
145   m_epoll_events = 0;
146 #endif
147 }
148 
149 bool
init(unsigned maxTransporters)150 TransporterReceiveData::init(unsigned maxTransporters)
151 {
152   maxTransporters += 1; /* wakeup socket */
153 #if defined(HAVE_EPOLL_CREATE)
154   m_epoll_fd = epoll_create(maxTransporters);
155   if (m_epoll_fd == -1)
156   {
157     perror("epoll_create failed... falling back to select!");
158     goto fallback;
159   }
160   m_epoll_events = new struct epoll_event[maxTransporters];
161   if (m_epoll_events == 0)
162   {
163     perror("Failed to alloc epoll-array... falling back to select!");
164     close(m_epoll_fd);
165     m_epoll_fd = -1;
166     goto fallback;
167   }
168   bzero(m_epoll_events, maxTransporters * sizeof(struct epoll_event));
169   return true;
170 fallback:
171 #endif
172   return m_socket_poller.set_max_count(maxTransporters);
173 }
174 
175 bool
epoll_add(TCP_Transporter * t)176 TransporterReceiveData::epoll_add(TCP_Transporter *t)
177 {
178   assert(m_transporters.get(t->getRemoteNodeId()));
179 #if defined(HAVE_EPOLL_CREATE)
180   if (m_epoll_fd != -1)
181   {
182     bool add = true;
183     struct epoll_event event_poll;
184     bzero(&event_poll, sizeof(event_poll));
185     NDB_SOCKET_TYPE sock_fd = t->getSocket();
186     int node_id = t->getRemoteNodeId();
187     int op = EPOLL_CTL_ADD;
188     int ret_val, error;
189 
190     if (!my_socket_valid(sock_fd))
191       return FALSE;
192 
193     event_poll.data.u32 = t->getRemoteNodeId();
194     event_poll.events = EPOLLIN;
195     ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
196     if (!ret_val)
197       goto ok;
198     error= errno;
199     if (error == ENOENT && !add)
200     {
201       /*
202        * Could be that socket was closed premature to this call.
203        * Not a problem that this occurs.
204        */
205       goto ok;
206     }
207     if (!add || (add && (error != ENOMEM)))
208     {
209       /*
210        * Serious problems, we are either using wrong parameters,
211        * have permission problems or the socket doesn't support
212        * epoll!!
213        */
214       ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
215                " node %u to epoll-set,"
216                " errno: %u %s",
217                add ? "ADD" : "DEL",
218                m_epoll_fd,
219                MY_SOCKET_FORMAT_VALUE(sock_fd),
220                node_id,
221                error,
222                strerror(error));
223       abort();
224     }
225     ndbout << "We lacked memory to add the socket for node id ";
226     ndbout << node_id << endl;
227     return false;
228   }
229 
230 ok:
231 #endif
232   return true;
233 }
234 
~TransporterReceiveData()235 TransporterReceiveData::~TransporterReceiveData()
236 {
237 #if defined(HAVE_EPOLL_CREATE)
238   if (m_epoll_fd != -1)
239   {
240     close(m_epoll_fd);
241     m_epoll_fd = -1;
242   }
243 
244   if (m_epoll_events)
245   {
246     delete [] m_epoll_events;
247     m_epoll_events = 0;
248   }
249 #endif
250 }
251 
TransporterRegistry(TransporterCallback * callback,TransporterReceiveHandle * recvHandle,bool use_default_send_buffer,unsigned _maxTransporters)252 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
253                                          TransporterReceiveHandle * recvHandle,
254                                          bool use_default_send_buffer,
255                                          unsigned _maxTransporters) :
256   m_mgm_handle(0),
257   localNodeId(0),
258   connectBackoffMaxTime(0),
259   m_transp_count(0),
260   m_use_default_send_buffer(use_default_send_buffer),
261   m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
262   m_total_max_send_buffer(0)
263 {
264   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
265 
266   receiveHandle = recvHandle;
267   maxTransporters = _maxTransporters;
268   sendCounter = 1;
269 
270   callbackObj=callback;
271 
272   theTCPTransporters  = new TCP_Transporter * [maxTransporters];
273   theSCITransporters  = new SCI_Transporter * [maxTransporters];
274   theSHMTransporters  = new SHM_Transporter * [maxTransporters];
275   theTransporterTypes = new TransporterType   [maxTransporters];
276   theTransporters     = new Transporter     * [maxTransporters];
277   performStates       = new PerformState      [maxTransporters];
278   ioStates            = new IOState           [maxTransporters];
279   peerUpIndicators    = new bool              [maxTransporters];
280   connectingTime      = new Uint32            [maxTransporters];
281   m_disconnect_errnum = new int               [maxTransporters];
282   m_error_states      = new ErrorState        [maxTransporters];
283 
284   m_has_extra_wakeup_socket = false;
285 
286 #ifdef ERROR_INSERT
287   m_blocked.clear();
288   m_blocked_disconnected.clear();
289 
290   m_mixology_level = 0;
291 #endif
292 
293   // Initialize member variables
294   nTransporters    = 0;
295   nTCPTransporters = 0;
296   nSCITransporters = 0;
297   nSHMTransporters = 0;
298 
299   // Initialize the transporter arrays
300   ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
301   for (unsigned i=0; i<maxTransporters; i++) {
302     theTCPTransporters[i] = NULL;
303     theSCITransporters[i] = NULL;
304     theSHMTransporters[i] = NULL;
305     theTransporters[i]    = NULL;
306     performStates[i]      = DISCONNECTED;
307     ioStates[i]           = NoHalt;
308     peerUpIndicators[i]   = true; // Assume all nodes are up, will be
309                                   // cleared at first connect attempt
310     connectingTime[i]     = 0;
311     m_disconnect_errnum[i]= 0;
312     m_error_states[i]     = default_error_state;
313   }
314 
315   DBUG_VOID_RETURN;
316 }
317 
318 #define MIN_SEND_BUFFER_SIZE (4 * 1024 * 1024)
319 
320 void
allocate_send_buffers(Uint64 total_send_buffer,Uint64 extra_send_buffer)321 TransporterRegistry::allocate_send_buffers(Uint64 total_send_buffer,
322                                            Uint64 extra_send_buffer)
323 {
324   if (!m_use_default_send_buffer)
325     return;
326 
327   if (total_send_buffer == 0)
328     total_send_buffer = get_total_max_send_buffer();
329 
330   total_send_buffer += extra_send_buffer;
331 
332   if (!extra_send_buffer)
333   {
334     /**
335      * If extra send buffer memory is 0 it means we can decide on an
336      * appropriate value for it. We select to always ensure that the
337      * minimum send buffer memory is 4M, otherwise we simply don't
338      * add any extra send buffer memory at all.
339      */
340     if (total_send_buffer < MIN_SEND_BUFFER_SIZE)
341     {
342       total_send_buffer = (Uint64)MIN_SEND_BUFFER_SIZE;
343     }
344   }
345 
346   if (m_send_buffers)
347   {
348     /* Send buffers already allocated -> resize the buffer pages */
349     assert(m_send_buffer_memory);
350 
351     // TODO resize send buffer pages
352 
353     return;
354   }
355 
356   /* Initialize transporter send buffers (initially empty). */
357   m_send_buffers = new SendBuffer[maxTransporters];
358   for (unsigned i = 0; i < maxTransporters; i++)
359   {
360     SendBuffer &b = m_send_buffers[i];
361     b.m_first_page = NULL;
362     b.m_last_page = NULL;
363     b.m_used_bytes = 0;
364   }
365 
366   /* Initialize the page freelist. */
367   Uint64 send_buffer_pages =
368     (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
369   /* Add one extra page of internal fragmentation overhead per transporter. */
370   send_buffer_pages += nTransporters;
371 
372   m_send_buffer_memory =
373     new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
374   if (m_send_buffer_memory == NULL)
375   {
376     ndbout << "Unable to allocate "
377            << send_buffer_pages * SendBufferPage::PGSIZE
378            << " bytes of memory for send buffers, aborting." << endl;
379     abort();
380   }
381 
382   m_page_freelist = NULL;
383   for (unsigned i = 0; i < send_buffer_pages; i++)
384   {
385     SendBufferPage *page =
386       (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
387     page->m_bytes = 0;
388     page->m_next = m_page_freelist;
389     m_page_freelist = page;
390   }
391   m_tot_send_buffer_memory = SendBufferPage::PGSIZE * send_buffer_pages;
392   m_tot_used_buffer_memory = 0;
393 }
394 
set_mgm_handle(NdbMgmHandle h)395 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
396 {
397   DBUG_ENTER("TransporterRegistry::set_mgm_handle");
398   if (m_mgm_handle)
399     ndb_mgm_destroy_handle(&m_mgm_handle);
400   m_mgm_handle= h;
401   ndb_mgm_set_timeout(m_mgm_handle, 5000);
402 #ifndef NDEBUG
403   if (h)
404   {
405     char buf[256];
406     DBUG_PRINT("info",("handle set with connectstring: %s",
407 		       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
408   }
409   else
410   {
411     DBUG_PRINT("info",("handle set to NULL"));
412   }
413 #endif
414   DBUG_VOID_RETURN;
415 }
416 
~TransporterRegistry()417 TransporterRegistry::~TransporterRegistry()
418 {
419   DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
420 
421   disconnectAll();
422   removeAll();
423 
424   delete[] theTCPTransporters;
425   delete[] theSCITransporters;
426   delete[] theSHMTransporters;
427   delete[] theTransporterTypes;
428   delete[] theTransporters;
429   delete[] performStates;
430   delete[] ioStates;
431   delete[] peerUpIndicators;
432   delete[] connectingTime;
433   delete[] m_disconnect_errnum;
434   delete[] m_error_states;
435 
436   if (m_send_buffers)
437     delete[] m_send_buffers;
438   m_page_freelist = NULL;
439   if (m_send_buffer_memory)
440     delete[] m_send_buffer_memory;
441 
442   if (m_mgm_handle)
443     ndb_mgm_destroy_handle(&m_mgm_handle);
444 
445   if (m_has_extra_wakeup_socket)
446   {
447     my_socket_close(m_extra_wakeup_sockets[0]);
448     my_socket_close(m_extra_wakeup_sockets[1]);
449   }
450 
451   DBUG_VOID_RETURN;
452 }
453 
454 void
removeAll()455 TransporterRegistry::removeAll(){
456   for(unsigned i = 0; i<maxTransporters; i++){
457     if(theTransporters[i] != NULL)
458       removeTransporter(theTransporters[i]->getRemoteNodeId());
459   }
460 }
461 
462 void
disconnectAll()463 TransporterRegistry::disconnectAll(){
464   for(unsigned i = 0; i<maxTransporters; i++){
465     if(theTransporters[i] != NULL)
466       theTransporters[i]->doDisconnect();
467   }
468 }
469 
470 bool
init(NodeId nodeId)471 TransporterRegistry::init(NodeId nodeId) {
472   DBUG_ENTER("TransporterRegistry::init");
473   assert(localNodeId == 0 ||
474          localNodeId == nodeId);
475 
476   localNodeId = nodeId;
477 
478   DEBUG("TransporterRegistry started node: " << localNodeId);
479 
480   if (receiveHandle)
481   {
482     if (!init(* receiveHandle))
483       DBUG_RETURN(false);
484   }
485 
486   DBUG_RETURN(true);
487 }
488 
489 bool
init(TransporterReceiveHandle & recvhandle)490 TransporterRegistry::init(TransporterReceiveHandle& recvhandle)
491 {
492   return recvhandle.init(maxTransporters);
493 }
494 
495 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg,bool & close_with_reset) const496 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
497                                     BaseString & msg,
498                                     bool& close_with_reset) const
499 {
500   DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
501 
502   // Read "hello" that consists of node id and transporter
503   // type from client
504   SocketInputStream s_input(sockfd);
505   char buf[11+1+11+1]; // <int> <int>
506   if (s_input.gets(buf, sizeof(buf)) == 0) {
507     msg.assfmt("line: %u : Failed to get nodeid from client", __LINE__);
508     DBUG_PRINT("error", ("Failed to read 'hello' from client"));
509     DBUG_RETURN(false);
510   }
511 
512   int nodeId, remote_transporter_type= -1;
513   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
514   switch (r) {
515   case 2:
516     break;
517   case 1:
518     // we're running version prior to 4.1.9
519     // ok, but with no checks on transporter configuration compatability
520     break;
521   default:
522     msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
523     DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
524                          (int)sizeof(buf), buf));
525     DBUG_RETURN(false);
526   }
527 
528   DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
529 		      nodeId, remote_transporter_type));
530 
531 
532   // Check that nodeid is in range before accessing the arrays
533   if (nodeId < 0 ||
534       nodeId >= (int)maxTransporters)
535   {
536     msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
537     DBUG_PRINT("error", ("Out of range nodeId: %d from client",
538                          nodeId));
539     DBUG_RETURN(false);
540   }
541 
542   // Check that transporter is allocated
543   Transporter *t= theTransporters[nodeId];
544   if (t == 0)
545   {
546     msg.assfmt("line: %u : Incorrect reply from client: >%s<, node: %u",
547                __LINE__, buf, nodeId);
548     DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
549     DBUG_RETURN(false);
550   }
551 
552   // Check that the transporter should be connecting
553   if (performStates[nodeId] != TransporterRegistry::CONNECTING)
554   {
555     msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
556                __LINE__, nodeId,
557                getPerformStateString(nodeId),
558                performStates[nodeId]);
559 
560     DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
561                          nodeId));
562 
563     // Avoid TIME_WAIT on server by requesting client to close connection
564     SocketOutputStream s_output(sockfd);
565     if (s_output.println("BYE") < 0)
566     {
567       // Failed to request client close
568       DBUG_PRINT("error", ("Failed to send client BYE"));
569       DBUG_RETURN(false);
570     }
571 
572     // Wait for to close connection by reading EOF(i.e read returns 0)
573     const int read_eof_timeout = 1000; // Fairly short timeout
574     if (read_socket(sockfd, read_eof_timeout,
575                     buf, sizeof(buf)) == 0)
576     {
577       // Client gracefully closed connection, turn off close_with_reset
578       close_with_reset = false;
579       DBUG_RETURN(false);
580     }
581 
582     // Failed to request client close
583     DBUG_RETURN(false);
584   }
585 
586   // Check transporter type
587   if (remote_transporter_type != -1 &&
588       remote_transporter_type != t->m_type)
589   {
590     g_eventLogger->error("Connection from node: %d uses different transporter "
591                          "type: %d, expected type: %d",
592                          nodeId, remote_transporter_type, t->m_type);
593     DBUG_RETURN(false);
594   }
595 
596   // Send reply to client
597   SocketOutputStream s_output(sockfd);
598   if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
599   {
600     msg.assfmt("line: %u : Failed to reply to connecting socket (node: %u)",
601                __LINE__, nodeId);
602     DBUG_PRINT("error", ("Send of reply failed"));
603     DBUG_RETURN(false);
604   }
605 
606   // Setup transporter (transporter responsible for closing sockfd)
607   bool res = t->connect_server(sockfd, msg);
608 
609   if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
610   {
611     msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
612                __LINE__, nodeId,
613                getPerformStateString(nodeId),
614                performStates[nodeId]);
615     // Connection suceeded, but not connecting anymore, return
616     // false to close the connection
617     DBUG_RETURN(false);
618   }
619 
620   DBUG_RETURN(res);
621 }
622 
623 
624 bool
configureTransporter(TransporterConfiguration * config)625 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
626 {
627   NodeId remoteNodeId = config->remoteNodeId;
628 
629   assert(localNodeId);
630   assert(config->localNodeId == localNodeId);
631 
632   if (remoteNodeId >= maxTransporters)
633     return false;
634 
635   Transporter* t = theTransporters[remoteNodeId];
636   if(t != NULL)
637   {
638     // Transporter already exist, try to reconfigure it
639     return t->configure(config);
640   }
641 
642   DEBUG("Configuring transporter from " << localNodeId
643 	<< " to " << remoteNodeId);
644 
645   switch (config->type){
646   case tt_TCP_TRANSPORTER:
647     return createTCPTransporter(config);
648   case tt_SHM_TRANSPORTER:
649     return createSHMTransporter(config);
650   case tt_SCI_TRANSPORTER:
651     return createSCITransporter(config);
652   default:
653     abort();
654     break;
655   }
656   return false;
657 }
658 
659 
660 bool
createTCPTransporter(TransporterConfiguration * config)661 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
662 #ifdef NDB_TCP_TRANSPORTER
663 
664   TCP_Transporter * t = 0;
665   if (config->remoteNodeId == config->localNodeId)
666   {
667     t = new Loopback_Transporter(* this, config);
668   }
669   else
670   {
671     t = new TCP_Transporter(*this, config);
672   }
673 
674   if (t == NULL)
675     return false;
676   else if (!t->initTransporter()) {
677     delete t;
678     return false;
679   }
680 
681   // Put the transporter in the transporter arrays
682   theTCPTransporters[nTCPTransporters]      = t;
683   theTransporters[t->getRemoteNodeId()]     = t;
684   theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
685   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
686   nTransporters++;
687   nTCPTransporters++;
688   m_total_max_send_buffer += t->get_max_send_buffer();
689 
690   return true;
691 #else
692   return false;
693 #endif
694 }
695 
696 bool
createSCITransporter(TransporterConfiguration * config)697 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
698 #ifdef NDB_SCI_TRANSPORTER
699 
700   if(!SCI_Transporter::initSCI())
701     abort();
702 
703   SCI_Transporter * t = new SCI_Transporter(*this,
704                                             config->localHostName,
705                                             config->remoteHostName,
706                                             config->s_port,
707 					    config->isMgmConnection,
708                                             config->sci.sendLimit,
709 					    config->sci.bufferSize,
710 					    config->sci.nLocalAdapters,
711 					    config->sci.remoteSciNodeId0,
712 					    config->sci.remoteSciNodeId1,
713 					    localNodeId,
714 					    config->remoteNodeId,
715 					    config->serverNodeId,
716 					    config->checksum,
717 					    config->signalId);
718 
719   if (t == NULL)
720     return false;
721   else if (!t->initTransporter()) {
722     delete t;
723     return false;
724   }
725   // Put the transporter in the transporter arrays
726   theSCITransporters[nSCITransporters]      = t;
727   theTransporters[t->getRemoteNodeId()]     = t;
728   theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
729   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
730   nTransporters++;
731   nSCITransporters++;
732   m_total_max_send_buffer += t->get_max_send_buffer();
733 
734   return true;
735 #else
736   return false;
737 #endif
738 }
739 
740 bool
createSHMTransporter(TransporterConfiguration * config)741 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
742   DBUG_ENTER("TransporterRegistry::createTransporter SHM");
743 #ifdef NDB_SHM_TRANSPORTER
744 
745   if (!g_ndb_shm_signum) {
746     g_ndb_shm_signum= config->shm.signum;
747     DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
748     /**
749      * Make sure to block g_ndb_shm_signum
750      *   TransporterRegistry::init is run from "main" thread
751      */
752     NdbThread_set_shm_sigmask(TRUE);
753   }
754 
755   if(config->shm.signum != g_ndb_shm_signum)
756     return false;
757 
758   SHM_Transporter * t = new SHM_Transporter(*this,
759 					    config->localHostName,
760 					    config->remoteHostName,
761 					    config->s_port,
762 					    config->isMgmConnection,
763 					    localNodeId,
764 					    config->remoteNodeId,
765 					    config->serverNodeId,
766 					    config->checksum,
767 					    config->signalId,
768 					    config->shm.shmKey,
769 					    config->shm.shmSize
770 					    );
771   if (t == NULL)
772     return false;
773   else if (!t->initTransporter()) {
774     delete t;
775     return false;
776   }
777   // Put the transporter in the transporter arrays
778   theSHMTransporters[nSHMTransporters]      = t;
779   theTransporters[t->getRemoteNodeId()]     = t;
780   theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
781   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
782 
783   nTransporters++;
784   nSHMTransporters++;
785   m_total_max_send_buffer += t->get_max_send_buffer();
786 
787   DBUG_RETURN(true);
788 #else
789   DBUG_RETURN(false);
790 #endif
791 }
792 
793 
794 void
removeTransporter(NodeId nodeId)795 TransporterRegistry::removeTransporter(NodeId nodeId) {
796 
797   DEBUG("Removing transporter from " << localNodeId
798 	<< " to " << nodeId);
799 
800   if(theTransporters[nodeId] == NULL)
801     return;
802 
803   theTransporters[nodeId]->doDisconnect();
804 
805   const TransporterType type = theTransporterTypes[nodeId];
806 
807   int ind = 0;
808   switch(type){
809   case tt_TCP_TRANSPORTER:
810 #ifdef NDB_TCP_TRANSPORTER
811     for(; ind < nTCPTransporters; ind++)
812       if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
813 	break;
814     ind++;
815     for(; ind<nTCPTransporters; ind++)
816       theTCPTransporters[ind-1] = theTCPTransporters[ind];
817     nTCPTransporters --;
818 #endif
819     break;
820   case tt_SCI_TRANSPORTER:
821 #ifdef NDB_SCI_TRANSPORTER
822     for(; ind < nSCITransporters; ind++)
823       if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
824 	break;
825     ind++;
826     for(; ind<nSCITransporters; ind++)
827       theSCITransporters[ind-1] = theSCITransporters[ind];
828     nSCITransporters --;
829 #endif
830     break;
831   case tt_SHM_TRANSPORTER:
832 #ifdef NDB_SHM_TRANSPORTER
833     for(; ind < nSHMTransporters; ind++)
834       if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
835 	break;
836     ind++;
837     for(; ind<nSHMTransporters; ind++)
838       theSHMTransporters[ind-1] = theSHMTransporters[ind];
839     nSHMTransporters --;
840 #endif
841     break;
842   }
843 
844   nTransporters--;
845 
846   // Delete the transporter and remove it from theTransporters array
847   delete theTransporters[nodeId];
848   theTransporters[nodeId] = NULL;
849 }
850 
851 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])852 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
853                                  const SignalHeader * const signalHeader,
854 				 Uint8 prio,
855 				 const Uint32 * const signalData,
856 				 NodeId nodeId,
857 				 const LinearSectionPtr ptr[3]){
858 
859 
860   Transporter *t = theTransporters[nodeId];
861   if(t != NULL &&
862      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
863       ((signalHeader->theReceiversBlockNumber == 252) ||
864        (signalHeader->theReceiversBlockNumber == 4002)))) {
865 
866     if(t->isConnected()){
867       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
868       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
869 	Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
870 	if(insertPtr != 0){
871 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
872 	  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
873 	  return SEND_OK;
874 	}
875 
876         set_status_overloaded(nodeId, true);
877         int sleepTime = 2;
878 
879 	/**
880 	 * @note: on linux/i386 the granularity is 10ms
881 	 *        so sleepTime = 2 generates a 10 ms sleep.
882 	 */
883 	for(int i = 0; i<50; i++){
884 	  if((nSHMTransporters+nSCITransporters) == 0)
885 	    NdbSleep_MilliSleep(sleepTime);
886           /* FC : Consider counting sleeps here */
887 	  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
888 	  if(insertPtr != 0){
889 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
890 	    updateWritePtr(sendHandle, nodeId, lenBytes, prio);
891 	    break;
892 	  }
893 	}
894 
895 	if(insertPtr != 0){
896 	  /**
897 	   * Send buffer full, but resend works
898 	   */
899 	  report_error(nodeId, TE_SEND_BUFFER_FULL);
900 	  return SEND_OK;
901 	}
902 
903 	WARNING("Signal to " << nodeId << " lost(buffer)");
904 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
905 	return SEND_BUFFER_FULL;
906       } else {
907 	return SEND_MESSAGE_TOO_BIG;
908       }
909     } else {
910 #ifdef ERROR_INSERT
911       if (m_blocked.get(nodeId))
912       {
913         /* Looks like it disconnected while blocked.  We'll pretend
914          * not to notice for now
915          */
916         WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
917         return SEND_OK;
918       }
919 #endif
920       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
921       return SEND_DISCONNECTED;
922     }
923   } else {
924     DEBUG("Discarding message to block: "
925 	  << signalHeader->theReceiversBlockNumber
926 	  << " node: " << nodeId);
927 
928     if(t == NULL)
929       return SEND_UNKNOWN_NODE;
930 
931     return SEND_BLOCKED;
932   }
933 }
934 
935 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])936 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
937                                  const SignalHeader * const signalHeader,
938 				 Uint8 prio,
939 				 const Uint32 * const signalData,
940 				 NodeId nodeId,
941 				 class SectionSegmentPool & thePool,
942 				 const SegmentedSectionPtr ptr[3]){
943 
944 
945   Transporter *t = theTransporters[nodeId];
946   if(t != NULL &&
947      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
948       ((signalHeader->theReceiversBlockNumber == 252)||
949        (signalHeader->theReceiversBlockNumber == 4002)))) {
950 
951     if(t->isConnected()){
952       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
953       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
954 	Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
955 	if(insertPtr != 0){
956 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
957 	  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
958 	  return SEND_OK;
959 	}
960 
961 	/**
962 	 * @note: on linux/i386 the granularity is 10ms
963 	 *        so sleepTime = 2 generates a 10 ms sleep.
964 	 */
965         set_status_overloaded(nodeId, true);
966         int sleepTime = 2;
967 	for(int i = 0; i<50; i++){
968 	  if((nSHMTransporters+nSCITransporters) == 0)
969 	    NdbSleep_MilliSleep(sleepTime);
970 	  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
971 	  if(insertPtr != 0){
972 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
973 	    updateWritePtr(sendHandle, nodeId, lenBytes, prio);
974 	    break;
975 	  }
976 	}
977 
978 	if(insertPtr != 0){
979 	  /**
980 	   * Send buffer full, but resend works
981 	   */
982 	  report_error(nodeId, TE_SEND_BUFFER_FULL);
983 	  return SEND_OK;
984 	}
985 
986 	WARNING("Signal to " << nodeId << " lost(buffer)");
987 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
988 	return SEND_BUFFER_FULL;
989       } else {
990 	return SEND_MESSAGE_TOO_BIG;
991       }
992     } else {
993 #ifdef ERROR_INSERT
994       if (m_blocked.get(nodeId))
995       {
996         /* Looks like it disconnected while blocked.  We'll pretend
997          * not to notice for now
998          */
999         WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
1000         return SEND_OK;
1001       }
1002 #endif
1003       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
1004       return SEND_DISCONNECTED;
1005     }
1006   } else {
1007     DEBUG("Discarding message to block: "
1008 	  << signalHeader->theReceiversBlockNumber
1009 	  << " node: " << nodeId);
1010 
1011     if(t == NULL)
1012       return SEND_UNKNOWN_NODE;
1013 
1014     return SEND_BLOCKED;
1015   }
1016 }
1017 
1018 
1019 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const GenericSectionPtr ptr[3])1020 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1021                                  const SignalHeader * const signalHeader,
1022 				 Uint8 prio,
1023 				 const Uint32 * const signalData,
1024 				 NodeId nodeId,
1025 				 const GenericSectionPtr ptr[3]){
1026 
1027 
1028   Transporter *t = theTransporters[nodeId];
1029   if(t != NULL &&
1030      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
1031       ((signalHeader->theReceiversBlockNumber == 252) ||
1032        (signalHeader->theReceiversBlockNumber == 4002)))) {
1033 
1034     if(t->isConnected()){
1035       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
1036       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
1037         Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
1038         if(insertPtr != 0){
1039           t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
1040           updateWritePtr(sendHandle, nodeId, lenBytes, prio);
1041           return SEND_OK;
1042 	}
1043 
1044 	/**
1045 	 * @note: on linux/i386 the granularity is 10ms
1046 	 *        so sleepTime = 2 generates a 10 ms sleep.
1047 	 */
1048         set_status_overloaded(nodeId, true);
1049         int sleepTime = 2;
1050 	for(int i = 0; i<50; i++){
1051 	  if((nSHMTransporters+nSCITransporters) == 0)
1052 	    NdbSleep_MilliSleep(sleepTime);
1053 	  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
1054 	  if(insertPtr != 0){
1055 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
1056 	    updateWritePtr(sendHandle, nodeId, lenBytes, prio);
1057 	    break;
1058 	  }
1059 	}
1060 
1061 	if(insertPtr != 0){
1062 	  /**
1063 	   * Send buffer full, but resend works
1064 	   */
1065 	  report_error(nodeId, TE_SEND_BUFFER_FULL);
1066 	  return SEND_OK;
1067 	}
1068 
1069 	WARNING("Signal to " << nodeId << " lost(buffer)");
1070 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
1071 	return SEND_BUFFER_FULL;
1072       } else {
1073 	return SEND_MESSAGE_TOO_BIG;
1074       }
1075     } else {
1076       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
1077       return SEND_DISCONNECTED;
1078     }
1079   } else {
1080     DEBUG("Discarding message to block: "
1081 	  << signalHeader->theReceiversBlockNumber
1082 	  << " node: " << nodeId);
1083 
1084     if(t == NULL)
1085       return SEND_UNKNOWN_NODE;
1086 
1087     return SEND_BLOCKED;
1088   }
1089 }
1090 
1091 void
external_IO(Uint32 timeOutMillis)1092 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
1093   //-----------------------------------------------------------
1094   // Most of the time we will send the buffers here and then wait
1095   // for new signals. Thus we start by sending without timeout
1096   // followed by the receive part where we expect to sleep for
1097   // a while.
1098   //-----------------------------------------------------------
1099   if(pollReceive(timeOutMillis, * receiveHandle)){
1100     performReceive(* receiveHandle);
1101   }
1102   performSend();
1103 }
1104 
1105 bool
setup_wakeup_socket(TransporterReceiveHandle & recvdata)1106 TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
1107 {
1108   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1109 
1110   if (m_has_extra_wakeup_socket)
1111   {
1112     return true;
1113   }
1114 
1115   assert(!recvdata.m_transporters.get(0));
1116 
1117   if (my_socketpair(m_extra_wakeup_sockets))
1118   {
1119     perror("socketpair failed!");
1120     return false;
1121   }
1122 
1123   if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
1124       !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
1125   {
1126     goto err;
1127   }
1128 
1129 #if defined(HAVE_EPOLL_CREATE)
1130   if (recvdata.m_epoll_fd != -1)
1131   {
1132     int sock = m_extra_wakeup_sockets[0].fd;
1133     struct epoll_event event_poll;
1134     bzero(&event_poll, sizeof(event_poll));
1135     event_poll.data.u32 = 0;
1136     event_poll.events = EPOLLIN;
1137     int ret_val = epoll_ctl(recvdata.m_epoll_fd, EPOLL_CTL_ADD, sock,
1138                             &event_poll);
1139     if (ret_val != 0)
1140     {
1141       int error= errno;
1142       fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
1143               sock, error);
1144       fflush(stderr);
1145       goto err;
1146     }
1147   }
1148 #endif
1149   m_has_extra_wakeup_socket = true;
1150   recvdata.m_transporters.set(Uint32(0));
1151   return true;
1152 
1153 err:
1154   my_socket_close(m_extra_wakeup_sockets[0]);
1155   my_socket_close(m_extra_wakeup_sockets[1]);
1156   my_socket_invalidate(m_extra_wakeup_sockets+0);
1157   my_socket_invalidate(m_extra_wakeup_sockets+1);
1158   return false;
1159 }
1160 
1161 void
wakeup()1162 TransporterRegistry::wakeup()
1163 {
1164   if (m_has_extra_wakeup_socket)
1165   {
1166     static char c = 37;
1167     my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
1168   }
1169 }
1170 
/**
 * Poll all transporters assigned to 'recvdata' for incoming data.
 *
 * On return:
 *  - 'recvdata.m_recv_transporters' holds the socket-based transporters
 *    reported readable (set from the epoll events, or by poll_TCP()),
 *    including pseudo-transporter 0, the extra wakeup socket.
 *  - SHM/SCI transporters with data have been added to
 *    'recvdata.m_has_data_transporters' by poll_SHM()/poll_SCI().
 *
 * The wait is skipped (timeout forced to 0) when:
 *  - transporters still have unprocessed data from a previous round,
 *  - SCI transporters exist, or
 *  - the first SHM poll already found data.
 *
 * @param timeOutMillis  max time to block in the socket poll
 * @param recvdata       receive handle whose transporters are polled
 * @return non-zero when there may be data to receive.
 *         NOTE(review): the raw epoll_wait()/poll_TCP() result is OR'ed in,
 *         so a negative (error/EINTR) poll result also yields a non-zero
 *         return value.
 */
Uint32
TransporterRegistry::pollReceive(Uint32 timeOutMillis,
                                 TransporterReceiveHandle& recvdata)
{
  assert((receiveHandle == &recvdata) || (receiveHandle == 0));

  Uint32 retVal = 0;
  recvdata.m_recv_transporters.clear();

  /**
   * If any transporters have left-over data that was not fully executed in
   * last loop, don't wait and return 'data available' even if nothing new
   */
  if (!recvdata.m_has_data_transporters.isclear())
  {
    timeOutMillis = 0;
    retVal = 1;
  }

  // SCI transporters are only checked without waiting, so never block.
  if (nSCITransporters > 0)
  {
    timeOutMillis=0;
  }

#ifdef NDB_SHM_TRANSPORTER
  // Check SHM first; if it already has data, do not block in the socket poll.
  if (nSHMTransporters > 0)
  {
    Uint32 res = poll_SHM(0, recvdata);
    if(res)
    {
      retVal |= res;
      timeOutMillis = 0;
    }
  }
#endif

#ifdef NDB_TCP_TRANSPORTER
#if defined(HAVE_EPOLL_CREATE)
  if (likely(recvdata.m_epoll_fd != -1))
  {
    int tcpReadSelectReply = 0;
    // One epoll slot per TCP transporter, plus one for the wakeup socket.
    Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);

    if (num_trps)
    {
      tcpReadSelectReply = epoll_wait(recvdata.m_epoll_fd,
                                      recvdata.m_epoll_events,
                                      num_trps, timeOutMillis);
      retVal |= tcpReadSelectReply;
    }

    int num_socket_events = tcpReadSelectReply;
    if (num_socket_events > 0)
    {
      for (int i = 0; i < num_socket_events; i++)
      {
        // data.u32 carries the transporter id (0 == wakeup socket).
        const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
        /**
         * check that it's assigned to "us"
         */
        assert(recvdata.m_transporters.get(trpid));

        recvdata.m_recv_transporters.set(trpid);
      }
    }
    else if (num_socket_events < 0)
    {
      // Only an interrupted wait is expected here.
      assert(errno == EINTR);
    }
  }
  // No epoll fd for this handle: fall back to the generic socket poller.
  else
#endif
  {
    if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
    {
      retVal |= poll_TCP(timeOutMillis, recvdata);
    }
  }
#endif
#ifdef NDB_SCI_TRANSPORTER
  if (nSCITransporters > 0)
    retVal |= poll_SCI(timeOutMillis, recvdata);
#endif
#ifdef NDB_SHM_TRANSPORTER
  // Check SHM again, since data may have arrived during the socket wait.
  if (nSHMTransporters > 0)
  {
    int res = poll_SHM(0, recvdata);
    retVal |= res;
  }
#endif
  return retVal;
}
1263 
1264 
#ifdef NDB_SCI_TRANSPORTER
/**
 * Check the SCI transporters assigned to 'recvdata' for unread data.
 *
 * Each transporter that is connected (both at the Transporter and at the
 * registry level) and has data to read is added to
 * 'recvdata.m_has_data_transporters'. 'timeOutMillis' is not used; this
 * function does no waiting of its own.
 *
 * @return 1 if any transporter had data to read, else 0
 */
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis,
                              TransporterReceiveHandle& recvdata)
{
  assert((receiveHandle == &recvdata) || (receiveHandle == 0));

  Uint32 retVal = 0;
  for (int i = 0; i < nSCITransporters; i++)
  {
    SCI_Transporter * t = theSCITransporters[i];
    const Uint32 node_id = t->getRemoteNodeId();

    // Skip transporters served by another receive handle.
    // (This used to test an undeclared 'nodeId', which did not compile.)
    if (!recvdata.m_transporters.get(node_id))
      continue;

    if (t->isConnected() && is_connected(node_id))
    {
      if (t->hasDataToRead())
      {
        recvdata.m_has_data_transporters.set(node_id);
        retVal = 1;
      }
    }
  }
  return retVal;
}
#endif
1293 
1294 
#ifdef NDB_SHM_TRANSPORTER
static int g_shm_counter = 0;
/**
 * Busy-check the SHM transporters assigned to 'recvdata' for unread data.
 *
 * Makes up to 100 passes over all SHM transporters. Each pass visits every
 * transporter, and every connected one with data to read is added to
 * 'recvdata.m_has_data_transporters'. Passes stop after the first pass
 * that found any data. 'timeOutMillis' is not used.
 *
 * @return 1 if any transporter had data to read, else 0
 */
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis,
                              TransporterReceiveHandle& recvdata)
{
  assert((receiveHandle == &recvdata) || (receiveHandle == 0));

  Uint32 found = 0;
  for (int pass = 0; pass < 100 && found == 0; pass++)
  {
    for (int i = 0; i < nSHMTransporters; i++)
    {
      SHM_Transporter * const shm = theSHMTransporters[i];
      const Uint32 remote = shm->getRemoteNodeId();

      if (!recvdata.m_transporters.get(remote))
        continue;                       // owned by another receive handle
      if (!shm->isConnected() || !is_connected(remote))
        continue;
      if (!shm->hasDataToRead())
        continue;

      recvdata.m_has_data_transporters.set(remote);
      found = 1;
    }
  }
  return found;
}
#endif
1328 
#ifdef NDB_TCP_TRANSPORTER
/**
 * We do not want to hold any transporter locks during select(), so there
 * is no protection against a disconnect closing the socket during this call.
 *
 * That does not matter, at most we will get a spurious wakeup on the wrong
 * socket, which will be handled correctly in performReceive() (which _is_
 * protected by transporter locks on upper layer).
 *
 * Polls the TCP transporters owned by 'recvdata' for readability, plus
 * the extra wakeup socket when this handle owns pseudo-transporter 0.
 * Readable ones are recorded in 'recvdata.m_recv_transporters' (bit 0 for
 * the wakeup socket).
 *
 * @return the raw result of m_socket_poller.poll_unsafe(); only a positive
 *         value is treated as "sockets ready" here
 */
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
                              TransporterReceiveHandle& recvdata)
{
  assert((receiveHandle == &recvdata) || (receiveHandle == 0));

  recvdata.m_socket_poller.clear();

  /**
   * Poller slot of the wakeup socket, or MAX_NODES + 1 when this handle
   * does not poll it. Use the slot returned by add() instead of assuming
   * slot 0. When the wakeup socket is not added here, slot 0 belongs to
   * the first TCP transporter, so testing has_read(0) would mistake its
   * data for a wakeup. performReceive() would then drain the wakeup socket
   * from the wrong handle.
   */
  Uint16 wakeup_idx = MAX_NODES + 1;
  if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0))
  {
    const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
    assert(&recvdata == receiveHandle); // not used by ndbmtd...

    // Poll the wakeup-socket for read
    wakeup_idx = recvdata.m_socket_poller.add(socket, true, false, false);
  }

  // idx[i]: poller slot of theTCPTransporters[i], MAX_NODES + 1 if not polled
  Uint16 idx[MAX_NODES];
  for (int i = 0; i < nTCPTransporters; i++)
  {
    TCP_Transporter * t = theTCPTransporters[i];
    const NDB_SOCKET_TYPE socket = t->getSocket();
    Uint32 node_id = t->getRemoteNodeId();

    idx[i] = MAX_NODES + 1;
    if (!recvdata.m_transporters.get(node_id))
      continue;

    if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
    {
      idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
    }
  }

  int tcpReadSelectReply = recvdata.m_socket_poller.poll_unsafe(timeOutMillis);

  if (tcpReadSelectReply > 0)
  {
    if (wakeup_idx != MAX_NODES + 1 &&
        recvdata.m_socket_poller.has_read(wakeup_idx))
    {
      assert(recvdata.m_transporters.get(0));
      recvdata.m_recv_transporters.set((Uint32)0);
    }

    for (int i = 0; i < nTCPTransporters; i++)
    {
      TCP_Transporter * t = theTCPTransporters[i];
      if (idx[i] != MAX_NODES + 1)
      {
        Uint32 node_id = t->getRemoteNodeId();
        if (recvdata.m_socket_poller.has_read(idx[i]))
          recvdata.m_recv_transporters.set(node_id);
      }
    }
  }

  return tcpReadSelectReply;
}
#endif
1401 
1402 /**
1403  * Receive from the set of transporters in the bitmask
1404  * 'recvdata.m_transporters'. These has been polled by
1405  * ::pollReceive() which recorded transporters with
1406  * available data in the subset 'recvdata.m_recv_transporters'.
1407  *
1408  * In multi-threaded datanodes, there might be multiple
1409  * receiver threads, each serving a disjunct set of
1410  * 'm_transporters'.
1411  *
1412  * Single-threaded datanodes does all ::performReceive
1413  * from the scheduler main-loop, and thus it will handle
1414  * all 'm_transporters'.
1415  *
1416  * Clients has to aquire a 'poll right' (see TransporterFacade)
1417  * which gives it the right to temporarily acts as a receive
1418  * thread with the right to poll *all* transporters.
1419  *
1420  * Reception takes place on a set of transporters knowing to be in a
1421  * 'CONNECTED' state. Transporters can (asynch) become 'DISCONNECTING'
1422  * while we performReceive(). There is *no* mutex lock protecting
1423  * 'disconnecting' from being started while we are in the receive-loop!
1424  * However, the contents of the buffers++  should still be in a
1425  * consistent state, such that the current receive can complete
1426  * without failures.
1427  *
1428  * With regular intervals we have to ::update_connections()
1429  * in order to bring DISCONNECTING transporters into
1430  * a DISCONNECTED state. At earlies at this point, resources
1431  * used by performReceive() may be reset or released.
1432  * A transporter should be brought to the DISCONNECTED state
1433  * before it can reconnect again. (Note: There is a break of
1434  * this rule in ::do_connect, see own note here)
1435  *
1436  * To not interfere with ::poll- or ::performReceive(),
1437  * ::update_connections() has to be synched with with these
1438  * methods. Either by being run within the same
1439  * receive thread (dataNodes), or protected by the 'poll rights'.
1440  *
1441  * In case we were unable to receive due to job buffers being full.
1442  * Returns 0 when receive succeeded from all Transporters having data,
1443  * else 1.
1444  */
1445 Uint32
performReceive(TransporterReceiveHandle & recvdata)1446 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
1447 {
1448   TransporterReceiveWatchdog guard(recvdata);
1449   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1450   bool stopReceiving = false;
1451 
1452   if (recvdata.m_recv_transporters.get(0))
1453   {
1454     assert(recvdata.m_transporters.get(0));
1455     assert(&recvdata == receiveHandle); // not used by ndbmtd
1456     recvdata.m_recv_transporters.clear(Uint32(0));
1457     consume_extra_sockets();
1458   }
1459 
1460 #ifdef ERROR_INSERT
1461   if (!m_blocked.isclear())
1462   {
1463     /* Exclude receive from blocked sockets. */
1464     recvdata.m_recv_transporters.bitANDC(m_blocked);
1465 
1466     if (recvdata.m_recv_transporters.isclear()  &&
1467         recvdata.m_has_data_transporters.isclear())
1468     {
1469         /* poll sees data, but we want to ignore for now
1470          * sleep a little to avoid busy loop
1471          */
1472       NdbSleep_MilliSleep(1);
1473     }
1474   }
1475 #endif
1476 
1477 #ifdef NDB_TCP_TRANSPORTER
1478   /**
1479    * Receive data from transporters polled to have data.
1480    * Add to set of transporters having pending data.
1481    */
1482   for(Uint32 id = recvdata.m_recv_transporters.find_first();
1483       id != BitmaskImpl::NotFound;
1484       id = recvdata.m_recv_transporters.find_next(id + 1))
1485   {
1486     TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1487     assert(recvdata.m_transporters.get(id));
1488 
1489     /**
1490      * First check connection 'is CONNECTED.
1491      * A connection can only be set into, or taken out of, is_connected'
1492      * state by ::update_connections(). See comment there about
1493      * synchronication between ::update_connections() and
1494      * performReceive()
1495      *
1496      * Transporter::isConnected() state my change asynch.
1497      * A mismatch between the TransporterRegistry::is_connected(),
1498      * and Transporter::isConnected() state is possible, and indicate
1499      * that a change is underway. (Completed by update_connections())
1500      */
1501     if (is_connected(id))
1502     {
1503       if (t->isConnected())
1504       {
1505         int nBytes = t->doReceive(recvdata);
1506         if (nBytes > 0)
1507         {
1508           recvdata.transporter_recv_from(id);
1509           recvdata.m_has_data_transporters.set(id);
1510         }
1511       }
1512     }
1513   }
1514   recvdata.m_recv_transporters.clear();
1515 
1516   /**
1517    * Unpack data either received above or pending from prev rounds.
1518    *
1519    * Data to be processed at this stage is in the Transporter
1520    * receivebuffer. The data *is received*, and will stay in
1521    * the  receiveBuffer even if a disconnect is started during
1522    * unpack.
1523    * When ::update_connection() finaly completes the disconnect,
1524    * (synced with ::performReceive()), 'm_has_data_transporters'
1525    * will be cleared, which will terminate further unpacking.
1526    *
1527    * NOTE:
1528    *  Without reading inconsistent date, we could have removed
1529    *  the 'connected' checks below, However, there is a requirement
1530    *  in the CLOSE_COMREQ/CONF protocol between TRPMAN and QMGR
1531    *  that no signals arrives from disconnecting nodes after a
1532    *  CLOSE_COMCONF was sent. For the moment the risk of taking
1533    *  advantage of this small optimization is not worth the risk.
1534    */
1535   for(Uint32 id = recvdata.m_has_data_transporters.find_first();
1536       id != BitmaskImpl::NotFound && !stopReceiving;
1537       id = recvdata.m_has_data_transporters.find_next(id + 1))
1538   {
1539     bool hasdata = false;
1540     TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1541 
1542     assert(recvdata.m_transporters.get(id));
1543 
1544     if (is_connected(id))
1545     {
1546       if (t->isConnected())
1547       {
1548         if (unlikely(recvdata.checkJobBuffer()))
1549           return 1;     // Full, can't unpack more
1550         if (unlikely(recvdata.m_handled_transporters.get(id)))
1551           continue;     // Skip now to avoid starvation
1552         Uint32 * ptr;
1553         Uint32 sz = t->getReceiveData(&ptr);
1554         Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id], stopReceiving);
1555         if (likely(szUsed))
1556         {
1557           t->updateReceiveDataPtr(szUsed);
1558           hasdata = t->hasReceiveData();
1559         }
1560         // else, we didn't unpack anything:
1561         //   Avail ReceiveData to short to be usefull, need to
1562         //   receive more before we can resume this transporter.
1563       }
1564     }
1565     // If transporter still have data, make sure that it's remember to next time
1566     recvdata.m_has_data_transporters.set(id, hasdata);
1567     recvdata.m_handled_transporters.set(id, hasdata);
1568   }
1569 #endif
1570 
1571 #ifdef NDB_SCI_TRANSPORTER
1572   //performReceive
1573   //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
1574   for (int i=0; i<nSCITransporters && !stopReceiving; i++)
1575   {
1576     SCI_Transporter  *t = theSCITransporters[i];
1577     const NodeId nodeId = t->getRemoteNodeId();
1578     assert(recvdata.m_transporters.get(nodeId));
1579     if(is_connected(nodeId))
1580     {
1581       if(t->isConnected() && t->checkConnected())
1582       {
1583         if (unlikely(recvdata.checkJobBuffer()))
1584           return 1;      // Full, can't unpack more
1585         if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
1586           continue;      // Skip now to avoid starvation
1587 
1588         Uint32 * readPtr, * eodPtr;
1589         t->getReceivePtr(&readPtr, &eodPtr);
1590         callbackObj->transporter_recv_from(nodeId);
1591         Uint32 *newPtr = unpack(recvdata, readPtr, eodPtr, nodeId, ioStates[nodeId], stopReceiving);
1592         t->updateReceivePtr(newPtr);
1593       }
1594     }
1595     recvdata.m_handled_transporters.set(nodeId);
1596   }
1597 #endif
1598 #ifdef NDB_SHM_TRANSPORTER
1599   for (int i=0; i<nSHMTransporters && !stopReceiving; i++)
1600   {
1601     SHM_Transporter *t = theSHMTransporters[i];
1602     const NodeId nodeId = t->getRemoteNodeId();
1603     assert(recvdata.m_transporters.get(nodeId));
1604     if(is_connected(nodeId)){
1605       if(t->isConnected() && t->checkConnected())
1606       {
1607         if (unlikely(recvdata.checkJobBuffer()))
1608           return 1;      // Full, can't unpack more
1609         if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
1610           continue;      // Previously handled, skip to avoid starvation
1611 
1612         Uint32 * readPtr, * eodPtr;
1613         t->getReceivePtr(&readPtr, &eodPtr);
1614         recvdata.transporter_recv_from(nodeId);
1615         Uint32 *newPtr = unpack(recvdata,
1616                                 readPtr, eodPtr, nodeId, ioStates[nodeId],
1617 				stopReceiving);
1618         t->updateReceivePtr(newPtr);
1619       }
1620     }
1621     recvdata.m_handled_transporters.set(nodeId);
1622   }
1623 #endif
1624   recvdata.m_handled_transporters.clear();
1625   return 0;
1626 }
1627 
1628 /**
1629  * In multi-threaded cases, this must be protected by send lock (can use
1630  * different locks for each node).
1631  */
1632 bool
performSend(NodeId nodeId)1633 TransporterRegistry::performSend(NodeId nodeId)
1634 {
1635   Transporter *t = get_transporter(nodeId);
1636   if (t && t->isConnected() && is_connected(nodeId))
1637   {
1638     return t->doSend();
1639   }
1640 
1641   return false;
1642 }
1643 
1644 void
consume_extra_sockets()1645 TransporterRegistry::consume_extra_sockets()
1646 {
1647   char buf[4096];
1648   ssize_t ret;
1649   int err;
1650   NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
1651   do
1652   {
1653     ret = my_recv(sock, buf, sizeof(buf), 0);
1654     err = my_socket_errno();
1655   } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
1656 
1657   /* Notify upper layer of explicit wakeup */
1658   callbackObj->reportWakeup();
1659 }
1660 
1661 void
performSend()1662 TransporterRegistry::performSend()
1663 {
1664   int i;
1665   sendCounter = 1;
1666 
1667 #ifdef NDB_TCP_TRANSPORTER
1668   for (i = m_transp_count; i < nTCPTransporters; i++)
1669   {
1670     TCP_Transporter *t = theTCPTransporters[i];
1671     if (t && t->has_data_to_send() &&
1672         t->isConnected() && is_connected(t->getRemoteNodeId()))
1673     {
1674       t->doSend();
1675     }
1676   }
1677   for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
1678   {
1679     TCP_Transporter *t = theTCPTransporters[i];
1680     if (t && t->has_data_to_send() &&
1681         t->isConnected() && is_connected(t->getRemoteNodeId()))
1682     {
1683       t->doSend();
1684     }
1685   }
1686   m_transp_count++;
1687   if (m_transp_count == nTCPTransporters) m_transp_count = 0;
1688 #endif
1689 #ifdef NDB_SCI_TRANSPORTER
1690   //scroll through the SCI transporters,
1691   // get each transporter, check if connected, send data
1692   for (i=0; i<nSCITransporters; i++) {
1693     SCI_Transporter  *t = theSCITransporters[i];
1694     const NodeId nodeId = t->getRemoteNodeId();
1695 
1696     if(is_connected(nodeId))
1697     {
1698       if(t->isConnected() && t->has_data_to_send())
1699       {
1700 	t->doSend();
1701       } //if
1702     } //if
1703   }
1704 #endif
1705 
1706 #ifdef NDB_SHM_TRANSPORTER
1707   for (i=0; i<nSHMTransporters; i++)
1708   {
1709     SHM_Transporter  *t = theSHMTransporters[i];
1710     const NodeId nodeId = t->getRemoteNodeId();
1711     if(is_connected(nodeId))
1712     {
1713       if(t->isConnected())
1714       {
1715 	t->doSend();
1716       }
1717     }
1718   }
1719 #endif
1720 }
1721 
1722 int
forceSendCheck(int sendLimit)1723 TransporterRegistry::forceSendCheck(int sendLimit){
1724   int tSendCounter = sendCounter;
1725   sendCounter = tSendCounter + 1;
1726   if (tSendCounter >= sendLimit) {
1727     performSend();
1728     sendCounter = 1;
1729     return 1;
1730   }//if
1731   return 0;
1732 }//TransporterRegistry::forceSendCheck()
1733 
#ifdef DEBUG_TRANSPORTER
/**
 * Debug dump: print the transporter count, then the perform state and I/O
 * state for each configured transporter.
 */
void
TransporterRegistry::printState()
{
  ndbout << "-- TransporterRegistry -- " << endl << endl
         << "Transporters = " << nTransporters << endl;
  for (int slot = 0; slot < maxTransporters; slot++)
  {
    Transporter * const trp = theTransporters[slot];
    if (trp == NULL)
      continue;

    const NodeId remote = trp->getRemoteNodeId();
    ndbout << "Transporter: " << remote
           << " PerformState: " << performStates[remote]
           << " IOState: " << ioStates[remote] << endl;
  }
}
#endif
1748 
1749 #ifdef ERROR_INSERT
1750 bool
isBlocked(NodeId nodeId)1751 TransporterRegistry::isBlocked(NodeId nodeId)
1752 {
1753   return m_blocked.get(nodeId);
1754 }
1755 
1756 void
blockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)1757 TransporterRegistry::blockReceive(TransporterReceiveHandle& recvdata,
1758                                   NodeId nodeId)
1759 {
1760   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1761   assert(recvdata.m_transporters.get(nodeId));
1762 
1763   /* Check that node is not already blocked?
1764    * Stop pulling from its socket (but track received data etc)
1765    */
1766   /* Shouldn't already be blocked with data */
1767   assert(!m_blocked.get(nodeId));
1768 
1769   m_blocked.set(nodeId);
1770 }
1771 
1772 void
unblockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)1773 TransporterRegistry::unblockReceive(TransporterReceiveHandle& recvdata,
1774                                     NodeId nodeId)
1775 {
1776   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1777   assert(recvdata.m_transporters.get(nodeId));
1778 
1779   /* Check that node is blocked?
1780    * Resume pulling from its socket
1781    * Ensure in-flight data is processed if there was some
1782    */
1783   assert(m_blocked.get(nodeId));
1784   assert(!recvdata.m_has_data_transporters.get(nodeId));
1785 
1786   m_blocked.clear(nodeId);
1787 
1788   if (m_blocked_disconnected.get(nodeId))
1789   {
1790     /* Process disconnect notification/handling now */
1791     m_blocked_disconnected.clear(nodeId);
1792 
1793     report_disconnect(recvdata, nodeId, m_disconnect_errors[nodeId]);
1794   }
1795 }
1796 #endif
1797 
1798 #ifdef ERROR_INSERT
1799 Uint32
getMixologyLevel() const1800 TransporterRegistry::getMixologyLevel() const
1801 {
1802   return m_mixology_level;
1803 }
1804 
1805 extern Uint32 MAX_RECEIVED_SIGNALS;  /* Packer.cpp */
1806 
1807 #define MIXOLOGY_MIX_INCOMING_SIGNALS 4
1808 
1809 void
setMixologyLevel(Uint32 l)1810 TransporterRegistry::setMixologyLevel(Uint32 l)
1811 {
1812   m_mixology_level = l;
1813 
1814   if (m_mixology_level & MIXOLOGY_MIX_INCOMING_SIGNALS)
1815   {
1816     ndbout_c("MIXOLOGY_MIX_INCOMING_SIGNALS on");
1817     /* Max one signal per transporter */
1818     MAX_RECEIVED_SIGNALS = 1;
1819   }
1820 
1821   /* TODO : Add mixing of Send from NdbApi / MGMD */
1822 }
1823 #endif
1824 
1825 IOState
ioState(NodeId nodeId) const1826 TransporterRegistry::ioState(NodeId nodeId) const {
1827   return ioStates[nodeId];
1828 }
1829 
1830 void
setIOState(NodeId nodeId,IOState state)1831 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
1832   if (ioStates[nodeId] == state)
1833     return;
1834 
1835   DEBUG("TransporterRegistry::setIOState("
1836         << nodeId << ", " << state << ")");
1837 
1838   ioStates[nodeId] = state;
1839 }
1840 
1841 extern "C" void *
run_start_clients_C(void * me)1842 run_start_clients_C(void * me)
1843 {
1844   ((TransporterRegistry*) me)->start_clients_thread();
1845   return 0;
1846 }
1847 
1848 /**
1849  * This method is used to initiate connection, called from the TRPMAN block.
1850  *
1851  * This works asynchronously, no actions are taken directly in the calling
1852  * thread.
1853  */
1854 void
do_connect(NodeId node_id)1855 TransporterRegistry::do_connect(NodeId node_id)
1856 {
1857   PerformState &curr_state = performStates[node_id];
1858   switch(curr_state){
1859   case DISCONNECTED:
1860     break;
1861   case CONNECTED:
1862     return;
1863   case CONNECTING:
1864     return;
1865   case DISCONNECTING:
1866     /**
1867      * NOTE (Need future work)
1868      * Going directly from DISCONNECTION to CONNECTING creates
1869      * a possile race with ::update_connections(): It will
1870      * see either of the *ING states, and bring the connection
1871      * into CONNECTED or *DISCONNECTED* state. Furthermore, the
1872      * state may be overwritten to CONNECTING by this method.
1873      * We should probably have waited for DISCONNECTED state,
1874      * before allowing reCONNECTING ....
1875      */
1876     break;
1877   }
1878   DBUG_ENTER("TransporterRegistry::do_connect");
1879   DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
1880 
1881   /*
1882     No one else should be using the transporter now, reset
1883     its send buffer
1884    */
1885   callbackObj->reset_send_buffer(node_id);
1886 
1887   Transporter * t = theTransporters[node_id];
1888   if (t != NULL)
1889     t->resetBuffers();
1890 
1891   curr_state= CONNECTING;
1892   DBUG_VOID_RETURN;
1893 }
1894 
1895 /**
1896  * This method is used to initiate disconnect from TRPMAN. It is also called
1897  * from the TCP transporter in case of an I/O error on the socket.
1898  *
1899  * This works asynchronously, similar to do_connect().
1900  */
1901 void
do_disconnect(NodeId node_id,int errnum)1902 TransporterRegistry::do_disconnect(NodeId node_id, int errnum)
1903 {
1904   PerformState &curr_state = performStates[node_id];
1905   switch(curr_state){
1906   case DISCONNECTED:
1907     return;
1908   case CONNECTED:
1909     break;
1910   case CONNECTING:
1911     break;
1912   case DISCONNECTING:
1913     return;
1914   }
1915   DBUG_ENTER("TransporterRegistry::do_disconnect");
1916   DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1917   curr_state= DISCONNECTING;
1918   m_disconnect_errnum[node_id] = errnum;
1919   DBUG_VOID_RETURN;
1920 }
1921 
1922 void
report_connect(TransporterReceiveHandle & recvdata,NodeId node_id)1923 TransporterRegistry::report_connect(TransporterReceiveHandle& recvdata,
1924                                     NodeId node_id)
1925 {
1926   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1927   assert(recvdata.m_transporters.get(node_id));
1928 
1929   DBUG_ENTER("TransporterRegistry::report_connect");
1930   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1931 
1932   /*
1933     The send buffers was reset when this connection
1934     was set to CONNECTING. In order to make sure no stray
1935     signals has been written to the send buffer since then
1936     call 'reset_send_buffer' with the "should_be_empty" flag
1937     set
1938   */
1939   callbackObj->reset_send_buffer(node_id, true);
1940 
1941   if (recvdata.epoll_add((TCP_Transporter*)theTransporters[node_id]))
1942   {
1943     performStates[node_id] = CONNECTED;
1944     recvdata.reportConnect(node_id);
1945     DBUG_VOID_RETURN;
1946   }
1947 
1948   /**
1949    * Failed to add to epoll_set...
1950    *   disconnect it (this is really really bad)
1951    */
1952   performStates[node_id] = DISCONNECTING;
1953   DBUG_VOID_RETURN;
1954 }
1955 
1956 void
report_disconnect(TransporterReceiveHandle & recvdata,NodeId node_id,int errnum)1957 TransporterRegistry::report_disconnect(TransporterReceiveHandle& recvdata,
1958                                        NodeId node_id, int errnum)
1959 {
1960   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1961   assert(recvdata.m_transporters.get(node_id));
1962 
1963   DBUG_ENTER("TransporterRegistry::report_disconnect");
1964   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1965 
1966 #ifdef ERROR_INSERT
1967   if (m_blocked.get(node_id))
1968   {
1969     /* We are simulating real latency, so control events experience
1970      * it too
1971      */
1972     m_blocked_disconnected.set(node_id);
1973     m_disconnect_errors[node_id] = errnum;
1974     DBUG_VOID_RETURN;
1975   }
1976 #endif
1977 
1978   performStates[node_id] = DISCONNECTED;
1979   recvdata.m_recv_transporters.clear(node_id);
1980   recvdata.m_has_data_transporters.clear(node_id);
1981   recvdata.m_handled_transporters.clear(node_id);
1982   recvdata.m_bad_data_transporters.clear(node_id);
1983   recvdata.reportDisconnect(node_id, errnum);
1984   DBUG_VOID_RETURN;
1985 }
1986 
1987 /**
1988  * We only call TransporterCallback::reportError() from
1989  * TransporterRegistry::update_connections().
1990  *
1991  * In other places we call this method to enqueue the error that will later be
1992  * picked up by update_connections().
1993  */
1994 void
report_error(NodeId nodeId,TransporterError errorCode,const char * errorInfo)1995 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
1996                                   const char *errorInfo)
1997 {
1998   if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
1999       m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
2000   {
2001     m_error_states[nodeId].m_code = errorCode;
2002     m_error_states[nodeId].m_info = errorInfo;
2003   }
2004 }
2005 
2006 /**
2007  * update_connections(), together with the thread running in
2008  * start_clients_thread(), handle the state changes for transporters as they
2009  * connect and disconnect.
2010  *
2011  * update_connections on a specific set of recvdata *must not* be run
2012  * concurrently with :performReceive() on the same recvdata. Thus,
2013  * it must either be called from the same (receive-)thread as
2014  * performReceive(), or protected by aquiring the (client) poll rights.
2015  */
2016 void
update_connections(TransporterReceiveHandle & recvdata)2017 TransporterRegistry::update_connections(TransporterReceiveHandle& recvdata)
2018 {
2019   TransporterReceiveWatchdog guard(recvdata);
2020   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2021 
2022   for (int i= 0, n= 0; n < nTransporters; i++){
2023     Transporter * t = theTransporters[i];
2024     if (!t)
2025       continue;
2026     n++;
2027 
2028     const NodeId nodeId = t->getRemoteNodeId();
2029     if (!recvdata.m_transporters.get(nodeId))
2030       continue;
2031 
2032     TransporterError code = m_error_states[nodeId].m_code;
2033     const char *info = m_error_states[nodeId].m_info;
2034     if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
2035     {
2036       recvdata.reportError(nodeId, code, info);
2037       m_error_states[nodeId].m_code = TE_NO_ERROR;
2038       m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
2039     }
2040 
2041     switch(performStates[nodeId]){
2042     case CONNECTED:
2043     case DISCONNECTED:
2044       break;
2045     case CONNECTING:
2046       if(t->isConnected())
2047 	report_connect(recvdata, nodeId);
2048       break;
2049     case DISCONNECTING:
2050       if(!t->isConnected())
2051 	report_disconnect(recvdata, nodeId, m_disconnect_errnum[nodeId]);
2052       break;
2053     }
2054   }
2055 }
2056 
2057 /**
2058  * Run as own thread
2059  * Possible blocking parts of transporter connect and diconnect
2060  * is supposed to be handled here.
2061  */
2062 void
start_clients_thread()2063 TransporterRegistry::start_clients_thread()
2064 {
2065   int persist_mgm_count= 0;
2066   DBUG_ENTER("TransporterRegistry::start_clients_thread");
2067   while (m_run_start_clients_thread) {
2068     NdbSleep_MilliSleep(100);
2069     persist_mgm_count++;
2070     if(persist_mgm_count==50)
2071     {
2072       ndb_mgm_check_connection(m_mgm_handle);
2073       persist_mgm_count= 0;
2074     }
2075     for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
2076       Transporter * t = theTransporters[i];
2077       if (!t)
2078 	continue;
2079       n++;
2080 
2081       const NodeId nodeId = t->getRemoteNodeId();
2082       switch(performStates[nodeId]){
2083       case CONNECTING:
2084 	if(!t->isConnected() && !t->isServer) {
2085           if (get_and_clear_node_up_indicator(nodeId))
2086           {
2087             // Other node have indicated that node nodeId is up, try connect
2088             // now and restart backoff sequence
2089             backoff_reset_connecting_time(nodeId);
2090           }
2091           if (!backoff_update_and_check_time_for_connect(nodeId))
2092           {
2093             // Skip connect this time
2094             continue;
2095           }
2096 
2097 	  bool connected= false;
2098 	  /**
2099 	   * First, we try to connect (if we have a port number).
2100 	   */
2101 
2102 	  if (t->get_s_port())
2103           {
2104             DBUG_PRINT("info", ("connecting to node %d using port %d",
2105                                 nodeId, t->get_s_port()));
2106             connected= t->connect_client();
2107           }
2108 
2109 	  /**
2110 	   * If dynamic, get the port for connecting from the management server
2111 	   */
2112 	  if( !connected && t->get_s_port() <= 0) {	// Port is dynamic
2113 	    int server_port= 0;
2114 	    struct ndb_mgm_reply mgm_reply;
2115 
2116             DBUG_PRINT("info", ("connection to node %d should use "
2117                                 "dynamic port",
2118                                 nodeId));
2119 
2120 	    if(!ndb_mgm_is_connected(m_mgm_handle))
2121 	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
2122 
2123 	    if(ndb_mgm_is_connected(m_mgm_handle))
2124 	    {
2125               DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
2126                                   nodeId));
2127 
2128               const int res=
2129 		ndb_mgm_get_connection_int_parameter(m_mgm_handle,
2130 						     t->getRemoteNodeId(),
2131 						     t->getLocalNodeId(),
2132 						     CFG_CONNECTION_SERVER_PORT,
2133 						     &server_port,
2134 						     &mgm_reply);
2135 	      DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
2136 				 server_port,t->getRemoteNodeId(),
2137 				 t->getLocalNodeId(),res));
2138 	      if( res >= 0 )
2139 	      {
2140                 DBUG_PRINT("info", ("got port %d to use for connection to %d",
2141                                     server_port, nodeId));
2142 
2143 		if (server_port != 0)
2144                 {
2145                   if (t->get_s_port() != server_port)
2146                   {
2147                     // Got a different port number, reset backoff
2148                     backoff_reset_connecting_time(nodeId);
2149                   }
2150                   // Save the new port number
2151 		  t->set_s_port(server_port);
2152                 }
2153                 else
2154                 {
2155                   // Got port number 0, port is not known.  Keep the old.
2156                 }
2157 	      }
2158 	      else if(ndb_mgm_is_connected(m_mgm_handle))
2159 	      {
2160                 DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
2161                                     res));
2162                 g_eventLogger->info("Failed to get dynamic port, res: %d",
2163                                     res);
2164 		ndb_mgm_disconnect(m_mgm_handle);
2165 	      }
2166 	      else
2167 	      {
2168                 DBUG_PRINT("info", ("mgmd close connection early"));
2169                 g_eventLogger->info
2170                   ("Management server closed connection early. "
2171                    "It is probably being shut down (or has problems). "
2172                    "We will retry the connection. %d %s %s line: %d",
2173                    ndb_mgm_get_latest_error(m_mgm_handle),
2174                    ndb_mgm_get_latest_error_desc(m_mgm_handle),
2175                    ndb_mgm_get_latest_error_msg(m_mgm_handle),
2176                    ndb_mgm_get_latest_error_line(m_mgm_handle)
2177                    );
2178 	      }
2179 	    }
2180 	    /** else
2181 	     * We will not be able to get a new port unless
2182 	     * the m_mgm_handle is connected. Note that not
2183 	     * being connected is an ok state, just continue
2184 	     * until it is able to connect. Continue using the
2185 	     * old port until we can connect again and get a
2186 	     * new port.
2187 	     */
2188 	  }
2189 	}
2190 	break;
2191       case DISCONNECTING:
2192 	if(t->isConnected())
2193 	  t->doDisconnect();
2194 	break;
2195       case DISCONNECTED:
2196       {
2197         if (t->isConnected())
2198         {
2199           g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
2200                                  " while being connected, disconnecting!",
2201                                  t->getRemoteNodeId());
2202           t->doDisconnect();
2203         }
2204         break;
2205       }
2206       default:
2207 	break;
2208       }
2209     }
2210   }
2211   DBUG_VOID_RETURN;
2212 }
2213 
2214 struct NdbThread*
start_clients()2215 TransporterRegistry::start_clients()
2216 {
2217   m_run_start_clients_thread= true;
2218   m_start_clients_thread= NdbThread_Create(run_start_clients_C,
2219 					   (void**)this,
2220                                            0, // default stack size
2221 					   "ndb_start_clients",
2222 					   NDB_THREAD_PRIO_LOW);
2223   if (m_start_clients_thread == 0)
2224   {
2225     m_run_start_clients_thread= false;
2226   }
2227   return m_start_clients_thread;
2228 }
2229 
2230 bool
stop_clients()2231 TransporterRegistry::stop_clients()
2232 {
2233   if (m_start_clients_thread) {
2234     m_run_start_clients_thread= false;
2235     void* status;
2236     NdbThread_WaitFor(m_start_clients_thread, &status);
2237     NdbThread_Destroy(&m_start_clients_thread);
2238   }
2239   return true;
2240 }
2241 
2242 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)2243 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
2244 					       const char *interf,
2245 					       int s_port)
2246 {
2247   DBUG_ENTER("TransporterRegistry::add_transporter_interface");
2248   DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
2249   if (interf && strlen(interf) == 0)
2250     interf= 0;
2251 
2252   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
2253   {
2254     Transporter_interface &tmp= m_transporter_interface[i];
2255     if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
2256       continue;
2257     if (interf != 0 && tmp.m_interface != 0 &&
2258 	strcmp(interf, tmp.m_interface) == 0)
2259     {
2260       DBUG_VOID_RETURN; // found match, no need to insert
2261     }
2262     if (interf == 0 && tmp.m_interface == 0)
2263     {
2264       DBUG_VOID_RETURN; // found match, no need to insert
2265     }
2266   }
2267   Transporter_interface t;
2268   t.m_remote_nodeId= remoteNodeId;
2269   t.m_s_service_port= s_port;
2270   t.m_interface= interf;
2271   m_transporter_interface.push_back(t);
2272   DBUG_PRINT("exit",("interface and port added"));
2273   DBUG_VOID_RETURN;
2274 }
2275 
2276 bool
start_service(SocketServer & socket_server)2277 TransporterRegistry::start_service(SocketServer& socket_server)
2278 {
2279   DBUG_ENTER("TransporterRegistry::start_service");
2280   if (m_transporter_interface.size() > 0 &&
2281       localNodeId == 0)
2282   {
2283     g_eventLogger->error("INTERNAL ERROR: not initialized");
2284     DBUG_RETURN(false);
2285   }
2286 
2287   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
2288   {
2289     Transporter_interface &t= m_transporter_interface[i];
2290 
2291     unsigned short port= (unsigned short)t.m_s_service_port;
2292     if(t.m_s_service_port<0)
2293       port= -t.m_s_service_port; // is a dynamic port
2294     TransporterService *transporter_service =
2295       new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
2296     if(!socket_server.setup(transporter_service,
2297 			    &port, t.m_interface))
2298     {
2299       DBUG_PRINT("info", ("Trying new port"));
2300       port= 0;
2301       if(t.m_s_service_port>0
2302 	 || !socket_server.setup(transporter_service,
2303 				 &port, t.m_interface))
2304       {
2305 	/*
2306 	 * If it wasn't a dynamically allocated port, or
2307 	 * our attempts at getting a new dynamic port failed
2308 	 */
2309         g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
2310                              "Please check if the port is already used,\n"
2311                              "(perhaps the node is already running)",
2312                              t.m_interface ? t.m_interface : "*", t.m_s_service_port);
2313 	delete transporter_service;
2314 	DBUG_RETURN(false);
2315       }
2316     }
2317     t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
2318     DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
2319     transporter_service->setTransporterRegistry(this);
2320   }
2321   DBUG_RETURN(true);
2322 }
2323 
#ifdef NDB_SHM_TRANSPORTER
/**
 * Signal handler installed by startReceiving() for g_ndb_shm_signum.
 * It only increments g_shm_counter; the signal number is unused.
 */
extern "C"
void
shm_sig_handler(int signo)
{
  (void)signo;
  g_shm_counter++;
}
#endif
2332 
2333 void
startReceiving()2334 TransporterRegistry::startReceiving()
2335 {
2336   DBUG_ENTER("TransporterRegistry::startReceiving");
2337 
2338 #ifdef NDB_SHM_TRANSPORTER
2339   m_shm_own_pid = getpid();
2340   if (g_ndb_shm_signum)
2341   {
2342     DBUG_PRINT("info",("Install signal handler for signum %d",
2343 		       g_ndb_shm_signum));
2344     struct sigaction sa;
2345     NdbThread_set_shm_sigmask(FALSE);
2346     sigemptyset(&sa.sa_mask);
2347     sa.sa_handler = shm_sig_handler;
2348     sa.sa_flags = 0;
2349     int ret;
2350     while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
2351       ;
2352     if(ret != 0)
2353     {
2354       DBUG_PRINT("error",("Install failed"));
2355       g_eventLogger->error("Failed to install signal handler for"
2356                            " SHM transporter, signum %d, errno: %d (%s)",
2357                            g_ndb_shm_signum, errno, strerror(errno));
2358     }
2359   }
2360 #endif // NDB_SHM_TRANSPORTER
2361   DBUG_VOID_RETURN;
2362 }
2363 
2364 void
stopReceiving()2365 TransporterRegistry::stopReceiving(){
2366 }
2367 
2368 void
startSending()2369 TransporterRegistry::startSending(){
2370 }
2371 
2372 void
stopSending()2373 TransporterRegistry::stopSending(){
2374 }
2375 
operator <<(NdbOut & out,SignalHeader & sh)2376 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
2377   out << "-- Signal Header --" << endl;
2378   out << "theLength:    " << sh.theLength << endl;
2379   out << "gsn:          " << sh.theVerId_signalNumber << endl;
2380   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
2381   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
2382   out << "sendersSig:   " << sh.theSendersSignalId << endl;
2383   out << "theSignalId:  " << sh.theSignalId << endl;
2384   out << "trace:        " << (int)sh.theTrace << endl;
2385   return out;
2386 }
2387 
2388 Transporter*
get_transporter(NodeId nodeId)2389 TransporterRegistry::get_transporter(NodeId nodeId) {
2390   assert(nodeId < maxTransporters);
2391   return theTransporters[nodeId];
2392 }
2393 
2394 
connect_client(NdbMgmHandle * h)2395 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
2396 {
2397   DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
2398 
2399   Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
2400 
2401   if(!mgm_nodeid)
2402   {
2403     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2404     return false;
2405   }
2406   Transporter * t = theTransporters[mgm_nodeid];
2407   if (!t)
2408   {
2409     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2410     return false;
2411   }
2412 
2413   bool res = t->connect_client(connect_ndb_mgmd(h));
2414   if (res == true)
2415   {
2416     performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
2417   }
2418   DBUG_RETURN(res);
2419 }
2420 
2421 
report_dynamic_ports(NdbMgmHandle h) const2422 bool TransporterRegistry::report_dynamic_ports(NdbMgmHandle h) const
2423 {
2424   // Fill array of nodeid/port pairs for those ports which are dynamic
2425   unsigned num_ports = 0;
2426   ndb_mgm_dynamic_port ports[MAX_NODES];
2427   for(unsigned i = 0; i < m_transporter_interface.size(); i++)
2428   {
2429     const Transporter_interface& ti = m_transporter_interface[i];
2430     if (ti.m_s_service_port >= 0)
2431       continue; // Not a dynamic port
2432 
2433     assert(num_ports < NDB_ARRAY_SIZE(ports));
2434     ports[num_ports].nodeid = ti.m_remote_nodeId;
2435     ports[num_ports].port = ti.m_s_service_port;
2436     num_ports++;
2437   }
2438 
2439   if (num_ports == 0)
2440   {
2441     // No dynamic ports in use, nothing to report
2442     return true;
2443   }
2444 
2445   // Send array of nodeid/port pairs to mgmd
2446   if (ndb_mgm_set_dynamic_ports(h, localNodeId,
2447                                 ports, num_ports) < 0)
2448   {
2449     g_eventLogger->error("Failed to register dynamic ports, error: %d  - '%s'",
2450                          ndb_mgm_get_latest_error(h),
2451                          ndb_mgm_get_latest_error_desc(h));
2452     return false;
2453   }
2454 
2455   return true;
2456 }
2457 
2458 
2459 /**
2460  * Given a connected NdbMgmHandle, turns it into a transporter
2461  * and returns the socket.
2462  */
connect_ndb_mgmd(NdbMgmHandle * h)2463 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
2464 {
2465   NDB_SOCKET_TYPE sockfd;
2466   my_socket_invalidate(&sockfd);
2467 
2468   DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
2469 
2470   if ( h==NULL || *h == NULL )
2471   {
2472     g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
2473     DBUG_RETURN(sockfd);
2474   }
2475 
2476   if (!report_dynamic_ports(*h))
2477   {
2478     ndb_mgm_destroy_handle(h);
2479     DBUG_RETURN(sockfd);
2480   }
2481 
2482   /**
2483    * convert_to_transporter also disposes of the handle (i.e. we don't leak
2484    * memory here.
2485    */
2486   DBUG_PRINT("info", ("Converting handle to transporter"));
2487   sockfd= ndb_mgm_convert_to_transporter(h);
2488   if (!my_socket_valid(sockfd))
2489   {
2490     g_eventLogger->error("Failed to convert to transporter (%s: %d)",
2491                          __FILE__, __LINE__);
2492     ndb_mgm_destroy_handle(h);
2493   }
2494   DBUG_RETURN(sockfd);
2495 }
2496 
2497 /**
2498  * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
2499  * and returns the socket.
2500  */
2501 NDB_SOCKET_TYPE
connect_ndb_mgmd(const char * server_name,unsigned short server_port)2502 TransporterRegistry::connect_ndb_mgmd(const char* server_name,
2503                                       unsigned short server_port)
2504 {
2505   NdbMgmHandle h= ndb_mgm_create_handle();
2506   NDB_SOCKET_TYPE s;
2507   my_socket_invalidate(&s);
2508 
2509   DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
2510 
2511   if ( h == NULL )
2512   {
2513     DBUG_RETURN(s);
2514   }
2515 
2516   /**
2517    * Set connectstring
2518    */
2519   {
2520     BaseString cs;
2521     cs.assfmt("%s:%u", server_name, server_port);
2522     ndb_mgm_set_connectstring(h, cs.c_str());
2523   }
2524 
2525   if(ndb_mgm_connect(h, 0, 0, 0)<0)
2526   {
2527     DBUG_PRINT("info", ("connection to mgmd failed"));
2528     ndb_mgm_destroy_handle(&h);
2529     DBUG_RETURN(s);
2530   }
2531 
2532   DBUG_RETURN(connect_ndb_mgmd(&h));
2533 }
2534 
2535 /**
2536  * The calls below are used by all implementations: NDB API, ndbd and
2537  * ndbmtd. The calls to handle->getWritePtr, handle->updateWritePtr
2538  * are handled by special implementations for NDB API, ndbd and
2539  * ndbmtd.
2540  */
2541 
2542 Uint32 *
getWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2543 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
2544                                  NodeId node, Uint32 lenBytes, Uint32 prio)
2545 {
2546   Transporter *t = theTransporters[node];
2547   Uint32 *insertPtr = handle->getWritePtr(node, lenBytes, prio,
2548                                           t->get_max_send_buffer());
2549 
2550   if (insertPtr == 0) {
2551     //-------------------------------------------------
2552     // Buffer was completely full. We have severe problems.
2553     // We will attempt to wait for a small time
2554     //-------------------------------------------------
2555     if(t->send_is_possible(10)) {
2556       //-------------------------------------------------
2557       // Send is possible after the small timeout.
2558       //-------------------------------------------------
2559       if(!handle->forceSend(node)){
2560 	return 0;
2561       } else {
2562 	//-------------------------------------------------
2563 	// Since send was successful we will make a renewed
2564 	// attempt at inserting the signal into the buffer.
2565 	//-------------------------------------------------
2566         insertPtr = handle->getWritePtr(node, lenBytes, prio,
2567                                         t->get_max_send_buffer());
2568       }//if
2569     } else {
2570       return 0;
2571     }//if
2572   }
2573   return insertPtr;
2574 }
2575 
2576 void
updateWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2577 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
2578                                     NodeId node, Uint32 lenBytes, Uint32 prio)
2579 {
2580   Transporter *t = theTransporters[node];
2581 
2582   Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
2583   t->update_status_overloaded(used);
2584 
2585   if(t->send_limit_reached(used)) {
2586     //-------------------------------------------------
2587     // Buffer is full and we are ready to send. We will
2588     // not wait since the signal is already in the buffer.
2589     // Force flag set has the same indication that we
2590     // should always send. If it is not possible to send
2591     // we will not worry since we will soon be back for
2592     // a renewed trial.
2593     //-------------------------------------------------
2594     if(t->send_is_possible(0)) {
2595       //-------------------------------------------------
2596       // Send was possible, attempt at a send.
2597       //-------------------------------------------------
2598       handle->forceSend(node);
2599     }//if
2600   }
2601 }
2602 
2603 Uint32
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)2604 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
2605                                              Uint32 max)
2606 {
2607   assert(m_use_default_send_buffer);
2608 
2609   if (max == 0)
2610     return 0;
2611 
2612   Uint32 count = 0;
2613   SendBuffer *b = m_send_buffers + node;
2614   SendBufferPage *page = b->m_first_page;
2615   while (page != NULL && count < max)
2616   {
2617     dst[count].iov_base = page->m_data+page->m_start;
2618     dst[count].iov_len = page->m_bytes;
2619     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2620     page = page->m_next;
2621     count++;
2622   }
2623 
2624   return count;
2625 }
2626 
2627 Uint32
bytes_sent(NodeId node,Uint32 bytes)2628 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
2629 {
2630   assert(m_use_default_send_buffer);
2631 
2632   SendBuffer *b = m_send_buffers + node;
2633   Uint32 used_bytes = b->m_used_bytes;
2634 
2635   if (bytes == 0)
2636     return used_bytes;
2637 
2638   used_bytes -= bytes;
2639   b->m_used_bytes = used_bytes;
2640 
2641   SendBufferPage *page = b->m_first_page;
2642   while (bytes && bytes >= page->m_bytes)
2643   {
2644     SendBufferPage * tmp = page;
2645     bytes -= page->m_bytes;
2646     page = page->m_next;
2647     release_page(tmp);
2648   }
2649 
2650   if (used_bytes == 0)
2651   {
2652     b->m_first_page = 0;
2653     b->m_last_page = 0;
2654   }
2655   else
2656   {
2657     page->m_start += bytes;
2658     page->m_bytes -= bytes;
2659     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2660     b->m_first_page = page;
2661   }
2662 
2663   return used_bytes;
2664 }
2665 
2666 bool
has_data_to_send(NodeId node)2667 TransporterRegistry::has_data_to_send(NodeId node)
2668 {
2669   assert(m_use_default_send_buffer);
2670 
2671   SendBuffer *b = m_send_buffers + node;
2672   return (b->m_first_page != NULL && b->m_first_page->m_bytes);
2673 }
2674 
2675 void
reset_send_buffer(NodeId node,bool should_be_empty)2676 TransporterRegistry::reset_send_buffer(NodeId node, bool should_be_empty)
2677 {
2678   assert(m_use_default_send_buffer);
2679 
2680   // Make sure that buffer is already empty if the "should_be_empty"
2681   // flag is set. This is done to quickly catch any stray signals
2682   // written to the send buffer while not being connected
2683   if (should_be_empty && !has_data_to_send(node))
2684     return;
2685   assert(!should_be_empty);
2686 
2687   SendBuffer *b = m_send_buffers + node;
2688   SendBufferPage *page = b->m_first_page;
2689   while (page != NULL)
2690   {
2691     SendBufferPage *next = page->m_next;
2692     release_page(page);
2693     page = next;
2694   }
2695   b->m_first_page = NULL;
2696   b->m_last_page = NULL;
2697   b->m_used_bytes = 0;
2698 }
2699 
2700 TransporterRegistry::SendBufferPage *
alloc_page()2701 TransporterRegistry::alloc_page()
2702 {
2703   SendBufferPage *page = m_page_freelist;
2704   if (page != NULL)
2705   {
2706     m_tot_used_buffer_memory += SendBufferPage::PGSIZE;
2707     m_page_freelist = page->m_next;
2708     return page;
2709   }
2710 
2711   ndbout << "ERROR: out of send buffers in kernel." << endl;
2712   return NULL;
2713 }
2714 
2715 void
release_page(SendBufferPage * page)2716 TransporterRegistry::release_page(SendBufferPage *page)
2717 {
2718   assert(page != NULL);
2719   page->m_next = m_page_freelist;
2720   m_tot_used_buffer_memory -= SendBufferPage::PGSIZE;
2721   m_page_freelist = page;
2722 }
2723 
2724 /**
2725  * These are the TransporterSendBufferHandle methods used by the
2726  * single-threaded ndbd.
2727  */
2728 Uint32 *
getWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio,Uint32 max_use)2729 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
2730                                  Uint32 max_use)
2731 {
2732   assert(m_use_default_send_buffer);
2733 
2734   SendBuffer *b = m_send_buffers + node;
2735 
2736   /* First check if we have room in already allocated page. */
2737   SendBufferPage *page = b->m_last_page;
2738   if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
2739   {
2740     return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
2741   }
2742 
2743   if (b->m_used_bytes + lenBytes > max_use)
2744     return NULL;
2745 
2746   /* Allocate a new page. */
2747   page = alloc_page();
2748   if (page == NULL)
2749     return NULL;
2750   page->m_next = NULL;
2751   page->m_bytes = 0;
2752   page->m_start = 0;
2753 
2754   if (b->m_last_page == NULL)
2755   {
2756     b->m_first_page = page;
2757     b->m_last_page = page;
2758   }
2759   else
2760   {
2761     assert(b->m_first_page != NULL);
2762     b->m_last_page->m_next = page;
2763     b->m_last_page = page;
2764   }
2765   return (Uint32 *)(page->m_data);
2766 }
2767 
2768 /**
2769  * This is used by the ndbd, so here only one thread is using this, so
2770  * values will always be consistent.
2771  */
2772 void
getSendBufferLevel(NodeId node,SB_LevelType & level)2773 TransporterRegistry::getSendBufferLevel(NodeId node, SB_LevelType &level)
2774 {
2775   SendBuffer *b = m_send_buffers + node;
2776   calculate_send_buffer_level(b->m_used_bytes,
2777                               m_tot_send_buffer_memory,
2778                               m_tot_used_buffer_memory,
2779                               0,
2780                               level);
2781   return;
2782 }
2783 
2784 Uint32
updateWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio)2785 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2786 {
2787   assert(m_use_default_send_buffer);
2788 
2789   SendBuffer *b = m_send_buffers + node;
2790   SendBufferPage *page = b->m_last_page;
2791   assert(page != NULL);
2792   assert(page->m_bytes + lenBytes <= page->max_data_bytes());
2793   page->m_bytes += lenBytes;
2794   b->m_used_bytes += lenBytes;
2795   return b->m_used_bytes;
2796 }
2797 
2798 bool
forceSend(NodeId node)2799 TransporterRegistry::forceSend(NodeId node)
2800 {
2801   Transporter *t = get_transporter(node);
2802   if (t)
2803     return t->doSend();
2804   else
2805     return false;
2806 }
2807 
2808 
2809 void
print_transporters(const char * where,NdbOut & out)2810 TransporterRegistry::print_transporters(const char* where, NdbOut& out)
2811 {
2812   out << where << " >>" << endl;
2813 
2814   for(unsigned i = 0; i < maxTransporters; i++){
2815     if(theTransporters[i] == NULL)
2816       continue;
2817 
2818     const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
2819     struct in_addr conn_addr = get_connect_address(remoteNodeId);
2820     char addr_buf[NDB_ADDR_STRLEN];
2821     char *addr_str = Ndb_inet_ntop(AF_INET,
2822                                    static_cast<void*>(&conn_addr),
2823                                    addr_buf,
2824                                    (socklen_t)sizeof(addr_buf));
2825 
2826     out << i << " "
2827         << getPerformStateString(remoteNodeId) << " to node: "
2828         << remoteNodeId << " at " << addr_str << endl;
2829   }
2830 
2831   out << "<<" << endl;
2832 
2833   for (unsigned i= 0; i < m_transporter_interface.size(); i++){
2834     Transporter_interface tf= m_transporter_interface[i];
2835 
2836     out << i
2837         << " remote node: " << tf.m_remote_nodeId
2838         << " port: " << tf.m_s_service_port
2839         << " interface: " << tf.m_interface << endl;
2840   }
2841 }
2842 
2843 void
inc_overload_count(Uint32 nodeId)2844 TransporterRegistry::inc_overload_count(Uint32 nodeId)
2845 {
2846   assert(nodeId < MAX_NODES);
2847   assert(theTransporters[nodeId] != NULL);
2848   theTransporters[nodeId]->inc_overload_count();
2849 }
2850 
2851 void
inc_slowdown_count(Uint32 nodeId)2852 TransporterRegistry::inc_slowdown_count(Uint32 nodeId)
2853 {
2854   assert(nodeId < MAX_NODES);
2855   assert(theTransporters[nodeId] != NULL);
2856   theTransporters[nodeId]->inc_slowdown_count();
2857 }
2858 
2859 Uint32
get_overload_count(Uint32 nodeId)2860 TransporterRegistry::get_overload_count(Uint32 nodeId)
2861 {
2862   assert(nodeId < MAX_NODES);
2863   assert(theTransporters[nodeId] != NULL);
2864   return theTransporters[nodeId]->get_overload_count();
2865 }
2866 
2867 Uint32
get_slowdown_count(Uint32 nodeId)2868 TransporterRegistry::get_slowdown_count(Uint32 nodeId)
2869 {
2870   assert(nodeId < MAX_NODES);
2871   assert(theTransporters[nodeId] != NULL);
2872   return theTransporters[nodeId]->get_slowdown_count();
2873 }
2874 
2875 Uint32
get_connect_count(Uint32 nodeId)2876 TransporterRegistry::get_connect_count(Uint32 nodeId)
2877 {
2878   assert(nodeId < MAX_NODES);
2879   assert(theTransporters[nodeId] != NULL);
2880   return theTransporters[nodeId]->get_connect_count();
2881 }
2882 
2883 /**
2884  * We calculate the risk level for a send buffer.
2885  * The primary instrument is the current size of
2886  * the node send buffer. However if the total
2887  * buffer for all send buffers is also close to
2888  * empty, then we will adjust the node send
2889  * buffer size for this. In this manner a very
2890  * contested total buffer will also slow down
2891  * the entire node operation.
2892  */
2893 void
calculate_send_buffer_level(Uint64 node_send_buffer_size,Uint64 total_send_buffer_size,Uint64 total_used_send_buffer_size,Uint32 num_threads,SB_LevelType & level)2894 calculate_send_buffer_level(Uint64 node_send_buffer_size,
2895                             Uint64 total_send_buffer_size,
2896                             Uint64 total_used_send_buffer_size,
2897                             Uint32 num_threads,
2898                             SB_LevelType &level)
2899 {
2900   Uint64 percentage =
2901     (total_used_send_buffer_size * 100) / total_send_buffer_size;
2902 
2903   if (percentage < 90)
2904   {
2905     ;
2906   }
2907   else if (percentage < 95)
2908   {
2909     node_send_buffer_size *= 2;
2910   }
2911   else if (percentage < 97)
2912   {
2913     node_send_buffer_size *= 4;
2914   }
2915   else if (percentage < 98)
2916   {
2917     node_send_buffer_size *= 8;
2918   }
2919   else if (percentage < 99)
2920   {
2921     node_send_buffer_size *= 16;
2922   }
2923   else
2924   {
2925     level = SB_CRITICAL_LEVEL;
2926     return;
2927   }
2928 
2929   if (node_send_buffer_size < 128 * 1024)
2930   {
2931     level = SB_NO_RISK_LEVEL;
2932     return;
2933   }
2934   else if (node_send_buffer_size < 256 * 1024)
2935   {
2936     level = SB_LOW_LEVEL;
2937     return;
2938   }
2939   else if (node_send_buffer_size < 384 * 1024)
2940   {
2941     level = SB_MEDIUM_LEVEL;
2942     return;
2943   }
2944   else if (node_send_buffer_size < 1024 * 1024)
2945   {
2946     level = SB_HIGH_LEVEL;
2947     return;
2948   }
2949   else if (node_send_buffer_size < 2 * 1024 * 1024)
2950   {
2951     level = SB_RISK_LEVEL;
2952     return;
2953   }
2954   else
2955   {
2956     level = SB_CRITICAL_LEVEL;
2957     return;
2958   }
2959 }
2960 
/*
 * Explicit instantiation of Vector<TransporterRegistry::Transporter_interface>.
 * NOTE(review): presumably present so the Vector member definitions for this
 * element type (used via m_transporter_interface) are emitted in this
 * translation unit; confirm against Vector.hpp before removing.
 */
template class Vector<TransporterRegistry::Transporter_interface>;
2962