1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include <TransporterRegistry.hpp>
28 #include "TransporterInternalDefinitions.hpp"
29
30 #include "Transporter.hpp"
31 #include <SocketAuthenticator.hpp>
32
33 #ifdef NDB_TCP_TRANSPORTER
34 #include "TCP_Transporter.hpp"
35 #include "Loopback_Transporter.hpp"
36 #endif
37
38 #ifdef NDB_SCI_TRANSPORTER
39 #include "SCI_Transporter.hpp"
40 #endif
41
42 #ifdef NDB_SHM_TRANSPORTER
43 #include "SHM_Transporter.hpp"
44 extern int g_ndb_shm_signum;
45 #endif
46
47 #include "NdbOut.hpp"
48 #include <NdbSleep.h>
49 #include <NdbTick.h>
50 #include <InputStream.hpp>
51 #include <OutputStream.hpp>
52
53 #include <mgmapi/mgmapi.h>
54 #include <mgmapi_internal.h>
55 #include <mgmapi/mgmapi_debug.h>
56
57 #include <EventLogger.hpp>
58 extern EventLogger * g_eventLogger;
59
60 struct in_addr
get_connect_address(NodeId node_id) const61 TransporterRegistry::get_connect_address(NodeId node_id) const
62 {
63 return theTransporters[node_id]->m_connect_address;
64 }
65
newSession(NDB_SOCKET_TYPE sockfd)66 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
67 {
68 DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
69 if (m_auth && !m_auth->server_authenticate(sockfd)){
70 NDB_CLOSE_SOCKET(sockfd);
71 DBUG_RETURN(0);
72 }
73
74 BaseString msg;
75 if (!m_transporter_registry->connect_server(sockfd, msg))
76 {
77 NDB_CLOSE_SOCKET(sockfd);
78 DBUG_RETURN(0);
79 }
80
81 DBUG_RETURN(0);
82 }
83
TransporterRegistry(TransporterCallback * callback,bool use_default_send_buffer,unsigned _maxTransporters,unsigned sizeOfLongSignalMemory)84 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
85 bool use_default_send_buffer,
86 unsigned _maxTransporters,
87 unsigned sizeOfLongSignalMemory) :
88 m_mgm_handle(0),
89 localNodeId(0),
90 m_transp_count(0),
91 m_use_default_send_buffer(use_default_send_buffer),
92 m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
93 m_total_max_send_buffer(0)
94 {
95 DBUG_ENTER("TransporterRegistry::TransporterRegistry");
96
97 maxTransporters = _maxTransporters;
98 sendCounter = 1;
99
100 callbackObj=callback;
101
102 theTCPTransporters = new TCP_Transporter * [maxTransporters];
103 theSCITransporters = new SCI_Transporter * [maxTransporters];
104 theSHMTransporters = new SHM_Transporter * [maxTransporters];
105 theTransporterTypes = new TransporterType [maxTransporters];
106 theTransporters = new Transporter * [maxTransporters];
107 performStates = new PerformState [maxTransporters];
108 ioStates = new IOState [maxTransporters];
109 m_disconnect_errnum = new int [maxTransporters];
110 m_error_states = new ErrorState [maxTransporters];
111
112 m_has_extra_wakeup_socket = false;
113 #if defined(HAVE_EPOLL_CREATE)
114 m_epoll_fd = -1;
115 m_epoll_events = new struct epoll_event[maxTransporters];
116 m_epoll_fd = epoll_create(maxTransporters);
117 if (m_epoll_fd == -1 || !m_epoll_events)
118 {
119 /* Failure to allocate data or get epoll socket, abort */
120 perror("Failed to alloc epoll-array or calling epoll_create... falling back to select!");
121 if (m_epoll_fd != -1)
122 {
123 close(m_epoll_fd);
124 m_epoll_fd = -1;
125 }
126 if (m_epoll_events)
127 {
128 delete [] m_epoll_events;
129 m_epoll_events = 0;
130 }
131 }
132 else
133 {
134 memset((char*)m_epoll_events, 0,
135 maxTransporters * sizeof(struct epoll_event));
136 }
137
138 #endif
139 #ifdef ERROR_INSERT
140 m_blocked.clear();
141 m_blocked_with_data.clear();
142 m_blocked_disconnected.clear();
143 #endif
144 // Initialize member variables
145 nTransporters = 0;
146 nTCPTransporters = 0;
147 nSCITransporters = 0;
148 nSHMTransporters = 0;
149
150 // Initialize the transporter arrays
151 ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
152 for (unsigned i=0; i<maxTransporters; i++) {
153 theTCPTransporters[i] = NULL;
154 theSCITransporters[i] = NULL;
155 theSHMTransporters[i] = NULL;
156 theTransporters[i] = NULL;
157 performStates[i] = DISCONNECTED;
158 ioStates[i] = NoHalt;
159 m_disconnect_errnum[i]= 0;
160 m_error_states[i] = default_error_state;
161 }
162
163 DBUG_VOID_RETURN;
164 }
165
166 void
allocate_send_buffers(Uint64 total_send_buffer)167 TransporterRegistry::allocate_send_buffers(Uint64 total_send_buffer)
168 {
169 if (!m_use_default_send_buffer)
170 return;
171
172 if (total_send_buffer == 0)
173 total_send_buffer = get_total_max_send_buffer();
174
175 if (m_send_buffers)
176 {
177 /* Send buffers already allocated -> resize the buffer pages */
178 assert(m_send_buffer_memory);
179
180 // TODO resize send buffer pages
181
182 return;
183 }
184
185 /* Initialize transporter send buffers (initially empty). */
186 m_send_buffers = new SendBuffer[maxTransporters];
187 for (unsigned i = 0; i < maxTransporters; i++)
188 {
189 SendBuffer &b = m_send_buffers[i];
190 b.m_first_page = NULL;
191 b.m_last_page = NULL;
192 b.m_used_bytes = 0;
193 }
194
195 /* Initialize the page freelist. */
196 Uint64 send_buffer_pages =
197 (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
198 /* Add one extra page of internal fragmentation overhead per transporter. */
199 send_buffer_pages += nTransporters;
200
201 m_send_buffer_memory =
202 new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
203 if (m_send_buffer_memory == NULL)
204 {
205 ndbout << "Unable to allocate "
206 << send_buffer_pages * SendBufferPage::PGSIZE
207 << " bytes of memory for send buffers, aborting." << endl;
208 abort();
209 }
210
211 m_page_freelist = NULL;
212 for (unsigned i = 0; i < send_buffer_pages; i++)
213 {
214 SendBufferPage *page =
215 (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
216 page->m_bytes = 0;
217 page->m_next = m_page_freelist;
218 m_page_freelist = page;
219 }
220 }
221
set_mgm_handle(NdbMgmHandle h)222 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
223 {
224 DBUG_ENTER("TransporterRegistry::set_mgm_handle");
225 if (m_mgm_handle)
226 ndb_mgm_destroy_handle(&m_mgm_handle);
227 m_mgm_handle= h;
228 ndb_mgm_set_timeout(m_mgm_handle, 5000);
229 #ifndef DBUG_OFF
230 if (h)
231 {
232 char buf[256];
233 DBUG_PRINT("info",("handle set with connectstring: %s",
234 ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
235 }
236 else
237 {
238 DBUG_PRINT("info",("handle set to NULL"));
239 }
240 #endif
241 DBUG_VOID_RETURN;
242 }
243
~TransporterRegistry()244 TransporterRegistry::~TransporterRegistry()
245 {
246 DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
247
248 removeAll();
249
250 delete[] theTCPTransporters;
251 delete[] theSCITransporters;
252 delete[] theSHMTransporters;
253 delete[] theTransporterTypes;
254 delete[] theTransporters;
255 delete[] performStates;
256 delete[] ioStates;
257 delete[] m_disconnect_errnum;
258 delete[] m_error_states;
259
260 if (m_send_buffers)
261 delete[] m_send_buffers;
262 m_page_freelist = NULL;
263 if (m_send_buffer_memory)
264 delete[] m_send_buffer_memory;
265
266 #if defined(HAVE_EPOLL_CREATE)
267 if (m_epoll_events) delete [] m_epoll_events;
268 if (m_epoll_fd != -1) close(m_epoll_fd);
269 #endif
270 if (m_mgm_handle)
271 ndb_mgm_destroy_handle(&m_mgm_handle);
272
273 if (m_has_extra_wakeup_socket)
274 {
275 my_socket_close(m_extra_wakeup_sockets[0]);
276 my_socket_close(m_extra_wakeup_sockets[1]);
277 }
278
279 DBUG_VOID_RETURN;
280 }
281
282 void
removeAll()283 TransporterRegistry::removeAll(){
284 for(unsigned i = 0; i<maxTransporters; i++){
285 if(theTransporters[i] != NULL)
286 removeTransporter(theTransporters[i]->getRemoteNodeId());
287 }
288 }
289
290 void
disconnectAll()291 TransporterRegistry::disconnectAll(){
292 for(unsigned i = 0; i<maxTransporters; i++){
293 if(theTransporters[i] != NULL)
294 theTransporters[i]->doDisconnect();
295 }
296 }
297
298 bool
init(NodeId nodeId)299 TransporterRegistry::init(NodeId nodeId) {
300 DBUG_ENTER("TransporterRegistry::init");
301 assert(localNodeId == 0 ||
302 localNodeId == nodeId);
303
304 localNodeId = nodeId;
305
306 DEBUG("TransporterRegistry started node: " << localNodeId);
307
308 if (!m_socket_poller.set_max_count(maxTransporters +
309 1 /* wakeup socket */))
310 DBUG_RETURN(false);
311
312 DBUG_RETURN(true);
313 }
314
315 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg) const316 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
317 BaseString & msg) const
318 {
319 DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
320
321 // Read "hello" that consists of node id and transporter
322 // type from client
323 SocketInputStream s_input(sockfd);
324 char buf[11+1+11+1]; // <int> <int>
325 if (s_input.gets(buf, sizeof(buf)) == 0) {
326 msg.assfmt("line: %u : Failed to get nodeid from client", __LINE__);
327 DBUG_PRINT("error", ("Failed to read 'hello' from client"));
328 DBUG_RETURN(false);
329 }
330
331 int nodeId, remote_transporter_type= -1;
332 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
333 switch (r) {
334 case 2:
335 break;
336 case 1:
337 // we're running version prior to 4.1.9
338 // ok, but with no checks on transporter configuration compatability
339 break;
340 default:
341 msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
342 DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
343 (int)sizeof(buf), buf));
344 DBUG_RETURN(false);
345 }
346
347 DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
348 nodeId, remote_transporter_type));
349
350
351 // Check that nodeid is in range before accessing the arrays
352 if (nodeId < 0 ||
353 nodeId >= (int)maxTransporters)
354 {
355 msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
356 DBUG_PRINT("error", ("Out of range nodeId: %d from client",
357 nodeId));
358 DBUG_RETURN(false);
359 }
360
361 // Check that transporter is allocated
362 Transporter *t= theTransporters[nodeId];
363 if (t == 0)
364 {
365 msg.assfmt("line: %u : Incorrect reply from client: >%s<, node: %u",
366 __LINE__, buf, nodeId);
367 DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
368 DBUG_RETURN(false);
369 }
370
371 // Check that the transporter should be connecting
372 if (performStates[nodeId] != TransporterRegistry::CONNECTING)
373 {
374 msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
375 __LINE__, nodeId,
376 getPerformStateString(performStates[nodeId]),
377 performStates[nodeId]);
378
379 DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
380 nodeId));
381 DBUG_RETURN(false);
382 }
383
384 // Check transporter type
385 if (remote_transporter_type != -1 &&
386 remote_transporter_type != t->m_type)
387 {
388 g_eventLogger->error("Connection from node: %d uses different transporter "
389 "type: %d, expected type: %d",
390 nodeId, remote_transporter_type, t->m_type);
391 DBUG_RETURN(false);
392 }
393
394 // Send reply to client
395 SocketOutputStream s_output(sockfd);
396 if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
397 {
398 msg.assfmt("line: %u : Failed to reply to connecting socket (node: %u)",
399 __LINE__, nodeId);
400 DBUG_PRINT("error", ("Send of reply failed"));
401 DBUG_RETURN(false);
402 }
403
404 // Setup transporter (transporter responsible for closing sockfd)
405 bool res = t->connect_server(sockfd, msg);
406
407 if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
408 {
409 msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
410 __LINE__, nodeId,
411 getPerformStateString(performStates[nodeId]),
412 performStates[nodeId]);
413 // Connection suceeded, but not connecting anymore, return
414 // false to close the connection
415 DBUG_RETURN(false);
416 }
417
418 DBUG_RETURN(res);
419 }
420
421
422 bool
configureTransporter(TransporterConfiguration * config)423 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
424 {
425 NodeId remoteNodeId = config->remoteNodeId;
426
427 assert(localNodeId);
428 assert(config->localNodeId == localNodeId);
429
430 if (remoteNodeId >= maxTransporters)
431 return false;
432
433 Transporter* t = theTransporters[remoteNodeId];
434 if(t != NULL)
435 {
436 // Transporter already exist, try to reconfigure it
437 return t->configure(config);
438 }
439
440 DEBUG("Configuring transporter from " << localNodeId
441 << " to " << remoteNodeId);
442
443 switch (config->type){
444 case tt_TCP_TRANSPORTER:
445 return createTCPTransporter(config);
446 case tt_SHM_TRANSPORTER:
447 return createSHMTransporter(config);
448 case tt_SCI_TRANSPORTER:
449 return createSCITransporter(config);
450 default:
451 abort();
452 break;
453 }
454 return false;
455 }
456
457
458 bool
createTCPTransporter(TransporterConfiguration * config)459 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
460 #ifdef NDB_TCP_TRANSPORTER
461
462 TCP_Transporter * t = 0;
463 if (config->remoteNodeId == config->localNodeId)
464 {
465 t = new Loopback_Transporter(* this, config);
466 }
467 else
468 {
469 t = new TCP_Transporter(*this, config);
470 }
471
472 if (t == NULL)
473 return false;
474 else if (!t->initTransporter()) {
475 delete t;
476 return false;
477 }
478
479 // Put the transporter in the transporter arrays
480 theTCPTransporters[nTCPTransporters] = t;
481 theTransporters[t->getRemoteNodeId()] = t;
482 theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
483 performStates[t->getRemoteNodeId()] = DISCONNECTED;
484 nTransporters++;
485 nTCPTransporters++;
486 m_total_max_send_buffer += t->get_max_send_buffer();
487
488 return true;
489 #else
490 return false;
491 #endif
492 }
493
494 bool
createSCITransporter(TransporterConfiguration * config)495 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
496 #ifdef NDB_SCI_TRANSPORTER
497
498 if(!SCI_Transporter::initSCI())
499 abort();
500
501 SCI_Transporter * t = new SCI_Transporter(*this,
502 config->localHostName,
503 config->remoteHostName,
504 config->s_port,
505 config->isMgmConnection,
506 config->sci.sendLimit,
507 config->sci.bufferSize,
508 config->sci.nLocalAdapters,
509 config->sci.remoteSciNodeId0,
510 config->sci.remoteSciNodeId1,
511 localNodeId,
512 config->remoteNodeId,
513 config->serverNodeId,
514 config->checksum,
515 config->signalId);
516
517 if (t == NULL)
518 return false;
519 else if (!t->initTransporter()) {
520 delete t;
521 return false;
522 }
523 // Put the transporter in the transporter arrays
524 theSCITransporters[nSCITransporters] = t;
525 theTransporters[t->getRemoteNodeId()] = t;
526 theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
527 performStates[t->getRemoteNodeId()] = DISCONNECTED;
528 nTransporters++;
529 nSCITransporters++;
530 m_total_max_send_buffer += t->get_max_send_buffer();
531
532 return true;
533 #else
534 return false;
535 #endif
536 }
537
538 bool
createSHMTransporter(TransporterConfiguration * config)539 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
540 DBUG_ENTER("TransporterRegistry::createTransporter SHM");
541 #ifdef NDB_SHM_TRANSPORTER
542
543 if (!g_ndb_shm_signum) {
544 g_ndb_shm_signum= config->shm.signum;
545 DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
546 /**
547 * Make sure to block g_ndb_shm_signum
548 * TransporterRegistry::init is run from "main" thread
549 */
550 NdbThread_set_shm_sigmask(TRUE);
551 }
552
553 if(config->shm.signum != g_ndb_shm_signum)
554 return false;
555
556 SHM_Transporter * t = new SHM_Transporter(*this,
557 config->localHostName,
558 config->remoteHostName,
559 config->s_port,
560 config->isMgmConnection,
561 localNodeId,
562 config->remoteNodeId,
563 config->serverNodeId,
564 config->checksum,
565 config->signalId,
566 config->shm.shmKey,
567 config->shm.shmSize
568 );
569 if (t == NULL)
570 return false;
571 else if (!t->initTransporter()) {
572 delete t;
573 return false;
574 }
575 // Put the transporter in the transporter arrays
576 theSHMTransporters[nSHMTransporters] = t;
577 theTransporters[t->getRemoteNodeId()] = t;
578 theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
579 performStates[t->getRemoteNodeId()] = DISCONNECTED;
580
581 nTransporters++;
582 nSHMTransporters++;
583 m_total_max_send_buffer += t->get_max_send_buffer();
584
585 DBUG_RETURN(true);
586 #else
587 DBUG_RETURN(false);
588 #endif
589 }
590
591
592 void
removeTransporter(NodeId nodeId)593 TransporterRegistry::removeTransporter(NodeId nodeId) {
594
595 DEBUG("Removing transporter from " << localNodeId
596 << " to " << nodeId);
597
598 if(theTransporters[nodeId] == NULL)
599 return;
600
601 theTransporters[nodeId]->doDisconnect();
602
603 const TransporterType type = theTransporterTypes[nodeId];
604
605 int ind = 0;
606 switch(type){
607 case tt_TCP_TRANSPORTER:
608 #ifdef NDB_TCP_TRANSPORTER
609 for(; ind < nTCPTransporters; ind++)
610 if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
611 break;
612 ind++;
613 for(; ind<nTCPTransporters; ind++)
614 theTCPTransporters[ind-1] = theTCPTransporters[ind];
615 nTCPTransporters --;
616 #endif
617 break;
618 case tt_SCI_TRANSPORTER:
619 #ifdef NDB_SCI_TRANSPORTER
620 for(; ind < nSCITransporters; ind++)
621 if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
622 break;
623 ind++;
624 for(; ind<nSCITransporters; ind++)
625 theSCITransporters[ind-1] = theSCITransporters[ind];
626 nSCITransporters --;
627 #endif
628 break;
629 case tt_SHM_TRANSPORTER:
630 #ifdef NDB_SHM_TRANSPORTER
631 for(; ind < nSHMTransporters; ind++)
632 if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
633 break;
634 ind++;
635 for(; ind<nSHMTransporters; ind++)
636 theSHMTransporters[ind-1] = theSHMTransporters[ind];
637 nSHMTransporters --;
638 #endif
639 break;
640 }
641
642 nTransporters--;
643
644 // Delete the transporter and remove it from theTransporters array
645 delete theTransporters[nodeId];
646 theTransporters[nodeId] = NULL;
647 }
648
649 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])650 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
651 const SignalHeader * const signalHeader,
652 Uint8 prio,
653 const Uint32 * const signalData,
654 NodeId nodeId,
655 const LinearSectionPtr ptr[3]){
656
657
658 Transporter *t = theTransporters[nodeId];
659 if(t != NULL &&
660 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
661 ((signalHeader->theReceiversBlockNumber == 252) ||
662 (signalHeader->theReceiversBlockNumber == 4002)))) {
663
664 if(t->isConnected()){
665 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
666 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
667 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
668 if(insertPtr != 0){
669 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
670 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
671 return SEND_OK;
672 }
673
674 int sleepTime = 2;
675
676 /**
677 * @note: on linux/i386 the granularity is 10ms
678 * so sleepTime = 2 generates a 10 ms sleep.
679 */
680 for(int i = 0; i<50; i++){
681 if((nSHMTransporters+nSCITransporters) == 0)
682 NdbSleep_MilliSleep(sleepTime);
683 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
684 if(insertPtr != 0){
685 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
686 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
687 break;
688 }
689 }
690
691 if(insertPtr != 0){
692 /**
693 * Send buffer full, but resend works
694 */
695 report_error(nodeId, TE_SEND_BUFFER_FULL);
696 return SEND_OK;
697 }
698
699 WARNING("Signal to " << nodeId << " lost(buffer)");
700 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
701 return SEND_BUFFER_FULL;
702 } else {
703 return SEND_MESSAGE_TOO_BIG;
704 }
705 } else {
706 #ifdef ERROR_INSERT
707 if (m_blocked.get(nodeId))
708 {
709 /* Looks like it disconnected while blocked. We'll pretend
710 * not to notice for now
711 */
712 WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
713 return SEND_OK;
714 }
715 #endif
716 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
717 return SEND_DISCONNECTED;
718 }
719 } else {
720 DEBUG("Discarding message to block: "
721 << signalHeader->theReceiversBlockNumber
722 << " node: " << nodeId);
723
724 if(t == NULL)
725 return SEND_UNKNOWN_NODE;
726
727 return SEND_BLOCKED;
728 }
729 }
730
731 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])732 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
733 const SignalHeader * const signalHeader,
734 Uint8 prio,
735 const Uint32 * const signalData,
736 NodeId nodeId,
737 class SectionSegmentPool & thePool,
738 const SegmentedSectionPtr ptr[3]){
739
740
741 Transporter *t = theTransporters[nodeId];
742 if(t != NULL &&
743 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
744 ((signalHeader->theReceiversBlockNumber == 252)||
745 (signalHeader->theReceiversBlockNumber == 4002)))) {
746
747 if(t->isConnected()){
748 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
749 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
750 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
751 if(insertPtr != 0){
752 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
753 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
754 return SEND_OK;
755 }
756
757
758 /**
759 * @note: on linux/i386 the granularity is 10ms
760 * so sleepTime = 2 generates a 10 ms sleep.
761 */
762 int sleepTime = 2;
763 for(int i = 0; i<50; i++){
764 if((nSHMTransporters+nSCITransporters) == 0)
765 NdbSleep_MilliSleep(sleepTime);
766 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
767 if(insertPtr != 0){
768 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
769 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
770 break;
771 }
772 }
773
774 if(insertPtr != 0){
775 /**
776 * Send buffer full, but resend works
777 */
778 report_error(nodeId, TE_SEND_BUFFER_FULL);
779 return SEND_OK;
780 }
781
782 WARNING("Signal to " << nodeId << " lost(buffer)");
783 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
784 return SEND_BUFFER_FULL;
785 } else {
786 return SEND_MESSAGE_TOO_BIG;
787 }
788 } else {
789 #ifdef ERROR_INSERT
790 if (m_blocked.get(nodeId))
791 {
792 /* Looks like it disconnected while blocked. We'll pretend
793 * not to notice for now
794 */
795 WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
796 return SEND_OK;
797 }
798 #endif
799 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
800 return SEND_DISCONNECTED;
801 }
802 } else {
803 DEBUG("Discarding message to block: "
804 << signalHeader->theReceiversBlockNumber
805 << " node: " << nodeId);
806
807 if(t == NULL)
808 return SEND_UNKNOWN_NODE;
809
810 return SEND_BLOCKED;
811 }
812 }
813
814
815 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const GenericSectionPtr ptr[3])816 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
817 const SignalHeader * const signalHeader,
818 Uint8 prio,
819 const Uint32 * const signalData,
820 NodeId nodeId,
821 const GenericSectionPtr ptr[3]){
822
823
824 Transporter *t = theTransporters[nodeId];
825 if(t != NULL &&
826 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
827 ((signalHeader->theReceiversBlockNumber == 252) ||
828 (signalHeader->theReceiversBlockNumber == 4002)))) {
829
830 if(t->isConnected()){
831 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
832 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
833 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
834 if(insertPtr != 0){
835 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
836 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
837 return SEND_OK;
838 }
839
840
841 /**
842 * @note: on linux/i386 the granularity is 10ms
843 * so sleepTime = 2 generates a 10 ms sleep.
844 */
845 int sleepTime = 2;
846 for(int i = 0; i<50; i++){
847 if((nSHMTransporters+nSCITransporters) == 0)
848 NdbSleep_MilliSleep(sleepTime);
849 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
850 if(insertPtr != 0){
851 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
852 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
853 break;
854 }
855 }
856
857 if(insertPtr != 0){
858 /**
859 * Send buffer full, but resend works
860 */
861 report_error(nodeId, TE_SEND_BUFFER_FULL);
862 return SEND_OK;
863 }
864
865 WARNING("Signal to " << nodeId << " lost(buffer)");
866 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
867 return SEND_BUFFER_FULL;
868 } else {
869 return SEND_MESSAGE_TOO_BIG;
870 }
871 } else {
872 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
873 return SEND_DISCONNECTED;
874 }
875 } else {
876 DEBUG("Discarding message to block: "
877 << signalHeader->theReceiversBlockNumber
878 << " node: " << nodeId);
879
880 if(t == NULL)
881 return SEND_UNKNOWN_NODE;
882
883 return SEND_BLOCKED;
884 }
885 }
886
887 void
external_IO(Uint32 timeOutMillis)888 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
889 //-----------------------------------------------------------
890 // Most of the time we will send the buffers here and then wait
891 // for new signals. Thus we start by sending without timeout
892 // followed by the receive part where we expect to sleep for
893 // a while.
894 //-----------------------------------------------------------
895 if(pollReceive(timeOutMillis)){
896 performReceive();
897 }
898 performSend();
899 }
900
901 bool
setup_wakeup_socket()902 TransporterRegistry::setup_wakeup_socket()
903 {
904 if (m_has_extra_wakeup_socket)
905 {
906 return true;
907 }
908
909 if (my_socketpair(m_extra_wakeup_sockets))
910 {
911 perror("socketpair failed!");
912 return false;
913 }
914
915 if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
916 !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
917 {
918 goto err;
919 }
920
921 #if defined(HAVE_EPOLL_CREATE)
922 if (m_epoll_fd != -1)
923 {
924 int sock = m_extra_wakeup_sockets[0].fd;
925 struct epoll_event event_poll;
926 bzero(&event_poll, sizeof(event_poll));
927 event_poll.data.u32 = 0;
928 event_poll.events = EPOLLIN;
929 int ret_val = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, sock, &event_poll);
930 if (ret_val != 0)
931 {
932 int error= errno;
933 fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
934 sock, error);
935 fflush(stderr);
936 goto err;
937 }
938 }
939 #endif
940 m_has_extra_wakeup_socket = true;
941 return true;
942
943 err:
944 my_socket_close(m_extra_wakeup_sockets[0]);
945 my_socket_close(m_extra_wakeup_sockets[1]);
946 my_socket_invalidate(m_extra_wakeup_sockets+0);
947 my_socket_invalidate(m_extra_wakeup_sockets+1);
948 return false;
949 }
950
951 void
wakeup()952 TransporterRegistry::wakeup()
953 {
954 if (m_has_extra_wakeup_socket)
955 {
956 static char c = 37;
957 my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
958 }
959 }
960
961 Uint32
pollReceive(Uint32 timeOutMillis,NodeBitmask & mask)962 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
963 NodeBitmask& mask)
964 {
965 Uint32 retVal = 0;
966
967 /**
968 * If any transporters have left-over data that was not fully executed in
969 * last loop, don't wait and return 'data available' even if nothing new
970 */
971 if (!mask.isclear())
972 {
973 timeOutMillis = 0;
974 retVal = 1;
975 }
976
977 if (nSCITransporters > 0)
978 {
979 timeOutMillis=0;
980 }
981
982 #ifdef NDB_SHM_TRANSPORTER
983 if (nSHMTransporters > 0)
984 {
985 Uint32 res = poll_SHM(0, mask);
986 if(res)
987 {
988 retVal |= res;
989 timeOutMillis = 0;
990 }
991 }
992 #endif
993
994 #ifdef NDB_TCP_TRANSPORTER
995 #if defined(HAVE_EPOLL_CREATE)
996 if (likely(m_epoll_fd != -1))
997 {
998 Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
999
1000 if (num_trps)
1001 {
1002 tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
1003 num_trps, timeOutMillis);
1004 retVal |= tcpReadSelectReply;
1005 }
1006
1007 int num_socket_events = tcpReadSelectReply;
1008 if (num_socket_events > 0)
1009 {
1010 for (int i = 0; i < num_socket_events; i++)
1011 {
1012 const Uint32 trpid = m_epoll_events[i].data.u32;
1013 #ifdef ERROR_INSERT
1014 if (m_blocked.get(trpid))
1015 {
1016 /* Don't pull from socket now, wait till unblocked */
1017 m_blocked_with_data.set(trpid);
1018 continue;
1019 }
1020 #endif
1021 mask.set(trpid);
1022 }
1023 }
1024 else if (num_socket_events < 0)
1025 {
1026 assert(errno == EINTR);
1027 }
1028 }
1029 else
1030 #endif
1031 {
1032 if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
1033 {
1034 retVal |= poll_TCP(timeOutMillis, mask);
1035 }
1036 else
1037 tcpReadSelectReply = 0;
1038 }
1039 #endif
1040 #ifdef NDB_SCI_TRANSPORTER
1041 if (nSCITransporters > 0)
1042 retVal |= poll_SCI(timeOutMillis, mask);
1043 #endif
1044 #ifdef NDB_SHM_TRANSPORTER
1045 if (nSHMTransporters > 0)
1046 {
1047 int res = poll_SHM(0, mask);
1048 retVal |= res;
1049 }
1050 #endif
1051 return retVal;
1052 }
1053
1054
1055 #ifdef NDB_SCI_TRANSPORTER
1056 Uint32
poll_SCI(Uint32 timeOutMillis,NodeBitmask & mask)1057 TransporterRegistry::poll_SCI(Uint32 timeOutMillis, NodeBitmask& mask)
1058 {
1059 Uint32 retVal = 0;
1060 for (int i = 0; i < nSCITransporters; i++)
1061 {
1062 SCI_Transporter * t = theSCITransporters[i];
1063 Uint32 node_id = t->getRemoteNodeId();
1064 if (t->isConnected() && is_connected(node_id))
1065 {
1066 if (t->hasDataToRead())
1067 {
1068 mask.set(node_id);
1069 retVal = 1;
1070 }
1071 }
1072 }
1073 return retVal;
1074 }
1075 #endif
1076
1077
1078 #ifdef NDB_SHM_TRANSPORTER
1079 static int g_shm_counter = 0;
1080 Uint32
poll_SHM(Uint32 timeOutMillis,NodeBitmask & mask)1081 TransporterRegistry::poll_SHM(Uint32 timeOutMillis, NodeBitmask& mask)
1082 {
1083 Uint32 retVal = 0;
1084 for (int j = 0; j < 100; j++)
1085 {
1086 for (int i = 0; i<nSHMTransporters; i++)
1087 {
1088 SHM_Transporter * t = theSHMTransporters[i];
1089 Uint32 node_id = t->getRemoteNodeId();
1090 if (t->isConnected() && is_connected(node_id))
1091 {
1092 if (t->hasDataToRead())
1093 {
1094 j = 100;
1095 mask.set(node_id);
1096 retVal = 1;
1097 }
1098 }
1099 }
1100 }
1101 return retVal;
1102 }
1103 #endif
1104
1105 #ifdef NDB_TCP_TRANSPORTER
1106 /**
1107 * We do not want to hold any transporter locks during select(), so there
1108 * is no protection against a disconnect closing the socket during this call.
1109 *
1110 * That does not matter, at most we will get a spurious wakeup on the wrong
1111 * socket, which will be handled correctly in performReceive() (which _is_
1112 * protected by transporter locks on upper layer).
1113 */
1114 Uint32
poll_TCP(Uint32 timeOutMillis,NodeBitmask & mask)1115 TransporterRegistry::poll_TCP(Uint32 timeOutMillis, NodeBitmask& mask)
1116 {
1117 m_socket_poller.clear();
1118
1119 if (m_has_extra_wakeup_socket)
1120 {
1121 const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
1122
1123 // Poll the wakup-socket for read
1124 m_socket_poller.add(socket, true, false, false);
1125 }
1126
1127 Uint16 idx[MAX_NODES];
1128 for (int i = 0; i < nTCPTransporters; i++)
1129 {
1130 TCP_Transporter * t = theTCPTransporters[i];
1131 const NDB_SOCKET_TYPE socket = t->getSocket();
1132 Uint32 node_id = t->getRemoteNodeId();
1133
1134 if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
1135 {
1136 idx[i] = m_socket_poller.add(socket, true, false, false);
1137 }
1138 else
1139 {
1140 idx[i] = MAX_NODES + 1;
1141 }
1142 }
1143
1144 tcpReadSelectReply = m_socket_poller.poll_unsafe(timeOutMillis);
1145
1146 if (tcpReadSelectReply > 0)
1147 {
1148 if (m_extra_wakeup_sockets)
1149 {
1150 if (m_socket_poller.has_read(0))
1151 mask.set((Uint32)0);
1152 }
1153
1154 for (int i = 0; i < nTCPTransporters; i++)
1155 {
1156 TCP_Transporter * t = theTCPTransporters[i];
1157 if (idx[i] != MAX_NODES + 1)
1158 {
1159 Uint32 node_id = t->getRemoteNodeId();
1160 #ifdef ERROR_INSERT
1161 if (m_blocked.get(i))
1162 {
1163 /* Don't pull from socket now, wait till unblocked */
1164 m_blocked_with_data.set(i);
1165 continue;
1166 }
1167 #endif
1168 if (m_socket_poller.has_read(idx[i]))
1169 mask.set(node_id);
1170 }
1171 }
1172 }
1173
1174 return tcpReadSelectReply;
1175 }
1176 #endif
1177
1178 #if defined(HAVE_EPOLL_CREATE)
1179 bool
change_epoll(TCP_Transporter * t,bool add)1180 TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
1181 {
1182 struct epoll_event event_poll;
1183 bzero(&event_poll, sizeof(event_poll));
1184 NDB_SOCKET_TYPE sock_fd = t->getSocket();
1185 int node_id = t->getRemoteNodeId();
1186 int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
1187 int ret_val, error;
1188
1189 if (!my_socket_valid(sock_fd))
1190 return FALSE;
1191
1192 event_poll.data.u32 = t->getRemoteNodeId();
1193 event_poll.events = EPOLLIN;
1194 ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
1195 if (!ret_val)
1196 goto ok;
1197 error= errno;
1198 if (error == ENOENT && !add)
1199 {
1200 /*
1201 * Could be that socket was closed premature to this call.
1202 * Not a problem that this occurs.
1203 */
1204 goto ok;
1205 }
1206 if (!add || (add && (error != ENOMEM)))
1207 {
1208 /*
1209 * Serious problems, we are either using wrong parameters,
1210 * have permission problems or the socket doesn't support
1211 * epoll!!
1212 */
1213 ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
1214 " node %u to epoll-set,"
1215 " errno: %u %s",
1216 add ? "ADD" : "DEL",
1217 m_epoll_fd,
1218 MY_SOCKET_FORMAT_VALUE(sock_fd),
1219 node_id,
1220 error,
1221 strerror(error));
1222 abort();
1223 }
1224 ndbout << "We lacked memory to add the socket for node id ";
1225 ndbout << node_id << endl;
1226 return TRUE;
1227
1228 ok:
1229 return FALSE;
1230 }
1231
1232 #endif
1233
1234 /**
1235 * In multi-threaded cases, this must be protected by a global receive lock.
1236 */
1237 void
performReceive()1238 TransporterRegistry::performReceive()
1239 {
1240 bool hasReceived = false;
1241
1242 if (m_has_data_transporters.get(0))
1243 {
1244 m_has_data_transporters.clear(Uint32(0));
1245 consume_extra_sockets();
1246 }
1247
1248 #ifdef ERROR_INSERT
1249 if (!m_blocked.isclear())
1250 {
1251 if (m_has_data_transporters.isclear())
1252 {
1253 /* poll sees data, but we want to ignore for now
1254 * sleep a little to avoid busy loop
1255 */
1256 NdbSleep_MilliSleep(1);
1257 }
1258 }
1259 #endif
1260
1261 #ifdef NDB_TCP_TRANSPORTER
1262 Uint32 id = 0;
1263 while ((id = m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
1264 {
1265 bool hasdata = false;
1266 TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1267 if (is_connected(id))
1268 {
1269 if (t->isConnected())
1270 {
1271 t->doReceive();
1272 if (hasReceived)
1273 callbackObj->checkJobBuffer();
1274 hasReceived = true;
1275 Uint32 * ptr;
1276 Uint32 sz = t->getReceiveData(&ptr);
1277 callbackObj->transporter_recv_from(id);
1278 Uint32 szUsed = unpack(ptr, sz, id, ioStates[id]);
1279 t->updateReceiveDataPtr(szUsed);
1280 hasdata = t->hasReceiveData();
1281 }
1282 }
1283 // If transporter still have data, make sure that it's remember to next time
1284 m_has_data_transporters.set(id, hasdata);
1285 }
1286 #endif
1287
1288 #ifdef NDB_SCI_TRANSPORTER
1289 //performReceive
1290 //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
1291 for (int i=0; i<nSCITransporters; i++)
1292 {
1293 SCI_Transporter *t = theSCITransporters[i];
1294 const NodeId nodeId = t->getRemoteNodeId();
1295 if(is_connected(nodeId))
1296 {
1297 if(t->isConnected() && t->checkConnected())
1298 {
1299 if (hasReceived)
1300 callbackObj->checkJobBuffer();
1301 hasReceived = true;
1302 Uint32 * readPtr, * eodPtr;
1303 t->getReceivePtr(&readPtr, &eodPtr);
1304 callbackObj->transporter_recv_from(nodeId);
1305 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1306 t->updateReceivePtr(newPtr);
1307 }
1308 }
1309 }
1310 #endif
1311 #ifdef NDB_SHM_TRANSPORTER
1312 for (int i=0; i<nSHMTransporters; i++)
1313 {
1314 SHM_Transporter *t = theSHMTransporters[i];
1315 const NodeId nodeId = t->getRemoteNodeId();
1316 if(is_connected(nodeId)){
1317 if(t->isConnected() && t->checkConnected())
1318 {
1319 if (hasReceived)
1320 callbackObj->checkJobBuffer();
1321 hasReceived = true;
1322 Uint32 * readPtr, * eodPtr;
1323 t->getReceivePtr(&readPtr, &eodPtr);
1324 callbackObj->transporter_recv_from(nodeId);
1325 Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
1326 t->updateReceivePtr(newPtr);
1327 }
1328 }
1329 }
1330 #endif
1331 }
1332
1333 /**
1334 * In multi-threaded cases, this must be protected by send lock (can use
1335 * different locks for each node).
1336 */
1337 int
performSend(NodeId nodeId)1338 TransporterRegistry::performSend(NodeId nodeId)
1339 {
1340 Transporter *t = get_transporter(nodeId);
1341 if (t && t->isConnected() && is_connected(nodeId))
1342 {
1343 return t->doSend();
1344 }
1345
1346 return 0;
1347 }
1348
1349 void
consume_extra_sockets()1350 TransporterRegistry::consume_extra_sockets()
1351 {
1352 char buf[4096];
1353 ssize_t ret;
1354 int err;
1355 NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
1356 do
1357 {
1358 ret = my_recv(sock, buf, sizeof(buf), 0);
1359 err = my_socket_errno();
1360 } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
1361 }
1362
1363 void
performSend()1364 TransporterRegistry::performSend()
1365 {
1366 int i;
1367 sendCounter = 1;
1368
1369 #ifdef NDB_TCP_TRANSPORTER
1370 for (i = m_transp_count; i < nTCPTransporters; i++)
1371 {
1372 TCP_Transporter *t = theTCPTransporters[i];
1373 if (t && t->has_data_to_send() &&
1374 t->isConnected() && is_connected(t->getRemoteNodeId()))
1375 {
1376 t->doSend();
1377 }
1378 }
1379 for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
1380 {
1381 TCP_Transporter *t = theTCPTransporters[i];
1382 if (t && t->has_data_to_send() &&
1383 t->isConnected() && is_connected(t->getRemoteNodeId()))
1384 {
1385 t->doSend();
1386 }
1387 }
1388 m_transp_count++;
1389 if (m_transp_count == nTCPTransporters) m_transp_count = 0;
1390 #endif
1391 #ifdef NDB_SCI_TRANSPORTER
1392 //scroll through the SCI transporters,
1393 // get each transporter, check if connected, send data
1394 for (i=0; i<nSCITransporters; i++) {
1395 SCI_Transporter *t = theSCITransporters[i];
1396 const NodeId nodeId = t->getRemoteNodeId();
1397
1398 if(is_connected(nodeId))
1399 {
1400 if(t->isConnected() && t->has_data_to_send())
1401 {
1402 t->doSend();
1403 } //if
1404 } //if
1405 }
1406 #endif
1407
1408 #ifdef NDB_SHM_TRANSPORTER
1409 for (i=0; i<nSHMTransporters; i++)
1410 {
1411 SHM_Transporter *t = theSHMTransporters[i];
1412 const NodeId nodeId = t->getRemoteNodeId();
1413 if(is_connected(nodeId))
1414 {
1415 if(t->isConnected())
1416 {
1417 t->doSend();
1418 }
1419 }
1420 }
1421 #endif
1422 }
1423
1424 int
forceSendCheck(int sendLimit)1425 TransporterRegistry::forceSendCheck(int sendLimit){
1426 int tSendCounter = sendCounter;
1427 sendCounter = tSendCounter + 1;
1428 if (tSendCounter >= sendLimit) {
1429 performSend();
1430 sendCounter = 1;
1431 return 1;
1432 }//if
1433 return 0;
1434 }//TransporterRegistry::forceSendCheck()
1435
1436 #ifdef DEBUG_TRANSPORTER
1437 void
printState()1438 TransporterRegistry::printState(){
1439 ndbout << "-- TransporterRegistry -- " << endl << endl
1440 << "Transporters = " << nTransporters << endl;
1441 for(int i = 0; i<maxTransporters; i++)
1442 if(theTransporters[i] != NULL){
1443 const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
1444 ndbout << "Transporter: " << remoteNodeId
1445 << " PerformState: " << performStates[remoteNodeId]
1446 << " IOState: " << ioStates[remoteNodeId] << endl;
1447 }
1448 }
1449 #endif
1450
1451 #ifdef ERROR_INSERT
1452 bool
isBlocked(NodeId nodeId)1453 TransporterRegistry::isBlocked(NodeId nodeId)
1454 {
1455 return m_blocked.get(nodeId);
1456 }
1457
1458 void
blockReceive(NodeId nodeId)1459 TransporterRegistry::blockReceive(NodeId nodeId)
1460 {
1461 /* Check that node is not already blocked?
1462 * Stop pulling from its socket (but track received data etc)
1463 */
1464 /* Shouldn't already be blocked with data */
1465 assert(!m_blocked.get(nodeId));
1466
1467 m_blocked.set(nodeId);
1468
1469 if (m_has_data_transporters.get(nodeId))
1470 {
1471 assert(!m_blocked_with_data.get(nodeId));
1472 m_blocked_with_data.set(nodeId);
1473 m_has_data_transporters.clear(nodeId);
1474 }
1475 }
1476
1477 void
unblockReceive(NodeId nodeId)1478 TransporterRegistry::unblockReceive(NodeId nodeId)
1479 {
1480 /* Check that node is blocked?
1481 * Resume pulling from its socket
1482 * Ensure in-flight data is processed if there was some
1483 */
1484 assert(m_blocked.get(nodeId));
1485 assert(!m_has_data_transporters.get(nodeId));
1486
1487 m_blocked.clear(nodeId);
1488
1489 if (m_blocked_with_data.get(nodeId))
1490 {
1491 m_has_data_transporters.set(nodeId);
1492 }
1493
1494 if (m_blocked_disconnected.get(nodeId))
1495 {
1496 /* Process disconnect notification/handling now */
1497 m_blocked_disconnected.clear(nodeId);
1498
1499 report_disconnect(nodeId, m_disconnect_errors[nodeId]);
1500 }
1501 }
1502 #endif
1503
1504 IOState
ioState(NodeId nodeId)1505 TransporterRegistry::ioState(NodeId nodeId) {
1506 return ioStates[nodeId];
1507 }
1508
1509 void
setIOState(NodeId nodeId,IOState state)1510 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
1511 if (ioStates[nodeId] == state)
1512 return;
1513
1514 DEBUG("TransporterRegistry::setIOState("
1515 << nodeId << ", " << state << ")");
1516
1517 ioStates[nodeId] = state;
1518 }
1519
1520 extern "C" void *
run_start_clients_C(void * me)1521 run_start_clients_C(void * me)
1522 {
1523 ((TransporterRegistry*) me)->start_clients_thread();
1524 return 0;
1525 }
1526
1527 /**
1528 * This method is used to initiate connection, called from the CMVMI blockx.
1529 *
1530 * This works asynchronously, no actions are taken directly in the calling
1531 * thread.
1532 */
1533 void
do_connect(NodeId node_id)1534 TransporterRegistry::do_connect(NodeId node_id)
1535 {
1536 PerformState &curr_state = performStates[node_id];
1537 switch(curr_state){
1538 case DISCONNECTED:
1539 break;
1540 case CONNECTED:
1541 return;
1542 case CONNECTING:
1543 return;
1544 case DISCONNECTING:
1545 break;
1546 }
1547 DBUG_ENTER("TransporterRegistry::do_connect");
1548 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
1549
1550 /*
1551 No one else should be using the transporter now, reset
1552 its send buffer
1553 */
1554 callbackObj->reset_send_buffer(node_id);
1555
1556 curr_state= CONNECTING;
1557 DBUG_VOID_RETURN;
1558 }
1559
1560 /**
1561 * This method is used to initiate disconnect from CMVMI. It is also called
1562 * from the TCP transporter in case of an I/O error on the socket.
1563 *
1564 * This works asynchronously, similar to do_connect().
1565 */
1566 void
do_disconnect(NodeId node_id,int errnum)1567 TransporterRegistry::do_disconnect(NodeId node_id, int errnum)
1568 {
1569 PerformState &curr_state = performStates[node_id];
1570 switch(curr_state){
1571 case DISCONNECTED:
1572 return;
1573 case CONNECTED:
1574 break;
1575 case CONNECTING:
1576 break;
1577 case DISCONNECTING:
1578 return;
1579 }
1580 DBUG_ENTER("TransporterRegistry::do_disconnect");
1581 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1582 curr_state= DISCONNECTING;
1583 m_disconnect_errnum[node_id] = errnum;
1584 DBUG_VOID_RETURN;
1585 }
1586
1587 void
report_connect(NodeId node_id)1588 TransporterRegistry::report_connect(NodeId node_id)
1589 {
1590 DBUG_ENTER("TransporterRegistry::report_connect");
1591 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1592
1593 /*
1594 The send buffers was reset when this connection
1595 was set to CONNECTING. In order to make sure no stray
1596 signals has been written to the send buffer since then
1597 call 'reset_send_buffer' with the "should_be_empty" flag
1598 set
1599 */
1600 callbackObj->reset_send_buffer(node_id, true);
1601
1602 performStates[node_id] = CONNECTED;
1603 #if defined(HAVE_EPOLL_CREATE)
1604 if (likely(m_epoll_fd != -1))
1605 {
1606 if (change_epoll((TCP_Transporter*)theTransporters[node_id],
1607 TRUE))
1608 {
1609 performStates[node_id] = DISCONNECTING;
1610 DBUG_VOID_RETURN;
1611 }
1612 }
1613 #endif
1614 callbackObj->reportConnect(node_id);
1615 DBUG_VOID_RETURN;
1616 }
1617
1618 void
report_disconnect(NodeId node_id,int errnum)1619 TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
1620 {
1621 DBUG_ENTER("TransporterRegistry::report_disconnect");
1622 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1623
1624 #ifdef ERROR_INSERT
1625 if (m_blocked.get(node_id))
1626 {
1627 /* We are simulating real latency, so control events experience
1628 * it too
1629 */
1630 m_blocked_disconnected.set(node_id);
1631 m_disconnect_errors[node_id] = errnum;
1632 DBUG_VOID_RETURN;
1633 }
1634 #endif
1635
1636 performStates[node_id] = DISCONNECTED;
1637 m_has_data_transporters.clear(node_id);
1638 callbackObj->reportDisconnect(node_id, errnum);
1639 DBUG_VOID_RETURN;
1640 }
1641
1642 /**
1643 * We only call TransporterCallback::reportError() from
1644 * TransporterRegistry::update_connections().
1645 *
1646 * In other places we call this method to enqueue the error that will later be
1647 * picked up by update_connections().
1648 */
1649 void
report_error(NodeId nodeId,TransporterError errorCode,const char * errorInfo)1650 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
1651 const char *errorInfo)
1652 {
1653 if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
1654 m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
1655 {
1656 m_error_states[nodeId].m_code = errorCode;
1657 m_error_states[nodeId].m_info = errorInfo;
1658 }
1659 }
1660
1661 /**
1662 * update_connections(), together with the thread running in
1663 * start_clients_thread(), handle the state changes for transporters as they
1664 * connect and disconnect.
1665 */
1666 void
update_connections()1667 TransporterRegistry::update_connections()
1668 {
1669 for (int i= 0, n= 0; n < nTransporters; i++){
1670 Transporter * t = theTransporters[i];
1671 if (!t)
1672 continue;
1673 n++;
1674
1675 const NodeId nodeId = t->getRemoteNodeId();
1676
1677 TransporterError code = m_error_states[nodeId].m_code;
1678 const char *info = m_error_states[nodeId].m_info;
1679 if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
1680 {
1681 callbackObj->reportError(nodeId, code, info);
1682 m_error_states[nodeId].m_code = TE_NO_ERROR;
1683 m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
1684 }
1685
1686 switch(performStates[nodeId]){
1687 case CONNECTED:
1688 case DISCONNECTED:
1689 break;
1690 case CONNECTING:
1691 if(t->isConnected())
1692 report_connect(nodeId);
1693 break;
1694 case DISCONNECTING:
1695 if(!t->isConnected())
1696 report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
1697 break;
1698 }
1699 }
1700 }
1701
1702 // run as own thread
1703 void
start_clients_thread()1704 TransporterRegistry::start_clients_thread()
1705 {
1706 int persist_mgm_count= 0;
1707 DBUG_ENTER("TransporterRegistry::start_clients_thread");
1708 while (m_run_start_clients_thread) {
1709 NdbSleep_MilliSleep(100);
1710 persist_mgm_count++;
1711 if(persist_mgm_count==50)
1712 {
1713 ndb_mgm_check_connection(m_mgm_handle);
1714 persist_mgm_count= 0;
1715 }
1716 for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
1717 Transporter * t = theTransporters[i];
1718 if (!t)
1719 continue;
1720 n++;
1721
1722 const NodeId nodeId = t->getRemoteNodeId();
1723 switch(performStates[nodeId]){
1724 case CONNECTING:
1725 if(!t->isConnected() && !t->isServer) {
1726 bool connected= false;
1727 /**
1728 * First, we try to connect (if we have a port number).
1729 */
1730
1731 if (t->get_s_port())
1732 {
1733 DBUG_PRINT("info", ("connecting to node %d using port %d",
1734 nodeId, t->get_s_port()));
1735 connected= t->connect_client();
1736 }
1737
1738 /**
1739 * If dynamic, get the port for connecting from the management server
1740 */
1741 if( !connected && t->get_s_port() <= 0) { // Port is dynamic
1742 int server_port= 0;
1743 struct ndb_mgm_reply mgm_reply;
1744
1745 DBUG_PRINT("info", ("connection to node %d should use "
1746 "dynamic port",
1747 nodeId));
1748
1749 if(!ndb_mgm_is_connected(m_mgm_handle))
1750 ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
1751
1752 if(ndb_mgm_is_connected(m_mgm_handle))
1753 {
1754 DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
1755 nodeId));
1756
1757 int res=
1758 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
1759 t->getRemoteNodeId(),
1760 t->getLocalNodeId(),
1761 CFG_CONNECTION_SERVER_PORT,
1762 &server_port,
1763 &mgm_reply);
1764 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
1765 server_port,t->getRemoteNodeId(),
1766 t->getLocalNodeId(),res));
1767 if( res >= 0 )
1768 {
1769 DBUG_PRINT("info", ("got port %d to use for connection to %d",
1770 server_port, nodeId));
1771 /**
1772 * Server_port == 0 just means that that a mgmt server
1773 * has not received a new port yet. Keep the old.
1774 */
1775 if (server_port)
1776 t->set_s_port(server_port);
1777 }
1778 else if(ndb_mgm_is_connected(m_mgm_handle))
1779 {
1780 DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
1781 res));
1782 g_eventLogger->info("Failed to get dynamic port, res: %d",
1783 res);
1784 ndb_mgm_disconnect(m_mgm_handle);
1785 }
1786 else
1787 {
1788 DBUG_PRINT("info", ("mgmd close connection early"));
1789 g_eventLogger->info
1790 ("Management server closed connection early. "
1791 "It is probably being shut down (or has problems). "
1792 "We will retry the connection. %d %s %s line: %d",
1793 ndb_mgm_get_latest_error(m_mgm_handle),
1794 ndb_mgm_get_latest_error_desc(m_mgm_handle),
1795 ndb_mgm_get_latest_error_msg(m_mgm_handle),
1796 ndb_mgm_get_latest_error_line(m_mgm_handle)
1797 );
1798 }
1799 }
1800 /** else
1801 * We will not be able to get a new port unless
1802 * the m_mgm_handle is connected. Note that not
1803 * being connected is an ok state, just continue
1804 * until it is able to connect. Continue using the
1805 * old port until we can connect again and get a
1806 * new port.
1807 */
1808 }
1809 }
1810 break;
1811 case DISCONNECTING:
1812 if(t->isConnected())
1813 t->doDisconnect();
1814 break;
1815 case DISCONNECTED:
1816 {
1817 if (t->isConnected())
1818 {
1819 g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
1820 " while being connected, disconnecting!",
1821 t->getRemoteNodeId());
1822 t->doDisconnect();
1823 }
1824 break;
1825 }
1826 default:
1827 break;
1828 }
1829 }
1830 }
1831 DBUG_VOID_RETURN;
1832 }
1833
1834 struct NdbThread*
start_clients()1835 TransporterRegistry::start_clients()
1836 {
1837 m_run_start_clients_thread= true;
1838 m_start_clients_thread= NdbThread_Create(run_start_clients_C,
1839 (void**)this,
1840 0, // default stack size
1841 "ndb_start_clients",
1842 NDB_THREAD_PRIO_LOW);
1843 if (m_start_clients_thread == 0)
1844 {
1845 m_run_start_clients_thread= false;
1846 }
1847 return m_start_clients_thread;
1848 }
1849
1850 bool
stop_clients()1851 TransporterRegistry::stop_clients()
1852 {
1853 if (m_start_clients_thread) {
1854 m_run_start_clients_thread= false;
1855 void* status;
1856 NdbThread_WaitFor(m_start_clients_thread, &status);
1857 NdbThread_Destroy(&m_start_clients_thread);
1858 }
1859 return true;
1860 }
1861
1862 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)1863 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
1864 const char *interf,
1865 int s_port)
1866 {
1867 DBUG_ENTER("TransporterRegistry::add_transporter_interface");
1868 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
1869 if (interf && strlen(interf) == 0)
1870 interf= 0;
1871
1872 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1873 {
1874 Transporter_interface &tmp= m_transporter_interface[i];
1875 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
1876 continue;
1877 if (interf != 0 && tmp.m_interface != 0 &&
1878 strcmp(interf, tmp.m_interface) == 0)
1879 {
1880 DBUG_VOID_RETURN; // found match, no need to insert
1881 }
1882 if (interf == 0 && tmp.m_interface == 0)
1883 {
1884 DBUG_VOID_RETURN; // found match, no need to insert
1885 }
1886 }
1887 Transporter_interface t;
1888 t.m_remote_nodeId= remoteNodeId;
1889 t.m_s_service_port= s_port;
1890 t.m_interface= interf;
1891 m_transporter_interface.push_back(t);
1892 DBUG_PRINT("exit",("interface and port added"));
1893 DBUG_VOID_RETURN;
1894 }
1895
1896 bool
start_service(SocketServer & socket_server)1897 TransporterRegistry::start_service(SocketServer& socket_server)
1898 {
1899 DBUG_ENTER("TransporterRegistry::start_service");
1900 if (m_transporter_interface.size() > 0 &&
1901 localNodeId == 0)
1902 {
1903 g_eventLogger->error("INTERNAL ERROR: not initialized");
1904 DBUG_RETURN(false);
1905 }
1906
1907 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
1908 {
1909 Transporter_interface &t= m_transporter_interface[i];
1910
1911 unsigned short port= (unsigned short)t.m_s_service_port;
1912 if(t.m_s_service_port<0)
1913 port= -t.m_s_service_port; // is a dynamic port
1914 TransporterService *transporter_service =
1915 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
1916 if(!socket_server.setup(transporter_service,
1917 &port, t.m_interface))
1918 {
1919 DBUG_PRINT("info", ("Trying new port"));
1920 port= 0;
1921 if(t.m_s_service_port>0
1922 || !socket_server.setup(transporter_service,
1923 &port, t.m_interface))
1924 {
1925 /*
1926 * If it wasn't a dynamically allocated port, or
1927 * our attempts at getting a new dynamic port failed
1928 */
1929 g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
1930 "Please check if the port is already used,\n"
1931 "(perhaps the node is already running)",
1932 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
1933 delete transporter_service;
1934 DBUG_RETURN(false);
1935 }
1936 }
1937 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
1938 DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
1939 transporter_service->setTransporterRegistry(this);
1940 }
1941 DBUG_RETURN(true);
1942 }
1943
1944 #ifdef NDB_SHM_TRANSPORTER
1945 extern "C"
1946 RETSIGTYPE
shm_sig_handler(int signo)1947 shm_sig_handler(int signo)
1948 {
1949 g_shm_counter++;
1950 }
1951 #endif
1952
1953 void
startReceiving()1954 TransporterRegistry::startReceiving()
1955 {
1956 DBUG_ENTER("TransporterRegistry::startReceiving");
1957
1958 #ifdef NDB_SHM_TRANSPORTER
1959 m_shm_own_pid = getpid();
1960 if (g_ndb_shm_signum)
1961 {
1962 DBUG_PRINT("info",("Install signal handler for signum %d",
1963 g_ndb_shm_signum));
1964 struct sigaction sa;
1965 NdbThread_set_shm_sigmask(FALSE);
1966 sigemptyset(&sa.sa_mask);
1967 sa.sa_handler = shm_sig_handler;
1968 sa.sa_flags = 0;
1969 int ret;
1970 while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
1971 ;
1972 if(ret != 0)
1973 {
1974 DBUG_PRINT("error",("Install failed"));
1975 g_eventLogger->error("Failed to install signal handler for"
1976 " SHM transporter, signum %d, errno: %d (%s)",
1977 g_ndb_shm_signum, errno, strerror(errno));
1978 }
1979 }
1980 #endif // NDB_SHM_TRANSPORTER
1981 DBUG_VOID_RETURN;
1982 }
1983
1984 void
stopReceiving()1985 TransporterRegistry::stopReceiving(){
1986 /**
1987 * Disconnect all transporters, this includes detach from remote node
1988 * and since that must be done from the same process that called attach
1989 * it's done here in the receive thread
1990 */
1991 disconnectAll();
1992 }
1993
1994 void
startSending()1995 TransporterRegistry::startSending(){
1996 }
1997
1998 void
stopSending()1999 TransporterRegistry::stopSending(){
2000 }
2001
operator <<(NdbOut & out,SignalHeader & sh)2002 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
2003 out << "-- Signal Header --" << endl;
2004 out << "theLength: " << sh.theLength << endl;
2005 out << "gsn: " << sh.theVerId_signalNumber << endl;
2006 out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
2007 out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
2008 out << "sendersSig: " << sh.theSendersSignalId << endl;
2009 out << "theSignalId: " << sh.theSignalId << endl;
2010 out << "trace: " << (int)sh.theTrace << endl;
2011 return out;
2012 }
2013
2014 Transporter*
get_transporter(NodeId nodeId)2015 TransporterRegistry::get_transporter(NodeId nodeId) {
2016 assert(nodeId < maxTransporters);
2017 return theTransporters[nodeId];
2018 }
2019
2020
connect_client(NdbMgmHandle * h)2021 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
2022 {
2023 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
2024
2025 Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
2026
2027 if(!mgm_nodeid)
2028 {
2029 g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2030 return false;
2031 }
2032 Transporter * t = theTransporters[mgm_nodeid];
2033 if (!t)
2034 {
2035 g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2036 return false;
2037 }
2038
2039 bool res = t->connect_client(connect_ndb_mgmd(h));
2040 if (res == true)
2041 {
2042 performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
2043 }
2044 DBUG_RETURN(res);
2045 }
2046
2047
2048
2049 /**
2050 * Given a connected NdbMgmHandle, turns it into a transporter
2051 * and returns the socket.
2052 */
connect_ndb_mgmd(NdbMgmHandle * h)2053 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
2054 {
2055 struct ndb_mgm_reply mgm_reply;
2056 NDB_SOCKET_TYPE sockfd;
2057 my_socket_invalidate(&sockfd);
2058
2059 DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
2060
2061 if ( h==NULL || *h == NULL )
2062 {
2063 g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
2064 DBUG_RETURN(sockfd);
2065 }
2066
2067 for(unsigned int i=0;i < m_transporter_interface.size();i++)
2068 {
2069 if (m_transporter_interface[i].m_s_service_port >= 0)
2070 continue;
2071
2072 DBUG_PRINT("info", ("Setting dynamic port %d for connection from node %d",
2073 m_transporter_interface[i].m_s_service_port,
2074 m_transporter_interface[i].m_remote_nodeId));
2075
2076 if (ndb_mgm_set_connection_int_parameter(*h,
2077 localNodeId,
2078 m_transporter_interface[i].m_remote_nodeId,
2079 CFG_CONNECTION_SERVER_PORT,
2080 m_transporter_interface[i].m_s_service_port,
2081 &mgm_reply) < 0)
2082 {
2083 g_eventLogger->error("Could not set dynamic port for %d->%d (%s:%d)",
2084 localNodeId,
2085 m_transporter_interface[i].m_remote_nodeId,
2086 __FILE__, __LINE__);
2087 ndb_mgm_destroy_handle(h);
2088 DBUG_RETURN(sockfd);
2089 }
2090 }
2091
2092 /**
2093 * convert_to_transporter also disposes of the handle (i.e. we don't leak
2094 * memory here.
2095 */
2096 DBUG_PRINT("info", ("Converting handle to transporter"));
2097 sockfd= ndb_mgm_convert_to_transporter(h);
2098 if (!my_socket_valid(sockfd))
2099 {
2100 g_eventLogger->error("Failed to convert to transporter (%s: %d)",
2101 __FILE__, __LINE__);
2102 ndb_mgm_destroy_handle(h);
2103 }
2104 DBUG_RETURN(sockfd);
2105 }
2106
2107 /**
2108 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
2109 * and returns the socket.
2110 */
connect_ndb_mgmd(SocketClient * sc)2111 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(SocketClient *sc)
2112 {
2113 NdbMgmHandle h= ndb_mgm_create_handle();
2114 NDB_SOCKET_TYPE s;
2115 my_socket_invalidate(&s);
2116
2117 DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
2118
2119 if ( h == NULL )
2120 {
2121 DBUG_RETURN(s);
2122 }
2123
2124 /**
2125 * Set connectstring
2126 */
2127 {
2128 BaseString cs;
2129 cs.assfmt("%s:%u",sc->get_server_name(),sc->get_port());
2130 ndb_mgm_set_connectstring(h, cs.c_str());
2131 }
2132
2133 if(ndb_mgm_connect(h, 0, 0, 0)<0)
2134 {
2135 DBUG_PRINT("info", ("connection to mgmd failed"));
2136 ndb_mgm_destroy_handle(&h);
2137 DBUG_RETURN(s);
2138 }
2139
2140 DBUG_RETURN(connect_ndb_mgmd(&h));
2141 }
2142
2143 /**
2144 * Default implementation of transporter send buffer handler.
2145 */
2146
2147 Uint32 *
getWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2148 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
2149 NodeId node, Uint32 lenBytes, Uint32 prio)
2150 {
2151 Transporter *t = theTransporters[node];
2152 Uint32 *insertPtr = handle->getWritePtr(node, lenBytes, prio,
2153 t->get_max_send_buffer());
2154
2155 if (insertPtr == 0) {
2156 //-------------------------------------------------
2157 // Buffer was completely full. We have severe problems.
2158 // We will attempt to wait for a small time
2159 //-------------------------------------------------
2160 if(t->send_is_possible(10)) {
2161 //-------------------------------------------------
2162 // Send is possible after the small timeout.
2163 //-------------------------------------------------
2164 if(!handle->forceSend(node)){
2165 return 0;
2166 } else {
2167 //-------------------------------------------------
2168 // Since send was successful we will make a renewed
2169 // attempt at inserting the signal into the buffer.
2170 //-------------------------------------------------
2171 insertPtr = handle->getWritePtr(node, lenBytes, prio,
2172 t->get_max_send_buffer());
2173 }//if
2174 } else {
2175 return 0;
2176 }//if
2177 }
2178 return insertPtr;
2179 }
2180
2181 void
updateWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2182 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
2183 NodeId node, Uint32 lenBytes, Uint32 prio)
2184 {
2185 Transporter *t = theTransporters[node];
2186
2187 Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
2188 t->update_status_overloaded(used);
2189
2190 if(t->send_limit_reached(used)) {
2191 //-------------------------------------------------
2192 // Buffer is full and we are ready to send. We will
2193 // not wait since the signal is already in the buffer.
2194 // Force flag set has the same indication that we
2195 // should always send. If it is not possible to send
2196 // we will not worry since we will soon be back for
2197 // a renewed trial.
2198 //-------------------------------------------------
2199 if(t->send_is_possible(0)) {
2200 //-------------------------------------------------
2201 // Send was possible, attempt at a send.
2202 //-------------------------------------------------
2203 handle->forceSend(node);
2204 }//if
2205 }
2206 }
2207
2208 Uint32
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)2209 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
2210 Uint32 max)
2211 {
2212 assert(m_use_default_send_buffer);
2213
2214 if (max == 0)
2215 return 0;
2216
2217 Uint32 count = 0;
2218 SendBuffer *b = m_send_buffers + node;
2219 SendBufferPage *page = b->m_first_page;
2220 while (page != NULL && count < max)
2221 {
2222 dst[count].iov_base = page->m_data+page->m_start;
2223 dst[count].iov_len = page->m_bytes;
2224 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2225 page = page->m_next;
2226 count++;
2227 }
2228
2229 return count;
2230 }
2231
2232 Uint32
bytes_sent(NodeId node,Uint32 bytes)2233 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
2234 {
2235 assert(m_use_default_send_buffer);
2236
2237 SendBuffer *b = m_send_buffers + node;
2238 Uint32 used_bytes = b->m_used_bytes;
2239
2240 if (bytes == 0)
2241 return used_bytes;
2242
2243 used_bytes -= bytes;
2244 b->m_used_bytes = used_bytes;
2245
2246 SendBufferPage *page = b->m_first_page;
2247 while (bytes && bytes >= page->m_bytes)
2248 {
2249 SendBufferPage * tmp = page;
2250 bytes -= page->m_bytes;
2251 page = page->m_next;
2252 release_page(tmp);
2253 }
2254
2255 if (used_bytes == 0)
2256 {
2257 b->m_first_page = 0;
2258 b->m_last_page = 0;
2259 }
2260 else
2261 {
2262 page->m_start += bytes;
2263 page->m_bytes -= bytes;
2264 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2265 b->m_first_page = page;
2266 }
2267
2268 return used_bytes;
2269 }
2270
2271 bool
has_data_to_send(NodeId node)2272 TransporterRegistry::has_data_to_send(NodeId node)
2273 {
2274 assert(m_use_default_send_buffer);
2275
2276 SendBuffer *b = m_send_buffers + node;
2277 return (b->m_first_page != NULL && b->m_first_page->m_bytes);
2278 }
2279
2280 void
reset_send_buffer(NodeId node,bool should_be_empty)2281 TransporterRegistry::reset_send_buffer(NodeId node, bool should_be_empty)
2282 {
2283 assert(m_use_default_send_buffer);
2284
2285 // Make sure that buffer is already empty if the "should_be_empty"
2286 // flag is set. This is done to quickly catch any stray signals
2287 // written to the send buffer while not being connected
2288 if (should_be_empty && !has_data_to_send(node))
2289 return;
2290 assert(!should_be_empty);
2291
2292 SendBuffer *b = m_send_buffers + node;
2293 SendBufferPage *page = b->m_first_page;
2294 while (page != NULL)
2295 {
2296 SendBufferPage *next = page->m_next;
2297 release_page(page);
2298 page = next;
2299 }
2300 b->m_first_page = NULL;
2301 b->m_last_page = NULL;
2302 b->m_used_bytes = 0;
2303 }
2304
2305 TransporterRegistry::SendBufferPage *
alloc_page()2306 TransporterRegistry::alloc_page()
2307 {
2308 SendBufferPage *page = m_page_freelist;
2309 if (page != NULL)
2310 {
2311 m_page_freelist = page->m_next;
2312 return page;
2313 }
2314
2315 ndbout << "ERROR: out of send buffers in kernel." << endl;
2316 return NULL;
2317 }
2318
2319 void
release_page(SendBufferPage * page)2320 TransporterRegistry::release_page(SendBufferPage *page)
2321 {
2322 assert(page != NULL);
2323 page->m_next = m_page_freelist;
2324 m_page_freelist = page;
2325 }
2326
2327 Uint32 *
getWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio,Uint32 max_use)2328 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
2329 Uint32 max_use)
2330 {
2331 assert(m_use_default_send_buffer);
2332
2333 SendBuffer *b = m_send_buffers + node;
2334
2335 /* First check if we have room in already allocated page. */
2336 SendBufferPage *page = b->m_last_page;
2337 if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
2338 {
2339 return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
2340 }
2341
2342 if (b->m_used_bytes + lenBytes > max_use)
2343 return NULL;
2344
2345 /* Allocate a new page. */
2346 page = alloc_page();
2347 if (page == NULL)
2348 return NULL;
2349 page->m_next = NULL;
2350 page->m_bytes = 0;
2351 page->m_start = 0;
2352
2353 if (b->m_last_page == NULL)
2354 {
2355 b->m_first_page = page;
2356 b->m_last_page = page;
2357 }
2358 else
2359 {
2360 assert(b->m_first_page != NULL);
2361 b->m_last_page->m_next = page;
2362 b->m_last_page = page;
2363 }
2364 return (Uint32 *)(page->m_data);
2365 }
2366
2367 Uint32
updateWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio)2368 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2369 {
2370 assert(m_use_default_send_buffer);
2371
2372 SendBuffer *b = m_send_buffers + node;
2373 SendBufferPage *page = b->m_last_page;
2374 assert(page != NULL);
2375 assert(page->m_bytes + lenBytes <= page->max_data_bytes());
2376 page->m_bytes += lenBytes;
2377 b->m_used_bytes += lenBytes;
2378 return b->m_used_bytes;
2379 }
2380
2381 bool
forceSend(NodeId node)2382 TransporterRegistry::forceSend(NodeId node)
2383 {
2384 Transporter *t = get_transporter(node);
2385 if (t)
2386 return t->doSend();
2387 else
2388 return false;
2389 }
2390
2391
2392 void
print_transporters(const char * where,NdbOut & out)2393 TransporterRegistry::print_transporters(const char* where, NdbOut& out)
2394 {
2395 out << where << " >>" << endl;
2396
2397 for(unsigned i = 0; i < maxTransporters; i++){
2398 if(theTransporters[i] == NULL)
2399 continue;
2400
2401 const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
2402
2403 out << i << " "
2404 << getPerformStateString(remoteNodeId) << " to node: "
2405 << remoteNodeId << " at "
2406 << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
2407 }
2408
2409 out << "<<" << endl;
2410
2411 for (size_t i= 0; i < m_transporter_interface.size(); i++){
2412 Transporter_interface tf= m_transporter_interface[i];
2413
2414 out << i
2415 << " remote node: " << tf.m_remote_nodeId
2416 << " port: " << tf.m_s_service_port
2417 << " interface: " << tf.m_interface << endl;
2418 }
2419 }
2420
2421
// Explicit instantiation of the Vector template for the element type
// used by m_transporter_interface.
template class Vector<TransporterRegistry::Transporter_interface>;
2423