1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include <TransporterRegistry.hpp>
28 #include "TransporterInternalDefinitions.hpp"
29 
30 #include "Transporter.hpp"
31 #include <SocketAuthenticator.hpp>
32 
33 #ifdef NDB_TCP_TRANSPORTER
34 #include "TCP_Transporter.hpp"
35 #include "Loopback_Transporter.hpp"
36 #endif
37 
38 #ifdef NDB_SCI_TRANSPORTER
39 #include "SCI_Transporter.hpp"
40 #endif
41 
42 #ifdef NDB_SHM_TRANSPORTER
43 #include "SHM_Transporter.hpp"
44 extern int g_ndb_shm_signum;
45 #endif
46 
47 #include "NdbOut.hpp"
48 #include <NdbSleep.h>
49 #include <NdbTick.h>
50 #include <InputStream.hpp>
51 #include <OutputStream.hpp>
52 
53 #include <mgmapi/mgmapi.h>
54 #include <mgmapi_internal.h>
55 #include <mgmapi/mgmapi_debug.h>
56 
57 #include <EventLogger.hpp>
58 extern EventLogger * g_eventLogger;
59 
60 struct in_addr
get_connect_address(NodeId node_id) const61 TransporterRegistry::get_connect_address(NodeId node_id) const
62 {
63   return theTransporters[node_id]->m_connect_address;
64 }
65 
newSession(NDB_SOCKET_TYPE sockfd)66 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
67 {
68   DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
69   if (m_auth && !m_auth->server_authenticate(sockfd)){
70     NDB_CLOSE_SOCKET(sockfd);
71     DBUG_RETURN(0);
72   }
73 
74   BaseString msg;
75   if (!m_transporter_registry->connect_server(sockfd, msg))
76   {
77     NDB_CLOSE_SOCKET(sockfd);
78     DBUG_RETURN(0);
79   }
80 
81   DBUG_RETURN(0);
82 }
83 
TransporterRegistry(TransporterCallback * callback,bool use_default_send_buffer,unsigned _maxTransporters,unsigned sizeOfLongSignalMemory)84 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
85                                          bool use_default_send_buffer,
86 					 unsigned _maxTransporters,
87 					 unsigned sizeOfLongSignalMemory) :
88   m_mgm_handle(0),
89   localNodeId(0),
90   m_transp_count(0),
91   m_use_default_send_buffer(use_default_send_buffer),
92   m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
93   m_total_max_send_buffer(0)
94 {
95   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
96 
97   maxTransporters = _maxTransporters;
98   sendCounter = 1;
99 
100   callbackObj=callback;
101 
102   theTCPTransporters  = new TCP_Transporter * [maxTransporters];
103   theSCITransporters  = new SCI_Transporter * [maxTransporters];
104   theSHMTransporters  = new SHM_Transporter * [maxTransporters];
105   theTransporterTypes = new TransporterType   [maxTransporters];
106   theTransporters     = new Transporter     * [maxTransporters];
107   performStates       = new PerformState      [maxTransporters];
108   ioStates            = new IOState           [maxTransporters];
109   m_disconnect_errnum = new int               [maxTransporters];
110   m_error_states      = new ErrorState        [maxTransporters];
111 
112   m_has_extra_wakeup_socket = false;
113 #if defined(HAVE_EPOLL_CREATE)
114  m_epoll_fd = -1;
115  m_epoll_events       = new struct epoll_event[maxTransporters];
116  m_epoll_fd = epoll_create(maxTransporters);
117  if (m_epoll_fd == -1 || !m_epoll_events)
118  {
119    /* Failure to allocate data or get epoll socket, abort */
120    perror("Failed to alloc epoll-array or calling epoll_create... falling back to select!");
121    if (m_epoll_fd != -1)
122    {
123      close(m_epoll_fd);
124      m_epoll_fd = -1;
125    }
126    if (m_epoll_events)
127    {
128      delete [] m_epoll_events;
129      m_epoll_events = 0;
130    }
131  }
132  else
133  {
134    memset((char*)m_epoll_events, 0,
135           maxTransporters * sizeof(struct epoll_event));
136  }
137 
138 #endif
139 #ifdef ERROR_INSERT
140   m_blocked.clear();
141   m_blocked_with_data.clear();
142   m_blocked_disconnected.clear();
143 #endif
144   // Initialize member variables
145   nTransporters    = 0;
146   nTCPTransporters = 0;
147   nSCITransporters = 0;
148   nSHMTransporters = 0;
149 
150   // Initialize the transporter arrays
151   ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
152   for (unsigned i=0; i<maxTransporters; i++) {
153     theTCPTransporters[i] = NULL;
154     theSCITransporters[i] = NULL;
155     theSHMTransporters[i] = NULL;
156     theTransporters[i]    = NULL;
157     performStates[i]      = DISCONNECTED;
158     ioStates[i]           = NoHalt;
159     m_disconnect_errnum[i]= 0;
160     m_error_states[i]     = default_error_state;
161   }
162 
163   DBUG_VOID_RETURN;
164 }
165 
166 void
allocate_send_buffers(Uint64 total_send_buffer)167 TransporterRegistry::allocate_send_buffers(Uint64 total_send_buffer)
168 {
169   if (!m_use_default_send_buffer)
170     return;
171 
172   if (total_send_buffer == 0)
173     total_send_buffer = get_total_max_send_buffer();
174 
175   if (m_send_buffers)
176   {
177     /* Send buffers already allocated -> resize the buffer pages */
178     assert(m_send_buffer_memory);
179 
180     // TODO resize send buffer pages
181 
182     return;
183   }
184 
185   /* Initialize transporter send buffers (initially empty). */
186   m_send_buffers = new SendBuffer[maxTransporters];
187   for (unsigned i = 0; i < maxTransporters; i++)
188   {
189     SendBuffer &b = m_send_buffers[i];
190     b.m_first_page = NULL;
191     b.m_last_page = NULL;
192     b.m_used_bytes = 0;
193   }
194 
195   /* Initialize the page freelist. */
196   Uint64 send_buffer_pages =
197     (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
198   /* Add one extra page of internal fragmentation overhead per transporter. */
199   send_buffer_pages += nTransporters;
200 
201   m_send_buffer_memory =
202     new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
203   if (m_send_buffer_memory == NULL)
204   {
205     ndbout << "Unable to allocate "
206            << send_buffer_pages * SendBufferPage::PGSIZE
207            << " bytes of memory for send buffers, aborting." << endl;
208     abort();
209   }
210 
211   m_page_freelist = NULL;
212   for (unsigned i = 0; i < send_buffer_pages; i++)
213   {
214     SendBufferPage *page =
215       (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
216     page->m_bytes = 0;
217     page->m_next = m_page_freelist;
218     m_page_freelist = page;
219   }
220 }
221 
set_mgm_handle(NdbMgmHandle h)222 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
223 {
224   DBUG_ENTER("TransporterRegistry::set_mgm_handle");
225   if (m_mgm_handle)
226     ndb_mgm_destroy_handle(&m_mgm_handle);
227   m_mgm_handle= h;
228   ndb_mgm_set_timeout(m_mgm_handle, 5000);
229 #ifndef DBUG_OFF
230   if (h)
231   {
232     char buf[256];
233     DBUG_PRINT("info",("handle set with connectstring: %s",
234 		       ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
235   }
236   else
237   {
238     DBUG_PRINT("info",("handle set to NULL"));
239   }
240 #endif
241   DBUG_VOID_RETURN;
242 }
243 
~TransporterRegistry()244 TransporterRegistry::~TransporterRegistry()
245 {
246   DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
247 
248   removeAll();
249 
250   delete[] theTCPTransporters;
251   delete[] theSCITransporters;
252   delete[] theSHMTransporters;
253   delete[] theTransporterTypes;
254   delete[] theTransporters;
255   delete[] performStates;
256   delete[] ioStates;
257   delete[] m_disconnect_errnum;
258   delete[] m_error_states;
259 
260   if (m_send_buffers)
261     delete[] m_send_buffers;
262   m_page_freelist = NULL;
263   if (m_send_buffer_memory)
264     delete[] m_send_buffer_memory;
265 
266 #if defined(HAVE_EPOLL_CREATE)
267   if (m_epoll_events) delete [] m_epoll_events;
268   if (m_epoll_fd != -1) close(m_epoll_fd);
269 #endif
270   if (m_mgm_handle)
271     ndb_mgm_destroy_handle(&m_mgm_handle);
272 
273   if (m_has_extra_wakeup_socket)
274   {
275     my_socket_close(m_extra_wakeup_sockets[0]);
276     my_socket_close(m_extra_wakeup_sockets[1]);
277   }
278 
279   DBUG_VOID_RETURN;
280 }
281 
282 void
removeAll()283 TransporterRegistry::removeAll(){
284   for(unsigned i = 0; i<maxTransporters; i++){
285     if(theTransporters[i] != NULL)
286       removeTransporter(theTransporters[i]->getRemoteNodeId());
287   }
288 }
289 
290 void
disconnectAll()291 TransporterRegistry::disconnectAll(){
292   for(unsigned i = 0; i<maxTransporters; i++){
293     if(theTransporters[i] != NULL)
294       theTransporters[i]->doDisconnect();
295   }
296 }
297 
298 bool
init(NodeId nodeId)299 TransporterRegistry::init(NodeId nodeId) {
300   DBUG_ENTER("TransporterRegistry::init");
301   assert(localNodeId == 0 ||
302          localNodeId == nodeId);
303 
304   localNodeId = nodeId;
305 
306   DEBUG("TransporterRegistry started node: " << localNodeId);
307 
308   if (!m_socket_poller.set_max_count(maxTransporters +
309 				     1 /* wakeup socket */))
310     DBUG_RETURN(false);
311 
312   DBUG_RETURN(true);
313 }
314 
315 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg) const316 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
317                                     BaseString & msg) const
318 {
319   DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
320 
321   // Read "hello" that consists of node id and transporter
322   // type from client
323   SocketInputStream s_input(sockfd);
324   char buf[11+1+11+1]; // <int> <int>
325   if (s_input.gets(buf, sizeof(buf)) == 0) {
326     msg.assfmt("line: %u : Failed to get nodeid from client", __LINE__);
327     DBUG_PRINT("error", ("Failed to read 'hello' from client"));
328     DBUG_RETURN(false);
329   }
330 
331   int nodeId, remote_transporter_type= -1;
332   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
333   switch (r) {
334   case 2:
335     break;
336   case 1:
337     // we're running version prior to 4.1.9
338     // ok, but with no checks on transporter configuration compatability
339     break;
340   default:
341     msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
342     DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
343                          (int)sizeof(buf), buf));
344     DBUG_RETURN(false);
345   }
346 
347   DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
348 		      nodeId, remote_transporter_type));
349 
350 
351   // Check that nodeid is in range before accessing the arrays
352   if (nodeId < 0 ||
353       nodeId >= (int)maxTransporters)
354   {
355     msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
356     DBUG_PRINT("error", ("Out of range nodeId: %d from client",
357                          nodeId));
358     DBUG_RETURN(false);
359   }
360 
361   // Check that transporter is allocated
362   Transporter *t= theTransporters[nodeId];
363   if (t == 0)
364   {
365     msg.assfmt("line: %u : Incorrect reply from client: >%s<, node: %u",
366                __LINE__, buf, nodeId);
367     DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
368     DBUG_RETURN(false);
369   }
370 
371   // Check that the transporter should be connecting
372   if (performStates[nodeId] != TransporterRegistry::CONNECTING)
373   {
374     msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
375                __LINE__, nodeId,
376                getPerformStateString(performStates[nodeId]),
377                performStates[nodeId]);
378 
379     DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
380                          nodeId));
381     DBUG_RETURN(false);
382   }
383 
384   // Check transporter type
385   if (remote_transporter_type != -1 &&
386       remote_transporter_type != t->m_type)
387   {
388     g_eventLogger->error("Connection from node: %d uses different transporter "
389                          "type: %d, expected type: %d",
390                          nodeId, remote_transporter_type, t->m_type);
391     DBUG_RETURN(false);
392   }
393 
394   // Send reply to client
395   SocketOutputStream s_output(sockfd);
396   if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
397   {
398     msg.assfmt("line: %u : Failed to reply to connecting socket (node: %u)",
399                __LINE__, nodeId);
400     DBUG_PRINT("error", ("Send of reply failed"));
401     DBUG_RETURN(false);
402   }
403 
404   // Setup transporter (transporter responsible for closing sockfd)
405   bool res = t->connect_server(sockfd, msg);
406 
407   if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
408   {
409     msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
410                __LINE__, nodeId,
411                getPerformStateString(performStates[nodeId]),
412                performStates[nodeId]);
413     // Connection suceeded, but not connecting anymore, return
414     // false to close the connection
415     DBUG_RETURN(false);
416   }
417 
418   DBUG_RETURN(res);
419 }
420 
421 
422 bool
configureTransporter(TransporterConfiguration * config)423 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
424 {
425   NodeId remoteNodeId = config->remoteNodeId;
426 
427   assert(localNodeId);
428   assert(config->localNodeId == localNodeId);
429 
430   if (remoteNodeId >= maxTransporters)
431     return false;
432 
433   Transporter* t = theTransporters[remoteNodeId];
434   if(t != NULL)
435   {
436     // Transporter already exist, try to reconfigure it
437     return t->configure(config);
438   }
439 
440   DEBUG("Configuring transporter from " << localNodeId
441 	<< " to " << remoteNodeId);
442 
443   switch (config->type){
444   case tt_TCP_TRANSPORTER:
445     return createTCPTransporter(config);
446   case tt_SHM_TRANSPORTER:
447     return createSHMTransporter(config);
448   case tt_SCI_TRANSPORTER:
449     return createSCITransporter(config);
450   default:
451     abort();
452     break;
453   }
454   return false;
455 }
456 
457 
458 bool
createTCPTransporter(TransporterConfiguration * config)459 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
460 #ifdef NDB_TCP_TRANSPORTER
461 
462   TCP_Transporter * t = 0;
463   if (config->remoteNodeId == config->localNodeId)
464   {
465     t = new Loopback_Transporter(* this, config);
466   }
467   else
468   {
469     t = new TCP_Transporter(*this, config);
470   }
471 
472   if (t == NULL)
473     return false;
474   else if (!t->initTransporter()) {
475     delete t;
476     return false;
477   }
478 
479   // Put the transporter in the transporter arrays
480   theTCPTransporters[nTCPTransporters]      = t;
481   theTransporters[t->getRemoteNodeId()]     = t;
482   theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
483   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
484   nTransporters++;
485   nTCPTransporters++;
486   m_total_max_send_buffer += t->get_max_send_buffer();
487 
488   return true;
489 #else
490   return false;
491 #endif
492 }
493 
494 bool
createSCITransporter(TransporterConfiguration * config)495 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
496 #ifdef NDB_SCI_TRANSPORTER
497 
498   if(!SCI_Transporter::initSCI())
499     abort();
500 
501   SCI_Transporter * t = new SCI_Transporter(*this,
502                                             config->localHostName,
503                                             config->remoteHostName,
504                                             config->s_port,
505 					    config->isMgmConnection,
506                                             config->sci.sendLimit,
507 					    config->sci.bufferSize,
508 					    config->sci.nLocalAdapters,
509 					    config->sci.remoteSciNodeId0,
510 					    config->sci.remoteSciNodeId1,
511 					    localNodeId,
512 					    config->remoteNodeId,
513 					    config->serverNodeId,
514 					    config->checksum,
515 					    config->signalId);
516 
517   if (t == NULL)
518     return false;
519   else if (!t->initTransporter()) {
520     delete t;
521     return false;
522   }
523   // Put the transporter in the transporter arrays
524   theSCITransporters[nSCITransporters]      = t;
525   theTransporters[t->getRemoteNodeId()]     = t;
526   theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
527   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
528   nTransporters++;
529   nSCITransporters++;
530   m_total_max_send_buffer += t->get_max_send_buffer();
531 
532   return true;
533 #else
534   return false;
535 #endif
536 }
537 
538 bool
createSHMTransporter(TransporterConfiguration * config)539 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
540   DBUG_ENTER("TransporterRegistry::createTransporter SHM");
541 #ifdef NDB_SHM_TRANSPORTER
542 
543   if (!g_ndb_shm_signum) {
544     g_ndb_shm_signum= config->shm.signum;
545     DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
546     /**
547      * Make sure to block g_ndb_shm_signum
548      *   TransporterRegistry::init is run from "main" thread
549      */
550     NdbThread_set_shm_sigmask(TRUE);
551   }
552 
553   if(config->shm.signum != g_ndb_shm_signum)
554     return false;
555 
556   SHM_Transporter * t = new SHM_Transporter(*this,
557 					    config->localHostName,
558 					    config->remoteHostName,
559 					    config->s_port,
560 					    config->isMgmConnection,
561 					    localNodeId,
562 					    config->remoteNodeId,
563 					    config->serverNodeId,
564 					    config->checksum,
565 					    config->signalId,
566 					    config->shm.shmKey,
567 					    config->shm.shmSize
568 					    );
569   if (t == NULL)
570     return false;
571   else if (!t->initTransporter()) {
572     delete t;
573     return false;
574   }
575   // Put the transporter in the transporter arrays
576   theSHMTransporters[nSHMTransporters]      = t;
577   theTransporters[t->getRemoteNodeId()]     = t;
578   theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
579   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
580 
581   nTransporters++;
582   nSHMTransporters++;
583   m_total_max_send_buffer += t->get_max_send_buffer();
584 
585   DBUG_RETURN(true);
586 #else
587   DBUG_RETURN(false);
588 #endif
589 }
590 
591 
592 void
removeTransporter(NodeId nodeId)593 TransporterRegistry::removeTransporter(NodeId nodeId) {
594 
595   DEBUG("Removing transporter from " << localNodeId
596 	<< " to " << nodeId);
597 
598   if(theTransporters[nodeId] == NULL)
599     return;
600 
601   theTransporters[nodeId]->doDisconnect();
602 
603   const TransporterType type = theTransporterTypes[nodeId];
604 
605   int ind = 0;
606   switch(type){
607   case tt_TCP_TRANSPORTER:
608 #ifdef NDB_TCP_TRANSPORTER
609     for(; ind < nTCPTransporters; ind++)
610       if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
611 	break;
612     ind++;
613     for(; ind<nTCPTransporters; ind++)
614       theTCPTransporters[ind-1] = theTCPTransporters[ind];
615     nTCPTransporters --;
616 #endif
617     break;
618   case tt_SCI_TRANSPORTER:
619 #ifdef NDB_SCI_TRANSPORTER
620     for(; ind < nSCITransporters; ind++)
621       if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
622 	break;
623     ind++;
624     for(; ind<nSCITransporters; ind++)
625       theSCITransporters[ind-1] = theSCITransporters[ind];
626     nSCITransporters --;
627 #endif
628     break;
629   case tt_SHM_TRANSPORTER:
630 #ifdef NDB_SHM_TRANSPORTER
631     for(; ind < nSHMTransporters; ind++)
632       if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
633 	break;
634     ind++;
635     for(; ind<nSHMTransporters; ind++)
636       theSHMTransporters[ind-1] = theSHMTransporters[ind];
637     nSHMTransporters --;
638 #endif
639     break;
640   }
641 
642   nTransporters--;
643 
644   // Delete the transporter and remove it from theTransporters array
645   delete theTransporters[nodeId];
646   theTransporters[nodeId] = NULL;
647 }
648 
649 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])650 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
651                                  const SignalHeader * const signalHeader,
652 				 Uint8 prio,
653 				 const Uint32 * const signalData,
654 				 NodeId nodeId,
655 				 const LinearSectionPtr ptr[3]){
656 
657 
658   Transporter *t = theTransporters[nodeId];
659   if(t != NULL &&
660      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
661       ((signalHeader->theReceiversBlockNumber == 252) ||
662        (signalHeader->theReceiversBlockNumber == 4002)))) {
663 
664     if(t->isConnected()){
665       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
666       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
667 	Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
668 	if(insertPtr != 0){
669 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
670 	  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
671 	  return SEND_OK;
672 	}
673 
674 	int sleepTime = 2;
675 
676 	/**
677 	 * @note: on linux/i386 the granularity is 10ms
678 	 *        so sleepTime = 2 generates a 10 ms sleep.
679 	 */
680 	for(int i = 0; i<50; i++){
681 	  if((nSHMTransporters+nSCITransporters) == 0)
682 	    NdbSleep_MilliSleep(sleepTime);
683 	  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
684 	  if(insertPtr != 0){
685 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
686 	    updateWritePtr(sendHandle, nodeId, lenBytes, prio);
687 	    break;
688 	  }
689 	}
690 
691 	if(insertPtr != 0){
692 	  /**
693 	   * Send buffer full, but resend works
694 	   */
695 	  report_error(nodeId, TE_SEND_BUFFER_FULL);
696 	  return SEND_OK;
697 	}
698 
699 	WARNING("Signal to " << nodeId << " lost(buffer)");
700 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
701 	return SEND_BUFFER_FULL;
702       } else {
703 	return SEND_MESSAGE_TOO_BIG;
704       }
705     } else {
706 #ifdef ERROR_INSERT
707       if (m_blocked.get(nodeId))
708       {
709         /* Looks like it disconnected while blocked.  We'll pretend
710          * not to notice for now
711          */
712         WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
713         return SEND_OK;
714       }
715 #endif
716       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
717       return SEND_DISCONNECTED;
718     }
719   } else {
720     DEBUG("Discarding message to block: "
721 	  << signalHeader->theReceiversBlockNumber
722 	  << " node: " << nodeId);
723 
724     if(t == NULL)
725       return SEND_UNKNOWN_NODE;
726 
727     return SEND_BLOCKED;
728   }
729 }
730 
731 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])732 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
733                                  const SignalHeader * const signalHeader,
734 				 Uint8 prio,
735 				 const Uint32 * const signalData,
736 				 NodeId nodeId,
737 				 class SectionSegmentPool & thePool,
738 				 const SegmentedSectionPtr ptr[3]){
739 
740 
741   Transporter *t = theTransporters[nodeId];
742   if(t != NULL &&
743      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
744       ((signalHeader->theReceiversBlockNumber == 252)||
745        (signalHeader->theReceiversBlockNumber == 4002)))) {
746 
747     if(t->isConnected()){
748       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
749       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
750 	Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
751 	if(insertPtr != 0){
752 	  t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
753 	  updateWritePtr(sendHandle, nodeId, lenBytes, prio);
754 	  return SEND_OK;
755 	}
756 
757 
758 	/**
759 	 * @note: on linux/i386 the granularity is 10ms
760 	 *        so sleepTime = 2 generates a 10 ms sleep.
761 	 */
762 	int sleepTime = 2;
763 	for(int i = 0; i<50; i++){
764 	  if((nSHMTransporters+nSCITransporters) == 0)
765 	    NdbSleep_MilliSleep(sleepTime);
766 	  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
767 	  if(insertPtr != 0){
768 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
769 	    updateWritePtr(sendHandle, nodeId, lenBytes, prio);
770 	    break;
771 	  }
772 	}
773 
774 	if(insertPtr != 0){
775 	  /**
776 	   * Send buffer full, but resend works
777 	   */
778 	  report_error(nodeId, TE_SEND_BUFFER_FULL);
779 	  return SEND_OK;
780 	}
781 
782 	WARNING("Signal to " << nodeId << " lost(buffer)");
783 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
784 	return SEND_BUFFER_FULL;
785       } else {
786 	return SEND_MESSAGE_TOO_BIG;
787       }
788     } else {
789 #ifdef ERROR_INSERT
790       if (m_blocked.get(nodeId))
791       {
792         /* Looks like it disconnected while blocked.  We'll pretend
793          * not to notice for now
794          */
795         WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
796         return SEND_OK;
797       }
798 #endif
799       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
800       return SEND_DISCONNECTED;
801     }
802   } else {
803     DEBUG("Discarding message to block: "
804 	  << signalHeader->theReceiversBlockNumber
805 	  << " node: " << nodeId);
806 
807     if(t == NULL)
808       return SEND_UNKNOWN_NODE;
809 
810     return SEND_BLOCKED;
811   }
812 }
813 
814 
815 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const GenericSectionPtr ptr[3])816 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
817                                  const SignalHeader * const signalHeader,
818 				 Uint8 prio,
819 				 const Uint32 * const signalData,
820 				 NodeId nodeId,
821 				 const GenericSectionPtr ptr[3]){
822 
823 
824   Transporter *t = theTransporters[nodeId];
825   if(t != NULL &&
826      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
827       ((signalHeader->theReceiversBlockNumber == 252) ||
828        (signalHeader->theReceiversBlockNumber == 4002)))) {
829 
830     if(t->isConnected()){
831       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
832       if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
833         Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
834         if(insertPtr != 0){
835           t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
836           updateWritePtr(sendHandle, nodeId, lenBytes, prio);
837           return SEND_OK;
838 	}
839 
840 
841 	/**
842 	 * @note: on linux/i386 the granularity is 10ms
843 	 *        so sleepTime = 2 generates a 10 ms sleep.
844 	 */
845         int sleepTime = 2;
846 	for(int i = 0; i<50; i++){
847 	  if((nSHMTransporters+nSCITransporters) == 0)
848 	    NdbSleep_MilliSleep(sleepTime);
849 	  insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
850 	  if(insertPtr != 0){
851 	    t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
852 	    updateWritePtr(sendHandle, nodeId, lenBytes, prio);
853 	    break;
854 	  }
855 	}
856 
857 	if(insertPtr != 0){
858 	  /**
859 	   * Send buffer full, but resend works
860 	   */
861 	  report_error(nodeId, TE_SEND_BUFFER_FULL);
862 	  return SEND_OK;
863 	}
864 
865 	WARNING("Signal to " << nodeId << " lost(buffer)");
866 	report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
867 	return SEND_BUFFER_FULL;
868       } else {
869 	return SEND_MESSAGE_TOO_BIG;
870       }
871     } else {
872       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
873       return SEND_DISCONNECTED;
874     }
875   } else {
876     DEBUG("Discarding message to block: "
877 	  << signalHeader->theReceiversBlockNumber
878 	  << " node: " << nodeId);
879 
880     if(t == NULL)
881       return SEND_UNKNOWN_NODE;
882 
883     return SEND_BLOCKED;
884   }
885 }
886 
887 void
external_IO(Uint32 timeOutMillis)888 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
889   //-----------------------------------------------------------
890   // Most of the time we will send the buffers here and then wait
891   // for new signals. Thus we start by sending without timeout
892   // followed by the receive part where we expect to sleep for
893   // a while.
894   //-----------------------------------------------------------
895   if(pollReceive(timeOutMillis)){
896     performReceive();
897   }
898   performSend();
899 }
900 
901 bool
setup_wakeup_socket()902 TransporterRegistry::setup_wakeup_socket()
903 {
904   if (m_has_extra_wakeup_socket)
905   {
906     return true;
907   }
908 
909   if (my_socketpair(m_extra_wakeup_sockets))
910   {
911     perror("socketpair failed!");
912     return false;
913   }
914 
915   if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
916       !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
917   {
918     goto err;
919   }
920 
921 #if defined(HAVE_EPOLL_CREATE)
922   if (m_epoll_fd != -1)
923   {
924     int sock = m_extra_wakeup_sockets[0].fd;
925     struct epoll_event event_poll;
926     bzero(&event_poll, sizeof(event_poll));
927     event_poll.data.u32 = 0;
928     event_poll.events = EPOLLIN;
929     int ret_val = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, sock, &event_poll);
930     if (ret_val != 0)
931     {
932       int error= errno;
933       fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
934               sock, error);
935       fflush(stderr);
936       goto err;
937     }
938   }
939 #endif
940   m_has_extra_wakeup_socket = true;
941   return true;
942 
943 err:
944   my_socket_close(m_extra_wakeup_sockets[0]);
945   my_socket_close(m_extra_wakeup_sockets[1]);
946   my_socket_invalidate(m_extra_wakeup_sockets+0);
947   my_socket_invalidate(m_extra_wakeup_sockets+1);
948   return false;
949 }
950 
951 void
wakeup()952 TransporterRegistry::wakeup()
953 {
954   if (m_has_extra_wakeup_socket)
955   {
956     static char c = 37;
957     my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
958   }
959 }
960 
961 Uint32
pollReceive(Uint32 timeOutMillis,NodeBitmask & mask)962 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
963                                  NodeBitmask& mask)
964 {
965   Uint32 retVal = 0;
966 
967   /**
968    * If any transporters have left-over data that was not fully executed in
969    * last loop, don't wait and return 'data available' even if nothing new
970    */
971   if (!mask.isclear())
972   {
973     timeOutMillis = 0;
974     retVal = 1;
975   }
976 
977   if (nSCITransporters > 0)
978   {
979     timeOutMillis=0;
980   }
981 
982 #ifdef NDB_SHM_TRANSPORTER
983   if (nSHMTransporters > 0)
984   {
985     Uint32 res = poll_SHM(0, mask);
986     if(res)
987     {
988       retVal |= res;
989       timeOutMillis = 0;
990     }
991   }
992 #endif
993 
994 #ifdef NDB_TCP_TRANSPORTER
995 #if defined(HAVE_EPOLL_CREATE)
996   if (likely(m_epoll_fd != -1))
997   {
998     Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
999 
1000     if (num_trps)
1001     {
1002       tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
1003                                       num_trps, timeOutMillis);
1004       retVal |= tcpReadSelectReply;
1005     }
1006 
1007     int num_socket_events = tcpReadSelectReply;
1008     if (num_socket_events > 0)
1009     {
1010       for (int i = 0; i < num_socket_events; i++)
1011       {
1012         const Uint32 trpid = m_epoll_events[i].data.u32;
1013 #ifdef ERROR_INSERT
1014         if (m_blocked.get(trpid))
1015         {
1016           /* Don't pull from socket now, wait till unblocked */
1017           m_blocked_with_data.set(trpid);
1018           continue;
1019         }
1020 #endif
1021         mask.set(trpid);
1022       }
1023     }
1024     else if (num_socket_events < 0)
1025     {
1026       assert(errno == EINTR);
1027     }
1028   }
1029   else
1030 #endif
1031   {
1032     if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
1033     {
1034       retVal |= poll_TCP(timeOutMillis, mask);
1035     }
1036     else
1037       tcpReadSelectReply = 0;
1038   }
1039 #endif
1040 #ifdef NDB_SCI_TRANSPORTER
1041   if (nSCITransporters > 0)
1042     retVal |= poll_SCI(timeOutMillis, mask);
1043 #endif
1044 #ifdef NDB_SHM_TRANSPORTER
1045   if (nSHMTransporters > 0)
1046   {
1047     int res = poll_SHM(0, mask);
1048     retVal |= res;
1049   }
1050 #endif
1051   return retVal;
1052 }
1053 
1054 
#ifdef NDB_SCI_TRANSPORTER
/**
 * Check the connected SCI transporters for data to read.
 *
 * @param timeOutMillis  Not used by this implementation.
 * @param mask           Node ids of SCI transporters with data are set here.
 * @return 1 if at least one transporter had data, 0 otherwise.
 */
Uint32
TransporterRegistry::poll_SCI(Uint32 timeOutMillis, NodeBitmask& mask)
{
  Uint32 anyData = 0;
  for (int idx = 0; idx < nSCITransporters; idx++)
  {
    SCI_Transporter * sci = theSCITransporters[idx];
    const Uint32 remote = sci->getRemoteNodeId();

    // Skip transporters that are not connected on both levels
    if (!sci->isConnected() || !is_connected(remote))
      continue;
    if (!sci->hasDataToRead())
      continue;

    mask.set(remote);
    anyData = 1;
  }
  return anyData;
}
#endif
1076 
1077 
#ifdef NDB_SHM_TRANSPORTER
// Incremented by shm_sig_handler() for each SHM signal received.
static int g_shm_counter = 0;

/**
 * Check the connected SHM transporters for data to read.
 *
 * Makes up to 100 passes over all SHM transporters. The first pass in which
 * any transporter has data is completed, so every transporter with data in
 * that pass is marked, and then polling stops.
 *
 * @param timeOutMillis  Not used by this implementation.
 * @param mask           Node ids of SHM transporters with data are set here.
 * @return 1 if any transporter had data, 0 otherwise.
 */
Uint32
TransporterRegistry::poll_SHM(Uint32 timeOutMillis, NodeBitmask& mask)
{
  bool found = false;
  for (int pass = 0; pass < 100 && !found; pass++)
  {
    for (int idx = 0; idx < nSHMTransporters; idx++)
    {
      SHM_Transporter * shm = theSHMTransporters[idx];
      const Uint32 remote = shm->getRemoteNodeId();
      if (!shm->isConnected() || !is_connected(remote))
        continue;
      if (shm->hasDataToRead())
      {
        mask.set(remote);
        found = true;
      }
    }
  }
  return found ? 1 : 0;
}
#endif
1104 
#ifdef NDB_TCP_TRANSPORTER
/**
 * Poll the TCP transporters (and the extra wakeup socket, if set up) for
 * readable data using m_socket_poller.
 *
 * We do not want to hold any transporter locks during select(), so there
 * is no protection against a disconnect closing the socket during this call.
 *
 * That does not matter, at most we will get a spurious wakeup on the wrong
 * socket, which will be handled correctly in performReceive() (which _is_
 * protected by transporter locks on upper layer).
 *
 * @param timeOutMillis  Max time to wait for a socket to become readable.
 * @param mask           Node ids of readable transporters are set here.
 *                       Bit 0 is set when the wakeup socket is readable.
 * @return the poll result, which is also stored in tcpReadSelectReply
 *         (> 0 if something is readable, 0 on timeout; a negative error
 *         value is returned converted to Uint32).
 */
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis, NodeBitmask& mask)
{
  m_socket_poller.clear();

  if (m_has_extra_wakeup_socket)
  {
    const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];

    // Poll the wakeup-socket for read. It is added first, so it is slot 0.
    m_socket_poller.add(socket, true, false, false);
  }

  // Poller slot of each TCP transporter, MAX_NODES + 1 if not polled
  Uint16 idx[MAX_NODES];
  for (int i = 0; i < nTCPTransporters; i++)
  {
    TCP_Transporter * t = theTCPTransporters[i];
    const NDB_SOCKET_TYPE socket = t->getSocket();
    Uint32 node_id = t->getRemoteNodeId();

    if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
    {
      idx[i] = m_socket_poller.add(socket, true, false, false);
    }
    else
    {
      idx[i] = MAX_NODES + 1;
    }
  }

  tcpReadSelectReply = m_socket_poller.poll_unsafe(timeOutMillis);

  if (tcpReadSelectReply > 0)
  {
    /**
     * Only look at slot 0 when the wakeup socket was actually added.
     * Otherwise slot 0 belongs to the first polled TCP transporter.
     * (Testing m_extra_wakeup_sockets itself is always true: it is an array.)
     */
    if (m_has_extra_wakeup_socket)
    {
      if (m_socket_poller.has_read(0))
        mask.set((Uint32)0);
    }

    for (int i = 0; i < nTCPTransporters; i++)
    {
      if (idx[i] == MAX_NODES + 1)
        continue;                       // Not polled this round
      if (!m_socket_poller.has_read(idx[i]))
        continue;                       // Nothing to read

      TCP_Transporter * t = theTCPTransporters[i];
      Uint32 node_id = t->getRemoteNodeId();
#ifdef ERROR_INSERT
      /* m_blocked / m_blocked_with_data are keyed by node id, as in
       * blockReceive() and the epoll path of pollReceive() */
      if (m_blocked.get(node_id))
      {
        /* Don't pull from socket now, wait till unblocked */
        m_blocked_with_data.set(node_id);
        continue;
      }
#endif
      mask.set(node_id);
    }
  }

  return tcpReadSelectReply;
}
#endif
1177 
#if defined(HAVE_EPOLL_CREATE)
/**
 * Add (add == true) or remove (add == false) the socket of 't' in the
 * epoll set. The event's data.u32 is the remote node id.
 *
 * Note the inverted return convention:
 * @return TRUE only when adding failed with ENOMEM (the caller must treat
 *         the node as failed). FALSE on success, when the socket is
 *         invalid, or when removing a socket that is no longer in the set
 *         (ENOENT). Any other epoll_ctl() error aborts the process.
 */
bool
TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
{
  NDB_SOCKET_TYPE sock_fd = t->getSocket();
  const int node_id = t->getRemoteNodeId();
  const int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;

  if (!my_socket_valid(sock_fd))
    return FALSE;

  struct epoll_event event_poll;
  bzero(&event_poll, sizeof(event_poll));
  event_poll.data.u32 = t->getRemoteNodeId();
  event_poll.events = EPOLLIN;

  if (epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll) == 0)
    return FALSE;                       // Success

  const int error = errno;

  /*
   * Could be that socket was closed premature to this call.
   * Not a problem that this occurs.
   */
  if (!add && error == ENOENT)
    return FALSE;

  if (add && error == ENOMEM)
  {
    ndbout << "We lacked memory to add the socket for node id ";
    ndbout << node_id << endl;
    return TRUE;
  }

  /*
   * Serious problems, we are either using wrong parameters,
   * have permission problems or the socket doesn't support
   * epoll!!
   */
  ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
           " node %u to epoll-set,"
           " errno: %u %s",
           add ? "ADD" : "DEL",
           m_epoll_fd,
           MY_SOCKET_FORMAT_VALUE(sock_fd),
           node_id,
           error,
           strerror(error));
  abort();
}

#endif
1233 
1234 /**
1235  * In multi-threaded cases, this must be protected by a global receive lock.
1236  */
1237 void
performReceive()1238 TransporterRegistry::performReceive()
1239 {
1240   bool hasReceived = false;
1241 
1242   if (m_has_data_transporters.get(0))
1243   {
1244     m_has_data_transporters.clear(Uint32(0));
1245     consume_extra_sockets();
1246   }
1247 
1248 #ifdef ERROR_INSERT
1249   if (!m_blocked.isclear())
1250   {
1251     if (m_has_data_transporters.isclear())
1252     {
1253         /* poll sees data, but we want to ignore for now
1254          * sleep a little to avoid busy loop
1255          */
1256       NdbSleep_MilliSleep(1);
1257     }
1258   }
1259 #endif
1260 
1261 #ifdef NDB_TCP_TRANSPORTER
1262   Uint32 id = 0;
1263   while ((id = m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
1264   {
1265     bool hasdata = false;
1266     TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1267     if (is_connected(id))
1268     {
1269       if (t->isConnected())
1270       {
1271         t->doReceive();
1272         if (hasReceived)
1273           callbackObj->checkJobBuffer();
1274         hasReceived = true;
1275         Uint32 * ptr;
1276         Uint32 sz = t->getReceiveData(&ptr);
1277         callbackObj->transporter_recv_from(id);
1278         Uint32 szUsed = unpack(ptr, sz, id, ioStates[id]);
1279         t->updateReceiveDataPtr(szUsed);
1280         hasdata = t->hasReceiveData();
1281       }
1282     }
1283     // If transporter still have data, make sure that it's remember to next time
1284     m_has_data_transporters.set(id, hasdata);
1285   }
1286 #endif
1287 
1288 #ifdef NDB_SCI_TRANSPORTER
1289   //performReceive
1290   //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
1291   for (int i=0; i<nSCITransporters; i++)
1292   {
1293     SCI_Transporter  *t = theSCITransporters[i];
1294     const NodeId nodeId = t->getRemoteNodeId();
1295     if(is_connected(nodeId))
1296     {
1297       if(t->isConnected() && t->checkConnected())
1298       {
1299         if (hasReceived)
1300           callbackObj->checkJobBuffer();
1301         hasReceived = true;
1302         Uint32 * readPtr, * eodPtr;
1303         t->getReceivePtr(&readPtr, &eodPtr);
1304         callbackObj->transporter_recv_from(nodeId);
1305         Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1306         t->updateReceivePtr(newPtr);
1307       }
1308     }
1309   }
1310 #endif
1311 #ifdef NDB_SHM_TRANSPORTER
1312   for (int i=0; i<nSHMTransporters; i++)
1313   {
1314     SHM_Transporter *t = theSHMTransporters[i];
1315     const NodeId nodeId = t->getRemoteNodeId();
1316     if(is_connected(nodeId)){
1317       if(t->isConnected() && t->checkConnected())
1318       {
1319         if (hasReceived)
1320           callbackObj->checkJobBuffer();
1321         hasReceived = true;
1322         Uint32 * readPtr, * eodPtr;
1323         t->getReceivePtr(&readPtr, &eodPtr);
1324         callbackObj->transporter_recv_from(nodeId);
1325         Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1326         t->updateReceivePtr(newPtr);
1327       }
1328     }
1329   }
1330 #endif
1331 }
1332 
1333 /**
1334  * In multi-threaded cases, this must be protected by send lock (can use
1335  * different locks for each node).
1336  */
1337 int
performSend(NodeId nodeId)1338 TransporterRegistry::performSend(NodeId nodeId)
1339 {
1340   Transporter *t = get_transporter(nodeId);
1341   if (t && t->isConnected() && is_connected(nodeId))
1342   {
1343     return t->doSend();
1344   }
1345 
1346   return 0;
1347 }
1348 
1349 void
consume_extra_sockets()1350 TransporterRegistry::consume_extra_sockets()
1351 {
1352   char buf[4096];
1353   ssize_t ret;
1354   int err;
1355   NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
1356   do
1357   {
1358     ret = my_recv(sock, buf, sizeof(buf), 0);
1359     err = my_socket_errno();
1360   } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
1361 }
1362 
1363 void
performSend()1364 TransporterRegistry::performSend()
1365 {
1366   int i;
1367   sendCounter = 1;
1368 
1369 #ifdef NDB_TCP_TRANSPORTER
1370   for (i = m_transp_count; i < nTCPTransporters; i++)
1371   {
1372     TCP_Transporter *t = theTCPTransporters[i];
1373     if (t && t->has_data_to_send() &&
1374         t->isConnected() && is_connected(t->getRemoteNodeId()))
1375     {
1376       t->doSend();
1377     }
1378   }
1379   for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
1380   {
1381     TCP_Transporter *t = theTCPTransporters[i];
1382     if (t && t->has_data_to_send() &&
1383         t->isConnected() && is_connected(t->getRemoteNodeId()))
1384     {
1385       t->doSend();
1386     }
1387   }
1388   m_transp_count++;
1389   if (m_transp_count == nTCPTransporters) m_transp_count = 0;
1390 #endif
1391 #ifdef NDB_SCI_TRANSPORTER
1392   //scroll through the SCI transporters,
1393   // get each transporter, check if connected, send data
1394   for (i=0; i<nSCITransporters; i++) {
1395     SCI_Transporter  *t = theSCITransporters[i];
1396     const NodeId nodeId = t->getRemoteNodeId();
1397 
1398     if(is_connected(nodeId))
1399     {
1400       if(t->isConnected() && t->has_data_to_send())
1401       {
1402 	t->doSend();
1403       } //if
1404     } //if
1405   }
1406 #endif
1407 
1408 #ifdef NDB_SHM_TRANSPORTER
1409   for (i=0; i<nSHMTransporters; i++)
1410   {
1411     SHM_Transporter  *t = theSHMTransporters[i];
1412     const NodeId nodeId = t->getRemoteNodeId();
1413     if(is_connected(nodeId))
1414     {
1415       if(t->isConnected())
1416       {
1417 	t->doSend();
1418       }
1419     }
1420   }
1421 #endif
1422 }
1423 
1424 int
forceSendCheck(int sendLimit)1425 TransporterRegistry::forceSendCheck(int sendLimit){
1426   int tSendCounter = sendCounter;
1427   sendCounter = tSendCounter + 1;
1428   if (tSendCounter >= sendLimit) {
1429     performSend();
1430     sendCounter = 1;
1431     return 1;
1432   }//if
1433   return 0;
1434 }//TransporterRegistry::forceSendCheck()
1435 
#ifdef DEBUG_TRANSPORTER
/**
 * Debug helper: print the perform state and I/O state of every registered
 * transporter to ndbout.
 */
void
TransporterRegistry::printState(){
  ndbout << "-- TransporterRegistry -- " << endl << endl
         << "Transporters = " << nTransporters << endl;
  for (int slot = 0; slot < maxTransporters; slot++)
  {
    Transporter * const tp = theTransporters[slot];
    if (tp == NULL)
      continue;                         // Unused slot

    const NodeId remote = tp->getRemoteNodeId();
    ndbout << "Transporter: " << remote
           << " PerformState: " << performStates[remote]
           << " IOState: " << ioStates[remote] << endl;
  }
}
#endif
1450 
#ifdef ERROR_INSERT
/**
 * Error-insert support for simulating a node whose incoming data is held
 * back. While a node is "blocked", the poll functions record in
 * m_blocked_with_data that it had data instead of marking it for receive,
 * and report_disconnect() defers disconnect handling
 * (m_blocked_disconnected).
 */

/** @return true if receive from 'nodeId' is currently blocked. */
bool
TransporterRegistry::isBlocked(NodeId nodeId)
{
  return m_blocked.get(nodeId);
}

/**
 * Stop pulling data from 'nodeId'. If the node already had data pending
 * receive, move that fact to m_blocked_with_data so it is processed once
 * the node is unblocked.
 */
void
TransporterRegistry::blockReceive(NodeId nodeId)
{
  /* Must not already be blocked */
  assert(!m_blocked.get(nodeId));

  m_blocked.set(nodeId);

  if (m_has_data_transporters.get(nodeId))
  {
    /* Shouldn't already be blocked with data */
    assert(!m_blocked_with_data.get(nodeId));
    m_blocked_with_data.set(nodeId);
    m_has_data_transporters.clear(nodeId);
  }
}

/**
 * Resume pulling data from 'nodeId'. Data that arrived while blocked is
 * handed back to performReceive(), and a disconnect deferred while blocked
 * is reported now.
 */
void
TransporterRegistry::unblockReceive(NodeId nodeId)
{
  assert(m_blocked.get(nodeId));
  assert(!m_has_data_transporters.get(nodeId));

  m_blocked.clear(nodeId);

  if (m_blocked_with_data.get(nodeId))
  {
    /**
     * Consume the flag. If it stayed set, a later unblock would report data
     * that is no longer there, and re-blocking while data is pending would
     * trip the assert in blockReceive().
     */
    m_blocked_with_data.clear(nodeId);
    m_has_data_transporters.set(nodeId);
  }

  if (m_blocked_disconnected.get(nodeId))
  {
    /* Process disconnect notification/handling now */
    m_blocked_disconnected.clear(nodeId);

    report_disconnect(nodeId, m_disconnect_errors[nodeId]);
  }
}
#endif
1503 
1504 IOState
ioState(NodeId nodeId)1505 TransporterRegistry::ioState(NodeId nodeId) {
1506   return ioStates[nodeId];
1507 }
1508 
1509 void
setIOState(NodeId nodeId,IOState state)1510 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
1511   if (ioStates[nodeId] == state)
1512     return;
1513 
1514   DEBUG("TransporterRegistry::setIOState("
1515         << nodeId << ", " << state << ")");
1516 
1517   ioStates[nodeId] = state;
1518 }
1519 
1520 extern "C" void *
run_start_clients_C(void * me)1521 run_start_clients_C(void * me)
1522 {
1523   ((TransporterRegistry*) me)->start_clients_thread();
1524   return 0;
1525 }
1526 
1527 /**
1528  * This method is used to initiate connection, called from the CMVMI blockx.
1529  *
1530  * This works asynchronously, no actions are taken directly in the calling
1531  * thread.
1532  */
1533 void
do_connect(NodeId node_id)1534 TransporterRegistry::do_connect(NodeId node_id)
1535 {
1536   PerformState &curr_state = performStates[node_id];
1537   switch(curr_state){
1538   case DISCONNECTED:
1539     break;
1540   case CONNECTED:
1541     return;
1542   case CONNECTING:
1543     return;
1544   case DISCONNECTING:
1545     break;
1546   }
1547   DBUG_ENTER("TransporterRegistry::do_connect");
1548   DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
1549 
1550   /*
1551     No one else should be using the transporter now, reset
1552     its send buffer
1553    */
1554   callbackObj->reset_send_buffer(node_id);
1555 
1556   curr_state= CONNECTING;
1557   DBUG_VOID_RETURN;
1558 }
1559 
1560 /**
1561  * This method is used to initiate disconnect from CMVMI. It is also called
1562  * from the TCP transporter in case of an I/O error on the socket.
1563  *
1564  * This works asynchronously, similar to do_connect().
1565  */
1566 void
do_disconnect(NodeId node_id,int errnum)1567 TransporterRegistry::do_disconnect(NodeId node_id, int errnum)
1568 {
1569   PerformState &curr_state = performStates[node_id];
1570   switch(curr_state){
1571   case DISCONNECTED:
1572     return;
1573   case CONNECTED:
1574     break;
1575   case CONNECTING:
1576     break;
1577   case DISCONNECTING:
1578     return;
1579   }
1580   DBUG_ENTER("TransporterRegistry::do_disconnect");
1581   DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1582   curr_state= DISCONNECTING;
1583   m_disconnect_errnum[node_id] = errnum;
1584   DBUG_VOID_RETURN;
1585 }
1586 
1587 void
report_connect(NodeId node_id)1588 TransporterRegistry::report_connect(NodeId node_id)
1589 {
1590   DBUG_ENTER("TransporterRegistry::report_connect");
1591   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1592 
1593   /*
1594     The send buffers was reset when this connection
1595     was set to CONNECTING. In order to make sure no stray
1596     signals has been written to the send buffer since then
1597     call 'reset_send_buffer' with the "should_be_empty" flag
1598     set
1599   */
1600   callbackObj->reset_send_buffer(node_id, true);
1601 
1602   performStates[node_id] = CONNECTED;
1603 #if defined(HAVE_EPOLL_CREATE)
1604   if (likely(m_epoll_fd != -1))
1605   {
1606     if (change_epoll((TCP_Transporter*)theTransporters[node_id],
1607                      TRUE))
1608     {
1609       performStates[node_id] = DISCONNECTING;
1610       DBUG_VOID_RETURN;
1611     }
1612   }
1613 #endif
1614   callbackObj->reportConnect(node_id);
1615   DBUG_VOID_RETURN;
1616 }
1617 
1618 void
report_disconnect(NodeId node_id,int errnum)1619 TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1620 {
1621   DBUG_ENTER("TransporterRegistry::report_disconnect");
1622   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1623 
1624 #ifdef ERROR_INSERT
1625   if (m_blocked.get(node_id))
1626   {
1627     /* We are simulating real latency, so control events experience
1628      * it too
1629      */
1630     m_blocked_disconnected.set(node_id);
1631     m_disconnect_errors[node_id] = errnum;
1632     DBUG_VOID_RETURN;
1633   }
1634 #endif
1635 
1636   performStates[node_id] = DISCONNECTED;
1637   m_has_data_transporters.clear(node_id);
1638   callbackObj->reportDisconnect(node_id, errnum);
1639   DBUG_VOID_RETURN;
1640 }
1641 
1642 /**
1643  * We only call TransporterCallback::reportError() from
1644  * TransporterRegistry::update_connections().
1645  *
1646  * In other places we call this method to enqueue the error that will later be
1647  * picked up by update_connections().
1648  */
1649 void
report_error(NodeId nodeId,TransporterError errorCode,const char * errorInfo)1650 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
1651                                   const char *errorInfo)
1652 {
1653   if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
1654       m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
1655   {
1656     m_error_states[nodeId].m_code = errorCode;
1657     m_error_states[nodeId].m_info = errorInfo;
1658   }
1659 }
1660 
1661 /**
1662  * update_connections(), together with the thread running in
1663  * start_clients_thread(), handle the state changes for transporters as they
1664  * connect and disconnect.
1665  */
1666 void
update_connections()1667 TransporterRegistry::update_connections()
1668 {
1669   for (int i= 0, n= 0; n < nTransporters; i++){
1670     Transporter * t = theTransporters[i];
1671     if (!t)
1672       continue;
1673     n++;
1674 
1675     const NodeId nodeId = t->getRemoteNodeId();
1676 
1677     TransporterError code = m_error_states[nodeId].m_code;
1678     const char *info = m_error_states[nodeId].m_info;
1679     if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
1680     {
1681       callbackObj->reportError(nodeId, code, info);
1682       m_error_states[nodeId].m_code = TE_NO_ERROR;
1683       m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
1684     }
1685 
1686     switch(performStates[nodeId]){
1687     case CONNECTED:
1688     case DISCONNECTED:
1689       break;
1690     case CONNECTING:
1691       if(t->isConnected())
1692 	report_connect(nodeId);
1693       break;
1694     case DISCONNECTING:
1695       if(!t->isConnected())
1696 	report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
1697       break;
1698     }
1699   }
1700 }
1701 
// run as own thread
/**
 * Main loop of the background "start clients" thread (started by
 * start_clients(), stopped by clearing m_run_start_clients_thread).
 *
 * Every 100 ms it walks all registered transporters and acts on their
 * perform state:
 *  - CONNECTING: for client-side transporters that are not yet connected,
 *    try to connect. If that fails and the port is dynamic (<= 0), ask the
 *    management server for the current port.
 *  - DISCONNECTING: disconnect the transporter if it is still connected.
 *  - DISCONNECTED: a transporter that is still connected is disconnected
 *    with a warning.
 * Every 50 iterations (roughly 5 s) the management connection is checked.
 */
void
TransporterRegistry::start_clients_thread()
{
  int persist_mgm_count= 0;
  DBUG_ENTER("TransporterRegistry::start_clients_thread");
  while (m_run_start_clients_thread) {
    NdbSleep_MilliSleep(100);
    persist_mgm_count++;
    if(persist_mgm_count==50)
    {
      ndb_mgm_check_connection(m_mgm_handle);
      persist_mgm_count= 0;
    }
    // theTransporters may have gaps; n counts the non-NULL entries seen.
    for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
      Transporter * t = theTransporters[i];
      if (!t)
	continue;
      n++;

      const NodeId nodeId = t->getRemoteNodeId();
      switch(performStates[nodeId]){
      case CONNECTING:
	// Server-side transporters are not connected from this thread.
	if(!t->isConnected() && !t->isServer) {
	  bool connected= false;
	  /**
	   * First, we try to connect (if we have a port number).
	   */

	  if (t->get_s_port())
          {
            DBUG_PRINT("info", ("connecting to node %d using port %d",
                                nodeId, t->get_s_port()));
            connected= t->connect_client();
          }

	  /**
	   * If dynamic, get the port for connecting from the management server
	   */
	  if( !connected && t->get_s_port() <= 0) {	// Port is dynamic
	    int server_port= 0;
	    struct ndb_mgm_reply mgm_reply;

            DBUG_PRINT("info", ("connection to node %d should use "
                                "dynamic port",
                                nodeId));

	    if(!ndb_mgm_is_connected(m_mgm_handle))
	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);

	    if(ndb_mgm_is_connected(m_mgm_handle))
	    {
              DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
                                  nodeId));

	      int res=
		ndb_mgm_get_connection_int_parameter(m_mgm_handle,
						     t->getRemoteNodeId(),
						     t->getLocalNodeId(),
						     CFG_CONNECTION_SERVER_PORT,
						     &server_port,
						     &mgm_reply);
	      DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
				 server_port,t->getRemoteNodeId(),
				 t->getLocalNodeId(),res));
	      if( res >= 0 )
	      {
                DBUG_PRINT("info", ("got port %d to use for connection to %d",
                                    server_port, nodeId));
		/**
		 * Server_port == 0 just means that the mgmt server
		 * has not received a new port yet. Keep the old one.
		 */
		if (server_port)
		  t->set_s_port(server_port);
	      }
	      else if(ndb_mgm_is_connected(m_mgm_handle))
	      {
                // The query failed but the connection is still up: drop it
                // so the next iteration reconnects.
                DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
                                    res));
                g_eventLogger->info("Failed to get dynamic port, res: %d",
                                    res);
		ndb_mgm_disconnect(m_mgm_handle);
	      }
	      else
	      {
                DBUG_PRINT("info", ("mgmd close connection early"));
                g_eventLogger->info
                  ("Management server closed connection early. "
                   "It is probably being shut down (or has problems). "
                   "We will retry the connection. %d %s %s line: %d",
                   ndb_mgm_get_latest_error(m_mgm_handle),
                   ndb_mgm_get_latest_error_desc(m_mgm_handle),
                   ndb_mgm_get_latest_error_msg(m_mgm_handle),
                   ndb_mgm_get_latest_error_line(m_mgm_handle)
                   );
	      }
	    }
	    /** else
	     * We will not be able to get a new port unless
	     * the m_mgm_handle is connected. Note that not
	     * being connected is an ok state, just continue
	     * until it is able to connect. Continue using the
	     * old port until we can connect again and get a
	     * new port.
	     */
	  }
	}
	break;
      case DISCONNECTING:
	if(t->isConnected())
	  t->doDisconnect();
	break;
      case DISCONNECTED:
      {
        // A connection that exists in the DISCONNECTED state is unexpected.
        if (t->isConnected())
        {
          g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
                                 " while being connected, disconnecting!",
                                 t->getRemoteNodeId());
          t->doDisconnect();
        }
        break;
      }
      default:
	break;
      }
    }
  }
  DBUG_VOID_RETURN;
}
1833 
1834 struct NdbThread*
start_clients()1835 TransporterRegistry::start_clients()
1836 {
1837   m_run_start_clients_thread= true;
1838   m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1839 					   (void**)this,
1840                                            0, // default stack size
1841 					   "ndb_start_clients",
1842 					   NDB_THREAD_PRIO_LOW);
1843   if (m_start_clients_thread == 0)
1844   {
1845     m_run_start_clients_thread= false;
1846   }
1847   return m_start_clients_thread;
1848 }
1849 
1850 bool
stop_clients()1851 TransporterRegistry::stop_clients()
1852 {
1853   if (m_start_clients_thread) {
1854     m_run_start_clients_thread= false;
1855     void* status;
1856     NdbThread_WaitFor(m_start_clients_thread, &status);
1857     NdbThread_Destroy(&m_start_clients_thread);
1858   }
1859   return true;
1860 }
1861 
1862 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)1863 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1864 					       const char *interf,
1865 					       int s_port)
1866 {
1867   DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1868   DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1869   if (interf && strlen(interf) == 0)
1870     interf= 0;
1871 
1872   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1873   {
1874     Transporter_interface &tmp= m_transporter_interface[i];
1875     if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1876       continue;
1877     if (interf != 0 && tmp.m_interface != 0 &&
1878 	strcmp(interf, tmp.m_interface) == 0)
1879     {
1880       DBUG_VOID_RETURN; // found match, no need to insert
1881     }
1882     if (interf == 0 && tmp.m_interface == 0)
1883     {
1884       DBUG_VOID_RETURN; // found match, no need to insert
1885     }
1886   }
1887   Transporter_interface t;
1888   t.m_remote_nodeId= remoteNodeId;
1889   t.m_s_service_port= s_port;
1890   t.m_interface= interf;
1891   m_transporter_interface.push_back(t);
1892   DBUG_PRINT("exit",("interface and port added"));
1893   DBUG_VOID_RETURN;
1894 }
1895 
1896 bool
start_service(SocketServer & socket_server)1897 TransporterRegistry::start_service(SocketServer& socket_server)
1898 {
1899   DBUG_ENTER("TransporterRegistry::start_service");
1900   if (m_transporter_interface.size() > 0 &&
1901       localNodeId == 0)
1902   {
1903     g_eventLogger->error("INTERNAL ERROR: not initialized");
1904     DBUG_RETURN(false);
1905   }
1906 
1907   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1908   {
1909     Transporter_interface &t= m_transporter_interface[i];
1910 
1911     unsigned short port= (unsigned short)t.m_s_service_port;
1912     if(t.m_s_service_port<0)
1913       port= -t.m_s_service_port; // is a dynamic port
1914     TransporterService *transporter_service =
1915       new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1916     if(!socket_server.setup(transporter_service,
1917 			    &port, t.m_interface))
1918     {
1919       DBUG_PRINT("info", ("Trying new port"));
1920       port= 0;
1921       if(t.m_s_service_port>0
1922 	 || !socket_server.setup(transporter_service,
1923 				 &port, t.m_interface))
1924       {
1925 	/*
1926 	 * If it wasn't a dynamically allocated port, or
1927 	 * our attempts at getting a new dynamic port failed
1928 	 */
1929         g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
1930                              "Please check if the port is already used,\n"
1931                              "(perhaps the node is already running)",
1932                              t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1933 	delete transporter_service;
1934 	DBUG_RETURN(false);
1935       }
1936     }
1937     t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1938     DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1939     transporter_service->setTransporterRegistry(this);
1940   }
1941   DBUG_RETURN(true);
1942 }
1943 
#ifdef NDB_SHM_TRANSPORTER
/**
 * Handler installed by startReceiving() for g_ndb_shm_signum. It only
 * increments g_shm_counter; presumably its real purpose is to interrupt a
 * blocking call (TODO confirm).
 */
extern "C"
RETSIGTYPE
shm_sig_handler(int signo)
{
  g_shm_counter += 1;
}
#endif
1952 
1953 void
startReceiving()1954 TransporterRegistry::startReceiving()
1955 {
1956   DBUG_ENTER("TransporterRegistry::startReceiving");
1957 
1958 #ifdef NDB_SHM_TRANSPORTER
1959   m_shm_own_pid = getpid();
1960   if (g_ndb_shm_signum)
1961   {
1962     DBUG_PRINT("info",("Install signal handler for signum %d",
1963 		       g_ndb_shm_signum));
1964     struct sigaction sa;
1965     NdbThread_set_shm_sigmask(FALSE);
1966     sigemptyset(&sa.sa_mask);
1967     sa.sa_handler = shm_sig_handler;
1968     sa.sa_flags = 0;
1969     int ret;
1970     while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
1971       ;
1972     if(ret != 0)
1973     {
1974       DBUG_PRINT("error",("Install failed"));
1975       g_eventLogger->error("Failed to install signal handler for"
1976                            " SHM transporter, signum %d, errno: %d (%s)",
1977                            g_ndb_shm_signum, errno, strerror(errno));
1978     }
1979   }
1980 #endif // NDB_SHM_TRANSPORTER
1981   DBUG_VOID_RETURN;
1982 }
1983 
1984 void
stopReceiving()1985 TransporterRegistry::stopReceiving(){
1986   /**
1987    * Disconnect all transporters, this includes detach from remote node
1988    * and since that must be done from the same process that called attach
1989    * it's done here in the receive thread
1990    */
1991   disconnectAll();
1992 }
1993 
1994 void
startSending()1995 TransporterRegistry::startSending(){
1996 }
1997 
1998 void
stopSending()1999 TransporterRegistry::stopSending(){
2000 }
2001 
operator <<(NdbOut & out,SignalHeader & sh)2002 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
2003   out << "-- Signal Header --" << endl;
2004   out << "theLength:    " << sh.theLength << endl;
2005   out << "gsn:          " << sh.theVerId_signalNumber << endl;
2006   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
2007   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
2008   out << "sendersSig:   " << sh.theSendersSignalId << endl;
2009   out << "theSignalId:  " << sh.theSignalId << endl;
2010   out << "trace:        " << (int)sh.theTrace << endl;
2011   return out;
2012 }
2013 
2014 Transporter*
get_transporter(NodeId nodeId)2015 TransporterRegistry::get_transporter(NodeId nodeId) {
2016   assert(nodeId < maxTransporters);
2017   return theTransporters[nodeId];
2018 }
2019 
2020 
connect_client(NdbMgmHandle * h)2021 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
2022 {
2023   DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
2024 
2025   Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
2026 
2027   if(!mgm_nodeid)
2028   {
2029     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2030     return false;
2031   }
2032   Transporter * t = theTransporters[mgm_nodeid];
2033   if (!t)
2034   {
2035     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2036     return false;
2037   }
2038 
2039   bool res = t->connect_client(connect_ndb_mgmd(h));
2040   if (res == true)
2041   {
2042     performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
2043   }
2044   DBUG_RETURN(res);
2045 }
2046 
2047 
2048 
2049 /**
2050  * Given a connected NdbMgmHandle, turns it into a transporter
2051  * and returns the socket.
2052  */
connect_ndb_mgmd(NdbMgmHandle * h)2053 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
2054 {
2055   struct ndb_mgm_reply mgm_reply;
2056   NDB_SOCKET_TYPE sockfd;
2057   my_socket_invalidate(&sockfd);
2058 
2059   DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
2060 
2061   if ( h==NULL || *h == NULL )
2062   {
2063     g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
2064     DBUG_RETURN(sockfd);
2065   }
2066 
2067   for(unsigned int i=0;i < m_transporter_interface.size();i++)
2068   {
2069     if (m_transporter_interface[i].m_s_service_port >= 0)
2070       continue;
2071 
2072     DBUG_PRINT("info", ("Setting dynamic port %d for connection from node %d",
2073                         m_transporter_interface[i].m_s_service_port,
2074                         m_transporter_interface[i].m_remote_nodeId));
2075 
2076     if (ndb_mgm_set_connection_int_parameter(*h,
2077                                    localNodeId,
2078 				   m_transporter_interface[i].m_remote_nodeId,
2079 				   CFG_CONNECTION_SERVER_PORT,
2080 				   m_transporter_interface[i].m_s_service_port,
2081 				   &mgm_reply) < 0)
2082     {
2083       g_eventLogger->error("Could not set dynamic port for %d->%d (%s:%d)",
2084                            localNodeId,
2085                            m_transporter_interface[i].m_remote_nodeId,
2086                            __FILE__, __LINE__);
2087       ndb_mgm_destroy_handle(h);
2088       DBUG_RETURN(sockfd);
2089     }
2090   }
2091 
2092   /**
2093    * convert_to_transporter also disposes of the handle (i.e. we don't leak
2094    * memory here.
2095    */
2096   DBUG_PRINT("info", ("Converting handle to transporter"));
2097   sockfd= ndb_mgm_convert_to_transporter(h);
2098   if (!my_socket_valid(sockfd))
2099   {
2100     g_eventLogger->error("Failed to convert to transporter (%s: %d)",
2101                          __FILE__, __LINE__);
2102     ndb_mgm_destroy_handle(h);
2103   }
2104   DBUG_RETURN(sockfd);
2105 }
2106 
2107 /**
2108  * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
2109  * and returns the socket.
2110  */
connect_ndb_mgmd(SocketClient * sc)2111 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
2112 {
2113   NdbMgmHandle h= ndb_mgm_create_handle();
2114   NDB_SOCKET_TYPE s;
2115   my_socket_invalidate(&s);
2116 
2117   DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
2118 
2119   if ( h == NULL )
2120   {
2121     DBUG_RETURN(s);
2122   }
2123 
2124   /**
2125    * Set connectstring
2126    */
2127   {
2128     BaseString cs;
2129     cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
2130     ndb_mgm_set_connectstring(h, cs.c_str());
2131   }
2132 
2133   if(ndb_mgm_connect(h, 0, 0, 0)<0)
2134   {
2135     DBUG_PRINT("info", ("connection to mgmd failed"));
2136     ndb_mgm_destroy_handle(&h);
2137     DBUG_RETURN(s);
2138   }
2139 
2140   DBUG_RETURN(connect_ndb_mgmd(&h));
2141 }
2142 
2143 /**
2144  * Default implementation of transporter send buffer handler.
2145  */
2146 
2147 Uint32 *
getWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2148 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
2149                                  NodeId node, Uint32 lenBytes, Uint32 prio)
2150 {
2151   Transporter *t = theTransporters[node];
2152   Uint32 *insertPtr = handle->getWritePtr(node, lenBytes, prio,
2153                                           t->get_max_send_buffer());
2154 
2155   if (insertPtr == 0) {
2156     //-------------------------------------------------
2157     // Buffer was completely full. We have severe problems.
2158     // We will attempt to wait for a small time
2159     //-------------------------------------------------
2160     if(t->send_is_possible(10)) {
2161       //-------------------------------------------------
2162       // Send is possible after the small timeout.
2163       //-------------------------------------------------
2164       if(!handle->forceSend(node)){
2165 	return 0;
2166       } else {
2167 	//-------------------------------------------------
2168 	// Since send was successful we will make a renewed
2169 	// attempt at inserting the signal into the buffer.
2170 	//-------------------------------------------------
2171         insertPtr = handle->getWritePtr(node, lenBytes, prio,
2172                                         t->get_max_send_buffer());
2173       }//if
2174     } else {
2175       return 0;
2176     }//if
2177   }
2178   return insertPtr;
2179 }
2180 
2181 void
updateWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2182 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
2183                                     NodeId node, Uint32 lenBytes, Uint32 prio)
2184 {
2185   Transporter *t = theTransporters[node];
2186 
2187   Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
2188   t->update_status_overloaded(used);
2189 
2190   if(t->send_limit_reached(used)) {
2191     //-------------------------------------------------
2192     // Buffer is full and we are ready to send. We will
2193     // not wait since the signal is already in the buffer.
2194     // Force flag set has the same indication that we
2195     // should always send. If it is not possible to send
2196     // we will not worry since we will soon be back for
2197     // a renewed trial.
2198     //-------------------------------------------------
2199     if(t->send_is_possible(0)) {
2200       //-------------------------------------------------
2201       // Send was possible, attempt at a send.
2202       //-------------------------------------------------
2203       handle->forceSend(node);
2204     }//if
2205   }
2206 }
2207 
2208 Uint32
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)2209 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
2210                                              Uint32 max)
2211 {
2212   assert(m_use_default_send_buffer);
2213 
2214   if (max == 0)
2215     return 0;
2216 
2217   Uint32 count = 0;
2218   SendBuffer *b = m_send_buffers + node;
2219   SendBufferPage *page = b->m_first_page;
2220   while (page != NULL && count < max)
2221   {
2222     dst[count].iov_base = page->m_data+page->m_start;
2223     dst[count].iov_len = page->m_bytes;
2224     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2225     page = page->m_next;
2226     count++;
2227   }
2228 
2229   return count;
2230 }
2231 
2232 Uint32
bytes_sent(NodeId node,Uint32 bytes)2233 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
2234 {
2235   assert(m_use_default_send_buffer);
2236 
2237   SendBuffer *b = m_send_buffers + node;
2238   Uint32 used_bytes = b->m_used_bytes;
2239 
2240   if (bytes == 0)
2241     return used_bytes;
2242 
2243   used_bytes -= bytes;
2244   b->m_used_bytes = used_bytes;
2245 
2246   SendBufferPage *page = b->m_first_page;
2247   while (bytes && bytes >= page->m_bytes)
2248   {
2249     SendBufferPage * tmp = page;
2250     bytes -= page->m_bytes;
2251     page = page->m_next;
2252     release_page(tmp);
2253   }
2254 
2255   if (used_bytes == 0)
2256   {
2257     b->m_first_page = 0;
2258     b->m_last_page = 0;
2259   }
2260   else
2261   {
2262     page->m_start += bytes;
2263     page->m_bytes -= bytes;
2264     assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2265     b->m_first_page = page;
2266   }
2267 
2268   return used_bytes;
2269 }
2270 
2271 bool
has_data_to_send(NodeId node)2272 TransporterRegistry::has_data_to_send(NodeId node)
2273 {
2274   assert(m_use_default_send_buffer);
2275 
2276   SendBuffer *b = m_send_buffers + node;
2277   return (b->m_first_page != NULL && b->m_first_page->m_bytes);
2278 }
2279 
2280 void
reset_send_buffer(NodeId node,bool should_be_empty)2281 TransporterRegistry::reset_send_buffer(NodeId node, bool should_be_empty)
2282 {
2283   assert(m_use_default_send_buffer);
2284 
2285   // Make sure that buffer is already empty if the "should_be_empty"
2286   // flag is set. This is done to quickly catch any stray signals
2287   // written to the send buffer while not being connected
2288   if (should_be_empty && !has_data_to_send(node))
2289     return;
2290   assert(!should_be_empty);
2291 
2292   SendBuffer *b = m_send_buffers + node;
2293   SendBufferPage *page = b->m_first_page;
2294   while (page != NULL)
2295   {
2296     SendBufferPage *next = page->m_next;
2297     release_page(page);
2298     page = next;
2299   }
2300   b->m_first_page = NULL;
2301   b->m_last_page = NULL;
2302   b->m_used_bytes = 0;
2303 }
2304 
2305 TransporterRegistry::SendBufferPage *
alloc_page()2306 TransporterRegistry::alloc_page()
2307 {
2308   SendBufferPage *page = m_page_freelist;
2309   if (page != NULL)
2310   {
2311     m_page_freelist = page->m_next;
2312     return page;
2313   }
2314 
2315   ndbout << "ERROR: out of send buffers in kernel." << endl;
2316   return NULL;
2317 }
2318 
2319 void
release_page(SendBufferPage * page)2320 TransporterRegistry::release_page(SendBufferPage *page)
2321 {
2322   assert(page != NULL);
2323   page->m_next = m_page_freelist;
2324   m_page_freelist = page;
2325 }
2326 
2327 Uint32 *
getWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio,Uint32 max_use)2328 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
2329                                  Uint32 max_use)
2330 {
2331   assert(m_use_default_send_buffer);
2332 
2333   SendBuffer *b = m_send_buffers + node;
2334 
2335   /* First check if we have room in already allocated page. */
2336   SendBufferPage *page = b->m_last_page;
2337   if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
2338   {
2339     return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
2340   }
2341 
2342   if (b->m_used_bytes + lenBytes > max_use)
2343     return NULL;
2344 
2345   /* Allocate a new page. */
2346   page = alloc_page();
2347   if (page == NULL)
2348     return NULL;
2349   page->m_next = NULL;
2350   page->m_bytes = 0;
2351   page->m_start = 0;
2352 
2353   if (b->m_last_page == NULL)
2354   {
2355     b->m_first_page = page;
2356     b->m_last_page = page;
2357   }
2358   else
2359   {
2360     assert(b->m_first_page != NULL);
2361     b->m_last_page->m_next = page;
2362     b->m_last_page = page;
2363   }
2364   return (Uint32 *)(page->m_data);
2365 }
2366 
2367 Uint32
updateWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio)2368 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2369 {
2370   assert(m_use_default_send_buffer);
2371 
2372   SendBuffer *b = m_send_buffers + node;
2373   SendBufferPage *page = b->m_last_page;
2374   assert(page != NULL);
2375   assert(page->m_bytes + lenBytes <= page->max_data_bytes());
2376   page->m_bytes += lenBytes;
2377   b->m_used_bytes += lenBytes;
2378   return b->m_used_bytes;
2379 }
2380 
2381 bool
forceSend(NodeId node)2382 TransporterRegistry::forceSend(NodeId node)
2383 {
2384   Transporter *t = get_transporter(node);
2385   if (t)
2386     return t->doSend();
2387   else
2388     return false;
2389 }
2390 
2391 
2392 void
print_transporters(const char * where,NdbOut & out)2393 TransporterRegistry::print_transporters(const char* where, NdbOut& out)
2394 {
2395   out << where << " >>" << endl;
2396 
2397   for(unsigned i = 0; i < maxTransporters; i++){
2398     if(theTransporters[i] == NULL)
2399       continue;
2400 
2401     const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
2402 
2403     out << i << " "
2404         << getPerformStateString(remoteNodeId) << " to node: "
2405         << remoteNodeId << " at "
2406         << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
2407   }
2408 
2409   out << "<<" << endl;
2410 
2411   for (size_t i= 0; i < m_transporter_interface.size(); i++){
2412     Transporter_interface tf= m_transporter_interface[i];
2413 
2414     out << i
2415         << " remote node: " << tf.m_remote_nodeId
2416         << " port: " << tf.m_s_service_port
2417         << " interface: " << tf.m_interface << endl;
2418   }
2419 }
2420 
2421 
// Explicit instantiation of the Vector template for the transporter
// interface list (the element type of m_transporter_interface).
template class Vector<TransporterRegistry::Transporter_interface>;
2423