1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include <TransporterRegistry.hpp>
28 #include "TransporterInternalDefinitions.hpp"
29
30 #include "Transporter.hpp"
31 #include <SocketAuthenticator.hpp>
32
33 #ifdef NDB_TCP_TRANSPORTER
34 #include "TCP_Transporter.hpp"
35 #include "Loopback_Transporter.hpp"
36 #endif
37
38 #ifdef NDB_SCI_TRANSPORTER
39 #include "SCI_Transporter.hpp"
40 #endif
41
42 #ifdef NDB_SHM_TRANSPORTER
43 #include "SHM_Transporter.hpp"
44 extern int g_ndb_shm_signum;
45 #endif
46
47 #include "NdbOut.hpp"
48 #include <NdbSleep.h>
49 #include <InputStream.hpp>
50 #include <OutputStream.hpp>
51 #include <socket_io.h>
52
53 #include <mgmapi/mgmapi.h>
54 #include <mgmapi_internal.h>
55 #include <mgmapi/mgmapi_debug.h>
56
57 #include <EventLogger.hpp>
58 extern EventLogger * g_eventLogger;
59
60 /**
61 * There is a requirement in the Transporter design that
62 * ::performReceive() and ::update_connections()
63 * on the same 'TransporterReceiveHandle' should not be
64 * run concurrently. class TransporterReceiveWatchdog provides a
65 * simple mechanism to assert that this rule is followed.
66 * Does nothing if NDEBUG is defined (in production code)
67 */
68 class TransporterReceiveWatchdog
69 {
70 public:
71 #if NDEBUG
TransporterReceiveWatchdog(TransporterReceiveHandle & recvdata)72 TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
73 {}
74
75 #else
76 TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
77 : m_recvdata(recvdata)
78 {
79 assert(m_recvdata.m_active == false);
80 m_recvdata.m_active = true;
81 }
82
83 ~TransporterReceiveWatchdog()
84 {
85 assert(m_recvdata.m_active == true);
86 m_recvdata.m_active = false;
87 }
88
89 private:
90 TransporterReceiveHandle& m_recvdata;
91 #endif
92 };
93
94
95 struct in_addr
get_connect_address(NodeId node_id) const96 TransporterRegistry::get_connect_address(NodeId node_id) const
97 {
98 return theTransporters[node_id]->m_connect_address;
99 }
100
101 Uint64
get_bytes_sent(NodeId node_id) const102 TransporterRegistry::get_bytes_sent(NodeId node_id) const
103 {
104 return theTransporters[node_id]->m_bytes_sent;
105 }
106
107 Uint64
get_bytes_received(NodeId node_id) const108 TransporterRegistry::get_bytes_received(NodeId node_id) const
109 {
110 return theTransporters[node_id]->m_bytes_received;
111 }
112
newSession(NDB_SOCKET_TYPE sockfd)113 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
114 {
115 DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
116 if (m_auth && !m_auth->server_authenticate(sockfd))
117 {
118 ndb_socket_close(sockfd, true); // Close with reset
119 DBUG_RETURN(0);
120 }
121
122 BaseString msg;
123 bool close_with_reset = true;
124 if (!m_transporter_registry->connect_server(sockfd, msg, close_with_reset))
125 {
126 ndb_socket_close(sockfd, close_with_reset);
127 DBUG_RETURN(0);
128 }
129
130 DBUG_RETURN(0);
131 }
132
TransporterReceiveData()133 TransporterReceiveData::TransporterReceiveData()
134 {
135 /**
136 * With multi receiver threads
137 * an interface to reassign these is needed...
138 */
139 m_transporters.set(); // Handle all
140 m_transporters.clear(Uint32(0)); // Except wakeup socket...
141 m_handled_transporters.clear();
142
143 #if defined(HAVE_EPOLL_CREATE)
144 m_epoll_fd = -1;
145 m_epoll_events = 0;
146 #endif
147 }
148
149 bool
init(unsigned maxTransporters)150 TransporterReceiveData::init(unsigned maxTransporters)
151 {
152 maxTransporters += 1; /* wakeup socket */
153 #if defined(HAVE_EPOLL_CREATE)
154 m_epoll_fd = epoll_create(maxTransporters);
155 if (m_epoll_fd == -1)
156 {
157 perror("epoll_create failed... falling back to select!");
158 goto fallback;
159 }
160 m_epoll_events = new struct epoll_event[maxTransporters];
161 if (m_epoll_events == 0)
162 {
163 perror("Failed to alloc epoll-array... falling back to select!");
164 close(m_epoll_fd);
165 m_epoll_fd = -1;
166 goto fallback;
167 }
168 bzero(m_epoll_events, maxTransporters * sizeof(struct epoll_event));
169 return true;
170 fallback:
171 #endif
172 return m_socket_poller.set_max_count(maxTransporters);
173 }
174
175 bool
epoll_add(TCP_Transporter * t)176 TransporterReceiveData::epoll_add(TCP_Transporter *t)
177 {
178 assert(m_transporters.get(t->getRemoteNodeId()));
179 #if defined(HAVE_EPOLL_CREATE)
180 if (m_epoll_fd != -1)
181 {
182 bool add = true;
183 struct epoll_event event_poll;
184 bzero(&event_poll, sizeof(event_poll));
185 NDB_SOCKET_TYPE sock_fd = t->getSocket();
186 int node_id = t->getRemoteNodeId();
187 int op = EPOLL_CTL_ADD;
188 int ret_val, error;
189
190 if (!my_socket_valid(sock_fd))
191 return FALSE;
192
193 event_poll.data.u32 = t->getRemoteNodeId();
194 event_poll.events = EPOLLIN;
195 ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
196 if (!ret_val)
197 goto ok;
198 error= errno;
199 if (error == ENOENT && !add)
200 {
201 /*
202 * Could be that socket was closed premature to this call.
203 * Not a problem that this occurs.
204 */
205 goto ok;
206 }
207 if (!add || (add && (error != ENOMEM)))
208 {
209 /*
210 * Serious problems, we are either using wrong parameters,
211 * have permission problems or the socket doesn't support
212 * epoll!!
213 */
214 ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
215 " node %u to epoll-set,"
216 " errno: %u %s",
217 add ? "ADD" : "DEL",
218 m_epoll_fd,
219 MY_SOCKET_FORMAT_VALUE(sock_fd),
220 node_id,
221 error,
222 strerror(error));
223 abort();
224 }
225 ndbout << "We lacked memory to add the socket for node id ";
226 ndbout << node_id << endl;
227 return false;
228 }
229
230 ok:
231 #endif
232 return true;
233 }
234
~TransporterReceiveData()235 TransporterReceiveData::~TransporterReceiveData()
236 {
237 #if defined(HAVE_EPOLL_CREATE)
238 if (m_epoll_fd != -1)
239 {
240 close(m_epoll_fd);
241 m_epoll_fd = -1;
242 }
243
244 if (m_epoll_events)
245 {
246 delete [] m_epoll_events;
247 m_epoll_events = 0;
248 }
249 #endif
250 }
251
TransporterRegistry(TransporterCallback * callback,TransporterReceiveHandle * recvHandle,bool use_default_send_buffer,unsigned _maxTransporters)252 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
253 TransporterReceiveHandle * recvHandle,
254 bool use_default_send_buffer,
255 unsigned _maxTransporters) :
256 m_mgm_handle(0),
257 localNodeId(0),
258 connectBackoffMaxTime(0),
259 m_transp_count(0),
260 m_use_default_send_buffer(use_default_send_buffer),
261 m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
262 m_total_max_send_buffer(0)
263 {
264 DBUG_ENTER("TransporterRegistry::TransporterRegistry");
265
266 receiveHandle = recvHandle;
267 maxTransporters = _maxTransporters;
268 sendCounter = 1;
269
270 callbackObj=callback;
271
272 theTCPTransporters = new TCP_Transporter * [maxTransporters];
273 theSCITransporters = new SCI_Transporter * [maxTransporters];
274 theSHMTransporters = new SHM_Transporter * [maxTransporters];
275 theTransporterTypes = new TransporterType [maxTransporters];
276 theTransporters = new Transporter * [maxTransporters];
277 performStates = new PerformState [maxTransporters];
278 ioStates = new IOState [maxTransporters];
279 peerUpIndicators = new bool [maxTransporters];
280 connectingTime = new Uint32 [maxTransporters];
281 m_disconnect_errnum = new int [maxTransporters];
282 m_error_states = new ErrorState [maxTransporters];
283
284 m_has_extra_wakeup_socket = false;
285
286 #ifdef ERROR_INSERT
287 m_blocked.clear();
288 m_blocked_disconnected.clear();
289
290 m_mixology_level = 0;
291 #endif
292
293 // Initialize member variables
294 nTransporters = 0;
295 nTCPTransporters = 0;
296 nSCITransporters = 0;
297 nSHMTransporters = 0;
298
299 // Initialize the transporter arrays
300 ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
301 for (unsigned i=0; i<maxTransporters; i++) {
302 theTCPTransporters[i] = NULL;
303 theSCITransporters[i] = NULL;
304 theSHMTransporters[i] = NULL;
305 theTransporters[i] = NULL;
306 performStates[i] = DISCONNECTED;
307 ioStates[i] = NoHalt;
308 peerUpIndicators[i] = true; // Assume all nodes are up, will be
309 // cleared at first connect attempt
310 connectingTime[i] = 0;
311 m_disconnect_errnum[i]= 0;
312 m_error_states[i] = default_error_state;
313 }
314
315 DBUG_VOID_RETURN;
316 }
317
318 #define MIN_SEND_BUFFER_SIZE (4 * 1024 * 1024)
319
320 void
allocate_send_buffers(Uint64 total_send_buffer,Uint64 extra_send_buffer)321 TransporterRegistry::allocate_send_buffers(Uint64 total_send_buffer,
322 Uint64 extra_send_buffer)
323 {
324 if (!m_use_default_send_buffer)
325 return;
326
327 if (total_send_buffer == 0)
328 total_send_buffer = get_total_max_send_buffer();
329
330 total_send_buffer += extra_send_buffer;
331
332 if (!extra_send_buffer)
333 {
334 /**
335 * If extra send buffer memory is 0 it means we can decide on an
336 * appropriate value for it. We select to always ensure that the
337 * minimum send buffer memory is 4M, otherwise we simply don't
338 * add any extra send buffer memory at all.
339 */
340 if (total_send_buffer < MIN_SEND_BUFFER_SIZE)
341 {
342 total_send_buffer = (Uint64)MIN_SEND_BUFFER_SIZE;
343 }
344 }
345
346 if (m_send_buffers)
347 {
348 /* Send buffers already allocated -> resize the buffer pages */
349 assert(m_send_buffer_memory);
350
351 // TODO resize send buffer pages
352
353 return;
354 }
355
356 /* Initialize transporter send buffers (initially empty). */
357 m_send_buffers = new SendBuffer[maxTransporters];
358 for (unsigned i = 0; i < maxTransporters; i++)
359 {
360 SendBuffer &b = m_send_buffers[i];
361 b.m_first_page = NULL;
362 b.m_last_page = NULL;
363 b.m_used_bytes = 0;
364 }
365
366 /* Initialize the page freelist. */
367 Uint64 send_buffer_pages =
368 (total_send_buffer + SendBufferPage::PGSIZE - 1)/SendBufferPage::PGSIZE;
369 /* Add one extra page of internal fragmentation overhead per transporter. */
370 send_buffer_pages += nTransporters;
371
372 m_send_buffer_memory =
373 new unsigned char[UintPtr(send_buffer_pages * SendBufferPage::PGSIZE)];
374 if (m_send_buffer_memory == NULL)
375 {
376 ndbout << "Unable to allocate "
377 << send_buffer_pages * SendBufferPage::PGSIZE
378 << " bytes of memory for send buffers, aborting." << endl;
379 abort();
380 }
381
382 m_page_freelist = NULL;
383 for (unsigned i = 0; i < send_buffer_pages; i++)
384 {
385 SendBufferPage *page =
386 (SendBufferPage *)(m_send_buffer_memory + i * SendBufferPage::PGSIZE);
387 page->m_bytes = 0;
388 page->m_next = m_page_freelist;
389 m_page_freelist = page;
390 }
391 m_tot_send_buffer_memory = SendBufferPage::PGSIZE * send_buffer_pages;
392 m_tot_used_buffer_memory = 0;
393 }
394
set_mgm_handle(NdbMgmHandle h)395 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
396 {
397 DBUG_ENTER("TransporterRegistry::set_mgm_handle");
398 if (m_mgm_handle)
399 ndb_mgm_destroy_handle(&m_mgm_handle);
400 m_mgm_handle= h;
401 ndb_mgm_set_timeout(m_mgm_handle, 5000);
402 #ifndef NDEBUG
403 if (h)
404 {
405 char buf[256];
406 DBUG_PRINT("info",("handle set with connectstring: %s",
407 ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
408 }
409 else
410 {
411 DBUG_PRINT("info",("handle set to NULL"));
412 }
413 #endif
414 DBUG_VOID_RETURN;
415 }
416
~TransporterRegistry()417 TransporterRegistry::~TransporterRegistry()
418 {
419 DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
420
421 disconnectAll();
422 removeAll();
423
424 delete[] theTCPTransporters;
425 delete[] theSCITransporters;
426 delete[] theSHMTransporters;
427 delete[] theTransporterTypes;
428 delete[] theTransporters;
429 delete[] performStates;
430 delete[] ioStates;
431 delete[] peerUpIndicators;
432 delete[] connectingTime;
433 delete[] m_disconnect_errnum;
434 delete[] m_error_states;
435
436 if (m_send_buffers)
437 delete[] m_send_buffers;
438 m_page_freelist = NULL;
439 if (m_send_buffer_memory)
440 delete[] m_send_buffer_memory;
441
442 if (m_mgm_handle)
443 ndb_mgm_destroy_handle(&m_mgm_handle);
444
445 if (m_has_extra_wakeup_socket)
446 {
447 my_socket_close(m_extra_wakeup_sockets[0]);
448 my_socket_close(m_extra_wakeup_sockets[1]);
449 }
450
451 DBUG_VOID_RETURN;
452 }
453
454 void
removeAll()455 TransporterRegistry::removeAll(){
456 for(unsigned i = 0; i<maxTransporters; i++){
457 if(theTransporters[i] != NULL)
458 removeTransporter(theTransporters[i]->getRemoteNodeId());
459 }
460 }
461
462 void
disconnectAll()463 TransporterRegistry::disconnectAll(){
464 for(unsigned i = 0; i<maxTransporters; i++){
465 if(theTransporters[i] != NULL)
466 theTransporters[i]->doDisconnect();
467 }
468 }
469
470 bool
init(NodeId nodeId)471 TransporterRegistry::init(NodeId nodeId) {
472 DBUG_ENTER("TransporterRegistry::init");
473 assert(localNodeId == 0 ||
474 localNodeId == nodeId);
475
476 localNodeId = nodeId;
477
478 DEBUG("TransporterRegistry started node: " << localNodeId);
479
480 if (receiveHandle)
481 {
482 if (!init(* receiveHandle))
483 DBUG_RETURN(false);
484 }
485
486 DBUG_RETURN(true);
487 }
488
489 bool
init(TransporterReceiveHandle & recvhandle)490 TransporterRegistry::init(TransporterReceiveHandle& recvhandle)
491 {
492 return recvhandle.init(maxTransporters);
493 }
494
495 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg,bool & close_with_reset) const496 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
497 BaseString & msg,
498 bool& close_with_reset) const
499 {
500 DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
501
502 // Read "hello" that consists of node id and transporter
503 // type from client
504 SocketInputStream s_input(sockfd);
505 char buf[11+1+11+1]; // <int> <int>
506 if (s_input.gets(buf, sizeof(buf)) == 0) {
507 msg.assfmt("line: %u : Failed to get nodeid from client", __LINE__);
508 DBUG_PRINT("error", ("Failed to read 'hello' from client"));
509 DBUG_RETURN(false);
510 }
511
512 int nodeId, remote_transporter_type= -1;
513 int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
514 switch (r) {
515 case 2:
516 break;
517 case 1:
518 // we're running version prior to 4.1.9
519 // ok, but with no checks on transporter configuration compatability
520 break;
521 default:
522 msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
523 DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
524 (int)sizeof(buf), buf));
525 DBUG_RETURN(false);
526 }
527
528 DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
529 nodeId, remote_transporter_type));
530
531
532 // Check that nodeid is in range before accessing the arrays
533 if (nodeId < 0 ||
534 nodeId >= (int)maxTransporters)
535 {
536 msg.assfmt("line: %u : Incorrect reply from client: >%s<", __LINE__, buf);
537 DBUG_PRINT("error", ("Out of range nodeId: %d from client",
538 nodeId));
539 DBUG_RETURN(false);
540 }
541
542 // Check that transporter is allocated
543 Transporter *t= theTransporters[nodeId];
544 if (t == 0)
545 {
546 msg.assfmt("line: %u : Incorrect reply from client: >%s<, node: %u",
547 __LINE__, buf, nodeId);
548 DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
549 DBUG_RETURN(false);
550 }
551
552 // Check that the transporter should be connecting
553 if (performStates[nodeId] != TransporterRegistry::CONNECTING)
554 {
555 msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
556 __LINE__, nodeId,
557 getPerformStateString(nodeId),
558 performStates[nodeId]);
559
560 DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
561 nodeId));
562
563 // Avoid TIME_WAIT on server by requesting client to close connection
564 SocketOutputStream s_output(sockfd);
565 if (s_output.println("BYE") < 0)
566 {
567 // Failed to request client close
568 DBUG_PRINT("error", ("Failed to send client BYE"));
569 DBUG_RETURN(false);
570 }
571
572 // Wait for to close connection by reading EOF(i.e read returns 0)
573 const int read_eof_timeout = 1000; // Fairly short timeout
574 if (read_socket(sockfd, read_eof_timeout,
575 buf, sizeof(buf)) == 0)
576 {
577 // Client gracefully closed connection, turn off close_with_reset
578 close_with_reset = false;
579 DBUG_RETURN(false);
580 }
581
582 // Failed to request client close
583 DBUG_RETURN(false);
584 }
585
586 // Check transporter type
587 if (remote_transporter_type != -1 &&
588 remote_transporter_type != t->m_type)
589 {
590 g_eventLogger->error("Connection from node: %d uses different transporter "
591 "type: %d, expected type: %d",
592 nodeId, remote_transporter_type, t->m_type);
593 DBUG_RETURN(false);
594 }
595
596 // Send reply to client
597 SocketOutputStream s_output(sockfd);
598 if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
599 {
600 msg.assfmt("line: %u : Failed to reply to connecting socket (node: %u)",
601 __LINE__, nodeId);
602 DBUG_PRINT("error", ("Send of reply failed"));
603 DBUG_RETURN(false);
604 }
605
606 // Setup transporter (transporter responsible for closing sockfd)
607 bool res = t->connect_server(sockfd, msg);
608
609 if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
610 {
611 msg.assfmt("line: %u : Incorrect state for node %u state: %s (%u)",
612 __LINE__, nodeId,
613 getPerformStateString(nodeId),
614 performStates[nodeId]);
615 // Connection suceeded, but not connecting anymore, return
616 // false to close the connection
617 DBUG_RETURN(false);
618 }
619
620 DBUG_RETURN(res);
621 }
622
623
624 bool
configureTransporter(TransporterConfiguration * config)625 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
626 {
627 NodeId remoteNodeId = config->remoteNodeId;
628
629 assert(localNodeId);
630 assert(config->localNodeId == localNodeId);
631
632 if (remoteNodeId >= maxTransporters)
633 return false;
634
635 Transporter* t = theTransporters[remoteNodeId];
636 if(t != NULL)
637 {
638 // Transporter already exist, try to reconfigure it
639 return t->configure(config);
640 }
641
642 DEBUG("Configuring transporter from " << localNodeId
643 << " to " << remoteNodeId);
644
645 switch (config->type){
646 case tt_TCP_TRANSPORTER:
647 return createTCPTransporter(config);
648 case tt_SHM_TRANSPORTER:
649 return createSHMTransporter(config);
650 case tt_SCI_TRANSPORTER:
651 return createSCITransporter(config);
652 default:
653 abort();
654 break;
655 }
656 return false;
657 }
658
659
660 bool
createTCPTransporter(TransporterConfiguration * config)661 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
662 #ifdef NDB_TCP_TRANSPORTER
663
664 TCP_Transporter * t = 0;
665 if (config->remoteNodeId == config->localNodeId)
666 {
667 t = new Loopback_Transporter(* this, config);
668 }
669 else
670 {
671 t = new TCP_Transporter(*this, config);
672 }
673
674 if (t == NULL)
675 return false;
676 else if (!t->initTransporter()) {
677 delete t;
678 return false;
679 }
680
681 // Put the transporter in the transporter arrays
682 theTCPTransporters[nTCPTransporters] = t;
683 theTransporters[t->getRemoteNodeId()] = t;
684 theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
685 performStates[t->getRemoteNodeId()] = DISCONNECTED;
686 nTransporters++;
687 nTCPTransporters++;
688 m_total_max_send_buffer += t->get_max_send_buffer();
689
690 return true;
691 #else
692 return false;
693 #endif
694 }
695
696 bool
createSCITransporter(TransporterConfiguration * config)697 TransporterRegistry::createSCITransporter(TransporterConfiguration *config) {
698 #ifdef NDB_SCI_TRANSPORTER
699
700 if(!SCI_Transporter::initSCI())
701 abort();
702
703 SCI_Transporter * t = new SCI_Transporter(*this,
704 config->localHostName,
705 config->remoteHostName,
706 config->s_port,
707 config->isMgmConnection,
708 config->sci.sendLimit,
709 config->sci.bufferSize,
710 config->sci.nLocalAdapters,
711 config->sci.remoteSciNodeId0,
712 config->sci.remoteSciNodeId1,
713 localNodeId,
714 config->remoteNodeId,
715 config->serverNodeId,
716 config->checksum,
717 config->signalId);
718
719 if (t == NULL)
720 return false;
721 else if (!t->initTransporter()) {
722 delete t;
723 return false;
724 }
725 // Put the transporter in the transporter arrays
726 theSCITransporters[nSCITransporters] = t;
727 theTransporters[t->getRemoteNodeId()] = t;
728 theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
729 performStates[t->getRemoteNodeId()] = DISCONNECTED;
730 nTransporters++;
731 nSCITransporters++;
732 m_total_max_send_buffer += t->get_max_send_buffer();
733
734 return true;
735 #else
736 return false;
737 #endif
738 }
739
740 bool
createSHMTransporter(TransporterConfiguration * config)741 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
742 DBUG_ENTER("TransporterRegistry::createTransporter SHM");
743 #ifdef NDB_SHM_TRANSPORTER
744
745 if (!g_ndb_shm_signum) {
746 g_ndb_shm_signum= config->shm.signum;
747 DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
748 /**
749 * Make sure to block g_ndb_shm_signum
750 * TransporterRegistry::init is run from "main" thread
751 */
752 NdbThread_set_shm_sigmask(TRUE);
753 }
754
755 if(config->shm.signum != g_ndb_shm_signum)
756 return false;
757
758 SHM_Transporter * t = new SHM_Transporter(*this,
759 config->localHostName,
760 config->remoteHostName,
761 config->s_port,
762 config->isMgmConnection,
763 localNodeId,
764 config->remoteNodeId,
765 config->serverNodeId,
766 config->checksum,
767 config->signalId,
768 config->shm.shmKey,
769 config->shm.shmSize
770 );
771 if (t == NULL)
772 return false;
773 else if (!t->initTransporter()) {
774 delete t;
775 return false;
776 }
777 // Put the transporter in the transporter arrays
778 theSHMTransporters[nSHMTransporters] = t;
779 theTransporters[t->getRemoteNodeId()] = t;
780 theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
781 performStates[t->getRemoteNodeId()] = DISCONNECTED;
782
783 nTransporters++;
784 nSHMTransporters++;
785 m_total_max_send_buffer += t->get_max_send_buffer();
786
787 DBUG_RETURN(true);
788 #else
789 DBUG_RETURN(false);
790 #endif
791 }
792
793
794 void
removeTransporter(NodeId nodeId)795 TransporterRegistry::removeTransporter(NodeId nodeId) {
796
797 DEBUG("Removing transporter from " << localNodeId
798 << " to " << nodeId);
799
800 if(theTransporters[nodeId] == NULL)
801 return;
802
803 theTransporters[nodeId]->doDisconnect();
804
805 const TransporterType type = theTransporterTypes[nodeId];
806
807 int ind = 0;
808 switch(type){
809 case tt_TCP_TRANSPORTER:
810 #ifdef NDB_TCP_TRANSPORTER
811 for(; ind < nTCPTransporters; ind++)
812 if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
813 break;
814 ind++;
815 for(; ind<nTCPTransporters; ind++)
816 theTCPTransporters[ind-1] = theTCPTransporters[ind];
817 nTCPTransporters --;
818 #endif
819 break;
820 case tt_SCI_TRANSPORTER:
821 #ifdef NDB_SCI_TRANSPORTER
822 for(; ind < nSCITransporters; ind++)
823 if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
824 break;
825 ind++;
826 for(; ind<nSCITransporters; ind++)
827 theSCITransporters[ind-1] = theSCITransporters[ind];
828 nSCITransporters --;
829 #endif
830 break;
831 case tt_SHM_TRANSPORTER:
832 #ifdef NDB_SHM_TRANSPORTER
833 for(; ind < nSHMTransporters; ind++)
834 if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
835 break;
836 ind++;
837 for(; ind<nSHMTransporters; ind++)
838 theSHMTransporters[ind-1] = theSHMTransporters[ind];
839 nSHMTransporters --;
840 #endif
841 break;
842 }
843
844 nTransporters--;
845
846 // Delete the transporter and remove it from theTransporters array
847 delete theTransporters[nodeId];
848 theTransporters[nodeId] = NULL;
849 }
850
851 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const LinearSectionPtr ptr[3])852 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
853 const SignalHeader * const signalHeader,
854 Uint8 prio,
855 const Uint32 * const signalData,
856 NodeId nodeId,
857 const LinearSectionPtr ptr[3]){
858
859
860 Transporter *t = theTransporters[nodeId];
861 if(t != NULL &&
862 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
863 ((signalHeader->theReceiversBlockNumber == 252) ||
864 (signalHeader->theReceiversBlockNumber == 4002)))) {
865
866 if(t->isConnected()){
867 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
868 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
869 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
870 if(insertPtr != 0){
871 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
872 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
873 return SEND_OK;
874 }
875
876 set_status_overloaded(nodeId, true);
877 int sleepTime = 2;
878
879 /**
880 * @note: on linux/i386 the granularity is 10ms
881 * so sleepTime = 2 generates a 10 ms sleep.
882 */
883 for(int i = 0; i<50; i++){
884 if((nSHMTransporters+nSCITransporters) == 0)
885 NdbSleep_MilliSleep(sleepTime);
886 /* FC : Consider counting sleeps here */
887 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
888 if(insertPtr != 0){
889 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
890 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
891 break;
892 }
893 }
894
895 if(insertPtr != 0){
896 /**
897 * Send buffer full, but resend works
898 */
899 report_error(nodeId, TE_SEND_BUFFER_FULL);
900 return SEND_OK;
901 }
902
903 WARNING("Signal to " << nodeId << " lost(buffer)");
904 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
905 return SEND_BUFFER_FULL;
906 } else {
907 return SEND_MESSAGE_TOO_BIG;
908 }
909 } else {
910 #ifdef ERROR_INSERT
911 if (m_blocked.get(nodeId))
912 {
913 /* Looks like it disconnected while blocked. We'll pretend
914 * not to notice for now
915 */
916 WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
917 return SEND_OK;
918 }
919 #endif
920 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
921 return SEND_DISCONNECTED;
922 }
923 } else {
924 DEBUG("Discarding message to block: "
925 << signalHeader->theReceiversBlockNumber
926 << " node: " << nodeId);
927
928 if(t == NULL)
929 return SEND_UNKNOWN_NODE;
930
931 return SEND_BLOCKED;
932 }
933 }
934
935 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])936 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
937 const SignalHeader * const signalHeader,
938 Uint8 prio,
939 const Uint32 * const signalData,
940 NodeId nodeId,
941 class SectionSegmentPool & thePool,
942 const SegmentedSectionPtr ptr[3]){
943
944
945 Transporter *t = theTransporters[nodeId];
946 if(t != NULL &&
947 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
948 ((signalHeader->theReceiversBlockNumber == 252)||
949 (signalHeader->theReceiversBlockNumber == 4002)))) {
950
951 if(t->isConnected()){
952 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
953 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
954 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
955 if(insertPtr != 0){
956 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
957 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
958 return SEND_OK;
959 }
960
961 /**
962 * @note: on linux/i386 the granularity is 10ms
963 * so sleepTime = 2 generates a 10 ms sleep.
964 */
965 set_status_overloaded(nodeId, true);
966 int sleepTime = 2;
967 for(int i = 0; i<50; i++){
968 if((nSHMTransporters+nSCITransporters) == 0)
969 NdbSleep_MilliSleep(sleepTime);
970 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
971 if(insertPtr != 0){
972 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
973 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
974 break;
975 }
976 }
977
978 if(insertPtr != 0){
979 /**
980 * Send buffer full, but resend works
981 */
982 report_error(nodeId, TE_SEND_BUFFER_FULL);
983 return SEND_OK;
984 }
985
986 WARNING("Signal to " << nodeId << " lost(buffer)");
987 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
988 return SEND_BUFFER_FULL;
989 } else {
990 return SEND_MESSAGE_TOO_BIG;
991 }
992 } else {
993 #ifdef ERROR_INSERT
994 if (m_blocked.get(nodeId))
995 {
996 /* Looks like it disconnected while blocked. We'll pretend
997 * not to notice for now
998 */
999 WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
1000 return SEND_OK;
1001 }
1002 #endif
1003 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
1004 return SEND_DISCONNECTED;
1005 }
1006 } else {
1007 DEBUG("Discarding message to block: "
1008 << signalHeader->theReceiversBlockNumber
1009 << " node: " << nodeId);
1010
1011 if(t == NULL)
1012 return SEND_UNKNOWN_NODE;
1013
1014 return SEND_BLOCKED;
1015 }
1016 }
1017
1018
1019 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * const signalHeader,Uint8 prio,const Uint32 * const signalData,NodeId nodeId,const GenericSectionPtr ptr[3])1020 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1021 const SignalHeader * const signalHeader,
1022 Uint8 prio,
1023 const Uint32 * const signalData,
1024 NodeId nodeId,
1025 const GenericSectionPtr ptr[3]){
1026
1027
1028 Transporter *t = theTransporters[nodeId];
1029 if(t != NULL &&
1030 (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
1031 ((signalHeader->theReceiversBlockNumber == 252) ||
1032 (signalHeader->theReceiversBlockNumber == 4002)))) {
1033
1034 if(t->isConnected()){
1035 Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
1036 if(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE){
1037 Uint32 * insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
1038 if(insertPtr != 0){
1039 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
1040 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
1041 return SEND_OK;
1042 }
1043
1044 /**
1045 * @note: on linux/i386 the granularity is 10ms
1046 * so sleepTime = 2 generates a 10 ms sleep.
1047 */
1048 set_status_overloaded(nodeId, true);
1049 int sleepTime = 2;
1050 for(int i = 0; i<50; i++){
1051 if((nSHMTransporters+nSCITransporters) == 0)
1052 NdbSleep_MilliSleep(sleepTime);
1053 insertPtr = getWritePtr(sendHandle, nodeId, lenBytes, prio);
1054 if(insertPtr != 0){
1055 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
1056 updateWritePtr(sendHandle, nodeId, lenBytes, prio);
1057 break;
1058 }
1059 }
1060
1061 if(insertPtr != 0){
1062 /**
1063 * Send buffer full, but resend works
1064 */
1065 report_error(nodeId, TE_SEND_BUFFER_FULL);
1066 return SEND_OK;
1067 }
1068
1069 WARNING("Signal to " << nodeId << " lost(buffer)");
1070 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
1071 return SEND_BUFFER_FULL;
1072 } else {
1073 return SEND_MESSAGE_TOO_BIG;
1074 }
1075 } else {
1076 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
1077 return SEND_DISCONNECTED;
1078 }
1079 } else {
1080 DEBUG("Discarding message to block: "
1081 << signalHeader->theReceiversBlockNumber
1082 << " node: " << nodeId);
1083
1084 if(t == NULL)
1085 return SEND_UNKNOWN_NODE;
1086
1087 return SEND_BLOCKED;
1088 }
1089 }
1090
1091 void
external_IO(Uint32 timeOutMillis)1092 TransporterRegistry::external_IO(Uint32 timeOutMillis) {
1093 //-----------------------------------------------------------
1094 // Most of the time we will send the buffers here and then wait
1095 // for new signals. Thus we start by sending without timeout
1096 // followed by the receive part where we expect to sleep for
1097 // a while.
1098 //-----------------------------------------------------------
1099 if(pollReceive(timeOutMillis, * receiveHandle)){
1100 performReceive(* receiveHandle);
1101 }
1102 performSend();
1103 }
1104
1105 bool
setup_wakeup_socket(TransporterReceiveHandle & recvdata)1106 TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
1107 {
1108 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1109
1110 if (m_has_extra_wakeup_socket)
1111 {
1112 return true;
1113 }
1114
1115 assert(!recvdata.m_transporters.get(0));
1116
1117 if (my_socketpair(m_extra_wakeup_sockets))
1118 {
1119 perror("socketpair failed!");
1120 return false;
1121 }
1122
1123 if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
1124 !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
1125 {
1126 goto err;
1127 }
1128
1129 #if defined(HAVE_EPOLL_CREATE)
1130 if (recvdata.m_epoll_fd != -1)
1131 {
1132 int sock = m_extra_wakeup_sockets[0].fd;
1133 struct epoll_event event_poll;
1134 bzero(&event_poll, sizeof(event_poll));
1135 event_poll.data.u32 = 0;
1136 event_poll.events = EPOLLIN;
1137 int ret_val = epoll_ctl(recvdata.m_epoll_fd, EPOLL_CTL_ADD, sock,
1138 &event_poll);
1139 if (ret_val != 0)
1140 {
1141 int error= errno;
1142 fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
1143 sock, error);
1144 fflush(stderr);
1145 goto err;
1146 }
1147 }
1148 #endif
1149 m_has_extra_wakeup_socket = true;
1150 recvdata.m_transporters.set(Uint32(0));
1151 return true;
1152
1153 err:
1154 my_socket_close(m_extra_wakeup_sockets[0]);
1155 my_socket_close(m_extra_wakeup_sockets[1]);
1156 my_socket_invalidate(m_extra_wakeup_sockets+0);
1157 my_socket_invalidate(m_extra_wakeup_sockets+1);
1158 return false;
1159 }
1160
1161 void
wakeup()1162 TransporterRegistry::wakeup()
1163 {
1164 if (m_has_extra_wakeup_socket)
1165 {
1166 static char c = 37;
1167 my_send(m_extra_wakeup_sockets[1], &c, 1, 0);
1168 }
1169 }
1170
1171 Uint32
pollReceive(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1172 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
1173 TransporterReceiveHandle& recvdata)
1174 {
1175 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1176
1177 Uint32 retVal = 0;
1178 recvdata.m_recv_transporters.clear();
1179
1180 /**
1181 * If any transporters have left-over data that was not fully executed in
1182 * last loop, don't wait and return 'data available' even if nothing new
1183 */
1184 if (!recvdata.m_has_data_transporters.isclear())
1185 {
1186 timeOutMillis = 0;
1187 retVal = 1;
1188 }
1189
1190 if (nSCITransporters > 0)
1191 {
1192 timeOutMillis=0;
1193 }
1194
1195 #ifdef NDB_SHM_TRANSPORTER
1196 if (nSHMTransporters > 0)
1197 {
1198 Uint32 res = poll_SHM(0, recvdata);
1199 if(res)
1200 {
1201 retVal |= res;
1202 timeOutMillis = 0;
1203 }
1204 }
1205 #endif
1206
1207 #ifdef NDB_TCP_TRANSPORTER
1208 #if defined(HAVE_EPOLL_CREATE)
1209 if (likely(recvdata.m_epoll_fd != -1))
1210 {
1211 int tcpReadSelectReply = 0;
1212 Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
1213
1214 if (num_trps)
1215 {
1216 tcpReadSelectReply = epoll_wait(recvdata.m_epoll_fd,
1217 recvdata.m_epoll_events,
1218 num_trps, timeOutMillis);
1219 retVal |= tcpReadSelectReply;
1220 }
1221
1222 int num_socket_events = tcpReadSelectReply;
1223 if (num_socket_events > 0)
1224 {
1225 for (int i = 0; i < num_socket_events; i++)
1226 {
1227 const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
1228 /**
1229 * check that it's assigned to "us"
1230 */
1231 assert(recvdata.m_transporters.get(trpid));
1232
1233 recvdata.m_recv_transporters.set(trpid);
1234 }
1235 }
1236 else if (num_socket_events < 0)
1237 {
1238 assert(errno == EINTR);
1239 }
1240 }
1241 else
1242 #endif
1243 {
1244 if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
1245 {
1246 retVal |= poll_TCP(timeOutMillis, recvdata);
1247 }
1248 }
1249 #endif
1250 #ifdef NDB_SCI_TRANSPORTER
1251 if (nSCITransporters > 0)
1252 retVal |= poll_SCI(timeOutMillis, recvdata);
1253 #endif
1254 #ifdef NDB_SHM_TRANSPORTER
1255 if (nSHMTransporters > 0)
1256 {
1257 int res = poll_SHM(0, recvdata);
1258 retVal |= res;
1259 }
1260 #endif
1261 return retVal;
1262 }
1263
1264
1265 #ifdef NDB_SCI_TRANSPORTER
1266 Uint32
poll_SCI(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1267 TransporterRegistry::poll_SCI(Uint32 timeOutMillis,
1268 TransporterReceiveHandle& recvdata)
1269 {
1270 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1271
1272 Uint32 retVal = 0;
1273 for (int i = 0; i < nSCITransporters; i++)
1274 {
1275 SCI_Transporter * t = theSCITransporters[i];
1276 Uint32 node_id = t->getRemoteNodeId();
1277
1278 if (!recvdata.m_transporters.get(nodeId))
1279 continue;
1280
1281 if (t->isConnected() && is_connected(node_id))
1282 {
1283 if (t->hasDataToRead())
1284 {
1285 recvdata.m_has_data_transporters.set(node_id);
1286 retVal = 1;
1287 }
1288 }
1289 }
1290 return retVal;
1291 }
1292 #endif
1293
1294
1295 #ifdef NDB_SHM_TRANSPORTER
1296 static int g_shm_counter = 0;
1297 Uint32
poll_SHM(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1298 TransporterRegistry::poll_SHM(Uint32 timeOutMillis,
1299 TransporterReceiveHandle& recvdata)
1300 {
1301 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1302
1303 Uint32 retVal = 0;
1304 for (int j = 0; j < 100; j++)
1305 {
1306 for (int i = 0; i<nSHMTransporters; i++)
1307 {
1308 SHM_Transporter * t = theSHMTransporters[i];
1309 Uint32 node_id = t->getRemoteNodeId();
1310
1311 if (!recvdata.m_transporters.get(node_id))
1312 continue;
1313
1314 if (t->isConnected() && is_connected(node_id))
1315 {
1316 if (t->hasDataToRead())
1317 {
1318 j = 100;
1319 recvdata.m_has_data_transporters.set(node_id);
1320 retVal = 1;
1321 }
1322 }
1323 }
1324 }
1325 return retVal;
1326 }
1327 #endif
1328
1329 #ifdef NDB_TCP_TRANSPORTER
1330 /**
1331 * We do not want to hold any transporter locks during select(), so there
1332 * is no protection against a disconnect closing the socket during this call.
1333 *
1334 * That does not matter, at most we will get a spurious wakeup on the wrong
1335 * socket, which will be handled correctly in performReceive() (which _is_
1336 * protected by transporter locks on upper layer).
1337 */
1338 Uint32
poll_TCP(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1339 TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
1340 TransporterReceiveHandle& recvdata)
1341 {
1342 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1343
1344 recvdata.m_socket_poller.clear();
1345
1346 const bool extra_socket = m_has_extra_wakeup_socket;
1347 if (extra_socket && recvdata.m_transporters.get(0))
1348 {
1349 const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
1350 assert(&recvdata == receiveHandle); // not used by ndbmtd...
1351
1352 // Poll the wakup-socket for read
1353 recvdata.m_socket_poller.add(socket, true, false, false);
1354 }
1355
1356 Uint16 idx[MAX_NODES];
1357 for (int i = 0; i < nTCPTransporters; i++)
1358 {
1359 TCP_Transporter * t = theTCPTransporters[i];
1360 const NDB_SOCKET_TYPE socket = t->getSocket();
1361 Uint32 node_id = t->getRemoteNodeId();
1362
1363 idx[i] = MAX_NODES + 1;
1364 if (!recvdata.m_transporters.get(node_id))
1365 continue;
1366
1367 if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
1368 {
1369 idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
1370 }
1371 }
1372
1373 int tcpReadSelectReply = recvdata.m_socket_poller.poll_unsafe(timeOutMillis);
1374
1375 if (tcpReadSelectReply > 0)
1376 {
1377 if (extra_socket)
1378 {
1379 if (recvdata.m_socket_poller.has_read(0))
1380 {
1381 assert(recvdata.m_transporters.get(0));
1382 recvdata.m_recv_transporters.set((Uint32)0);
1383 }
1384 }
1385
1386 for (int i = 0; i < nTCPTransporters; i++)
1387 {
1388 TCP_Transporter * t = theTCPTransporters[i];
1389 if (idx[i] != MAX_NODES + 1)
1390 {
1391 Uint32 node_id = t->getRemoteNodeId();
1392 if (recvdata.m_socket_poller.has_read(idx[i]))
1393 recvdata.m_recv_transporters.set(node_id);
1394 }
1395 }
1396 }
1397
1398 return tcpReadSelectReply;
1399 }
1400 #endif
1401
1402 /**
1403 * Receive from the set of transporters in the bitmask
1404 * 'recvdata.m_transporters'. These has been polled by
1405 * ::pollReceive() which recorded transporters with
1406 * available data in the subset 'recvdata.m_recv_transporters'.
1407 *
1408 * In multi-threaded datanodes, there might be multiple
1409 * receiver threads, each serving a disjunct set of
1410 * 'm_transporters'.
1411 *
1412 * Single-threaded datanodes does all ::performReceive
1413 * from the scheduler main-loop, and thus it will handle
1414 * all 'm_transporters'.
1415 *
1416 * Clients has to aquire a 'poll right' (see TransporterFacade)
1417 * which gives it the right to temporarily acts as a receive
1418 * thread with the right to poll *all* transporters.
1419 *
1420 * Reception takes place on a set of transporters knowing to be in a
1421 * 'CONNECTED' state. Transporters can (asynch) become 'DISCONNECTING'
1422 * while we performReceive(). There is *no* mutex lock protecting
1423 * 'disconnecting' from being started while we are in the receive-loop!
1424 * However, the contents of the buffers++ should still be in a
1425 * consistent state, such that the current receive can complete
1426 * without failures.
1427 *
1428 * With regular intervals we have to ::update_connections()
1429 * in order to bring DISCONNECTING transporters into
1430 * a DISCONNECTED state. At earlies at this point, resources
1431 * used by performReceive() may be reset or released.
1432 * A transporter should be brought to the DISCONNECTED state
1433 * before it can reconnect again. (Note: There is a break of
1434 * this rule in ::do_connect, see own note here)
1435 *
1436 * To not interfere with ::poll- or ::performReceive(),
1437 * ::update_connections() has to be synched with with these
1438 * methods. Either by being run within the same
1439 * receive thread (dataNodes), or protected by the 'poll rights'.
1440 *
1441 * In case we were unable to receive due to job buffers being full.
1442 * Returns 0 when receive succeeded from all Transporters having data,
1443 * else 1.
1444 */
1445 Uint32
performReceive(TransporterReceiveHandle & recvdata)1446 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
1447 {
1448 TransporterReceiveWatchdog guard(recvdata);
1449 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1450 bool stopReceiving = false;
1451
1452 if (recvdata.m_recv_transporters.get(0))
1453 {
1454 assert(recvdata.m_transporters.get(0));
1455 assert(&recvdata == receiveHandle); // not used by ndbmtd
1456 recvdata.m_recv_transporters.clear(Uint32(0));
1457 consume_extra_sockets();
1458 }
1459
1460 #ifdef ERROR_INSERT
1461 if (!m_blocked.isclear())
1462 {
1463 /* Exclude receive from blocked sockets. */
1464 recvdata.m_recv_transporters.bitANDC(m_blocked);
1465
1466 if (recvdata.m_recv_transporters.isclear() &&
1467 recvdata.m_has_data_transporters.isclear())
1468 {
1469 /* poll sees data, but we want to ignore for now
1470 * sleep a little to avoid busy loop
1471 */
1472 NdbSleep_MilliSleep(1);
1473 }
1474 }
1475 #endif
1476
1477 #ifdef NDB_TCP_TRANSPORTER
1478 /**
1479 * Receive data from transporters polled to have data.
1480 * Add to set of transporters having pending data.
1481 */
1482 for(Uint32 id = recvdata.m_recv_transporters.find_first();
1483 id != BitmaskImpl::NotFound;
1484 id = recvdata.m_recv_transporters.find_next(id + 1))
1485 {
1486 TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1487 assert(recvdata.m_transporters.get(id));
1488
1489 /**
1490 * First check connection 'is CONNECTED.
1491 * A connection can only be set into, or taken out of, is_connected'
1492 * state by ::update_connections(). See comment there about
1493 * synchronication between ::update_connections() and
1494 * performReceive()
1495 *
1496 * Transporter::isConnected() state my change asynch.
1497 * A mismatch between the TransporterRegistry::is_connected(),
1498 * and Transporter::isConnected() state is possible, and indicate
1499 * that a change is underway. (Completed by update_connections())
1500 */
1501 if (is_connected(id))
1502 {
1503 if (t->isConnected())
1504 {
1505 int nBytes = t->doReceive(recvdata);
1506 if (nBytes > 0)
1507 {
1508 recvdata.transporter_recv_from(id);
1509 recvdata.m_has_data_transporters.set(id);
1510 }
1511 }
1512 }
1513 }
1514 recvdata.m_recv_transporters.clear();
1515
1516 /**
1517 * Unpack data either received above or pending from prev rounds.
1518 *
1519 * Data to be processed at this stage is in the Transporter
1520 * receivebuffer. The data *is received*, and will stay in
1521 * the receiveBuffer even if a disconnect is started during
1522 * unpack.
1523 * When ::update_connection() finaly completes the disconnect,
1524 * (synced with ::performReceive()), 'm_has_data_transporters'
1525 * will be cleared, which will terminate further unpacking.
1526 *
1527 * NOTE:
1528 * Without reading inconsistent date, we could have removed
1529 * the 'connected' checks below, However, there is a requirement
1530 * in the CLOSE_COMREQ/CONF protocol between TRPMAN and QMGR
1531 * that no signals arrives from disconnecting nodes after a
1532 * CLOSE_COMCONF was sent. For the moment the risk of taking
1533 * advantage of this small optimization is not worth the risk.
1534 */
1535 for(Uint32 id = recvdata.m_has_data_transporters.find_first();
1536 id != BitmaskImpl::NotFound && !stopReceiving;
1537 id = recvdata.m_has_data_transporters.find_next(id + 1))
1538 {
1539 bool hasdata = false;
1540 TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
1541
1542 assert(recvdata.m_transporters.get(id));
1543
1544 if (is_connected(id))
1545 {
1546 if (t->isConnected())
1547 {
1548 if (unlikely(recvdata.checkJobBuffer()))
1549 return 1; // Full, can't unpack more
1550 if (unlikely(recvdata.m_handled_transporters.get(id)))
1551 continue; // Skip now to avoid starvation
1552 Uint32 * ptr;
1553 Uint32 sz = t->getReceiveData(&ptr);
1554 Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id], stopReceiving);
1555 if (likely(szUsed))
1556 {
1557 t->updateReceiveDataPtr(szUsed);
1558 hasdata = t->hasReceiveData();
1559 }
1560 // else, we didn't unpack anything:
1561 // Avail ReceiveData to short to be usefull, need to
1562 // receive more before we can resume this transporter.
1563 }
1564 }
1565 // If transporter still have data, make sure that it's remember to next time
1566 recvdata.m_has_data_transporters.set(id, hasdata);
1567 recvdata.m_handled_transporters.set(id, hasdata);
1568 }
1569 #endif
1570
1571 #ifdef NDB_SCI_TRANSPORTER
1572 //performReceive
1573 //do prepareReceive on the SCI transporters (prepareReceive(t,,,,))
1574 for (int i=0; i<nSCITransporters && !stopReceiving; i++)
1575 {
1576 SCI_Transporter *t = theSCITransporters[i];
1577 const NodeId nodeId = t->getRemoteNodeId();
1578 assert(recvdata.m_transporters.get(nodeId));
1579 if(is_connected(nodeId))
1580 {
1581 if(t->isConnected() && t->checkConnected())
1582 {
1583 if (unlikely(recvdata.checkJobBuffer()))
1584 return 1; // Full, can't unpack more
1585 if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
1586 continue; // Skip now to avoid starvation
1587
1588 Uint32 * readPtr, * eodPtr;
1589 t->getReceivePtr(&readPtr, &eodPtr);
1590 callbackObj->transporter_recv_from(nodeId);
1591 Uint32 *newPtr = unpack(recvdata, readPtr, eodPtr, nodeId, ioStates[nodeId], stopReceiving);
1592 t->updateReceivePtr(newPtr);
1593 }
1594 }
1595 recvdata.m_handled_transporters.set(nodeId);
1596 }
1597 #endif
1598 #ifdef NDB_SHM_TRANSPORTER
1599 for (int i=0; i<nSHMTransporters && !stopReceiving; i++)
1600 {
1601 SHM_Transporter *t = theSHMTransporters[i];
1602 const NodeId nodeId = t->getRemoteNodeId();
1603 assert(recvdata.m_transporters.get(nodeId));
1604 if(is_connected(nodeId)){
1605 if(t->isConnected() && t->checkConnected())
1606 {
1607 if (unlikely(recvdata.checkJobBuffer()))
1608 return 1; // Full, can't unpack more
1609 if (unlikely(recvdata.m_handled_transporters.get(nodeId)))
1610 continue; // Previously handled, skip to avoid starvation
1611
1612 Uint32 * readPtr, * eodPtr;
1613 t->getReceivePtr(&readPtr, &eodPtr);
1614 recvdata.transporter_recv_from(nodeId);
1615 Uint32 *newPtr = unpack(recvdata,
1616 readPtr, eodPtr, nodeId, ioStates[nodeId],
1617 stopReceiving);
1618 t->updateReceivePtr(newPtr);
1619 }
1620 }
1621 recvdata.m_handled_transporters.set(nodeId);
1622 }
1623 #endif
1624 recvdata.m_handled_transporters.clear();
1625 return 0;
1626 }
1627
1628 /**
1629 * In multi-threaded cases, this must be protected by send lock (can use
1630 * different locks for each node).
1631 */
1632 bool
performSend(NodeId nodeId)1633 TransporterRegistry::performSend(NodeId nodeId)
1634 {
1635 Transporter *t = get_transporter(nodeId);
1636 if (t && t->isConnected() && is_connected(nodeId))
1637 {
1638 return t->doSend();
1639 }
1640
1641 return false;
1642 }
1643
1644 void
consume_extra_sockets()1645 TransporterRegistry::consume_extra_sockets()
1646 {
1647 char buf[4096];
1648 ssize_t ret;
1649 int err;
1650 NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
1651 do
1652 {
1653 ret = my_recv(sock, buf, sizeof(buf), 0);
1654 err = my_socket_errno();
1655 } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
1656
1657 /* Notify upper layer of explicit wakeup */
1658 callbackObj->reportWakeup();
1659 }
1660
1661 void
performSend()1662 TransporterRegistry::performSend()
1663 {
1664 int i;
1665 sendCounter = 1;
1666
1667 #ifdef NDB_TCP_TRANSPORTER
1668 for (i = m_transp_count; i < nTCPTransporters; i++)
1669 {
1670 TCP_Transporter *t = theTCPTransporters[i];
1671 if (t && t->has_data_to_send() &&
1672 t->isConnected() && is_connected(t->getRemoteNodeId()))
1673 {
1674 t->doSend();
1675 }
1676 }
1677 for (i = 0; i < m_transp_count && i < nTCPTransporters; i++)
1678 {
1679 TCP_Transporter *t = theTCPTransporters[i];
1680 if (t && t->has_data_to_send() &&
1681 t->isConnected() && is_connected(t->getRemoteNodeId()))
1682 {
1683 t->doSend();
1684 }
1685 }
1686 m_transp_count++;
1687 if (m_transp_count == nTCPTransporters) m_transp_count = 0;
1688 #endif
1689 #ifdef NDB_SCI_TRANSPORTER
1690 //scroll through the SCI transporters,
1691 // get each transporter, check if connected, send data
1692 for (i=0; i<nSCITransporters; i++) {
1693 SCI_Transporter *t = theSCITransporters[i];
1694 const NodeId nodeId = t->getRemoteNodeId();
1695
1696 if(is_connected(nodeId))
1697 {
1698 if(t->isConnected() && t->has_data_to_send())
1699 {
1700 t->doSend();
1701 } //if
1702 } //if
1703 }
1704 #endif
1705
1706 #ifdef NDB_SHM_TRANSPORTER
1707 for (i=0; i<nSHMTransporters; i++)
1708 {
1709 SHM_Transporter *t = theSHMTransporters[i];
1710 const NodeId nodeId = t->getRemoteNodeId();
1711 if(is_connected(nodeId))
1712 {
1713 if(t->isConnected())
1714 {
1715 t->doSend();
1716 }
1717 }
1718 }
1719 #endif
1720 }
1721
1722 int
forceSendCheck(int sendLimit)1723 TransporterRegistry::forceSendCheck(int sendLimit){
1724 int tSendCounter = sendCounter;
1725 sendCounter = tSendCounter + 1;
1726 if (tSendCounter >= sendLimit) {
1727 performSend();
1728 sendCounter = 1;
1729 return 1;
1730 }//if
1731 return 0;
1732 }//TransporterRegistry::forceSendCheck()
1733
1734 #ifdef DEBUG_TRANSPORTER
1735 void
printState()1736 TransporterRegistry::printState(){
1737 ndbout << "-- TransporterRegistry -- " << endl << endl
1738 << "Transporters = " << nTransporters << endl;
1739 for(int i = 0; i<maxTransporters; i++)
1740 if(theTransporters[i] != NULL){
1741 const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
1742 ndbout << "Transporter: " << remoteNodeId
1743 << " PerformState: " << performStates[remoteNodeId]
1744 << " IOState: " << ioStates[remoteNodeId] << endl;
1745 }
1746 }
1747 #endif
1748
1749 #ifdef ERROR_INSERT
1750 bool
isBlocked(NodeId nodeId)1751 TransporterRegistry::isBlocked(NodeId nodeId)
1752 {
1753 return m_blocked.get(nodeId);
1754 }
1755
1756 void
blockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)1757 TransporterRegistry::blockReceive(TransporterReceiveHandle& recvdata,
1758 NodeId nodeId)
1759 {
1760 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1761 assert(recvdata.m_transporters.get(nodeId));
1762
1763 /* Check that node is not already blocked?
1764 * Stop pulling from its socket (but track received data etc)
1765 */
1766 /* Shouldn't already be blocked with data */
1767 assert(!m_blocked.get(nodeId));
1768
1769 m_blocked.set(nodeId);
1770 }
1771
1772 void
unblockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)1773 TransporterRegistry::unblockReceive(TransporterReceiveHandle& recvdata,
1774 NodeId nodeId)
1775 {
1776 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1777 assert(recvdata.m_transporters.get(nodeId));
1778
1779 /* Check that node is blocked?
1780 * Resume pulling from its socket
1781 * Ensure in-flight data is processed if there was some
1782 */
1783 assert(m_blocked.get(nodeId));
1784 assert(!recvdata.m_has_data_transporters.get(nodeId));
1785
1786 m_blocked.clear(nodeId);
1787
1788 if (m_blocked_disconnected.get(nodeId))
1789 {
1790 /* Process disconnect notification/handling now */
1791 m_blocked_disconnected.clear(nodeId);
1792
1793 report_disconnect(recvdata, nodeId, m_disconnect_errors[nodeId]);
1794 }
1795 }
1796 #endif
1797
1798 #ifdef ERROR_INSERT
1799 Uint32
getMixologyLevel() const1800 TransporterRegistry::getMixologyLevel() const
1801 {
1802 return m_mixology_level;
1803 }
1804
1805 extern Uint32 MAX_RECEIVED_SIGNALS; /* Packer.cpp */
1806
1807 #define MIXOLOGY_MIX_INCOMING_SIGNALS 4
1808
1809 void
setMixologyLevel(Uint32 l)1810 TransporterRegistry::setMixologyLevel(Uint32 l)
1811 {
1812 m_mixology_level = l;
1813
1814 if (m_mixology_level & MIXOLOGY_MIX_INCOMING_SIGNALS)
1815 {
1816 ndbout_c("MIXOLOGY_MIX_INCOMING_SIGNALS on");
1817 /* Max one signal per transporter */
1818 MAX_RECEIVED_SIGNALS = 1;
1819 }
1820
1821 /* TODO : Add mixing of Send from NdbApi / MGMD */
1822 }
1823 #endif
1824
1825 IOState
ioState(NodeId nodeId) const1826 TransporterRegistry::ioState(NodeId nodeId) const {
1827 return ioStates[nodeId];
1828 }
1829
1830 void
setIOState(NodeId nodeId,IOState state)1831 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
1832 if (ioStates[nodeId] == state)
1833 return;
1834
1835 DEBUG("TransporterRegistry::setIOState("
1836 << nodeId << ", " << state << ")");
1837
1838 ioStates[nodeId] = state;
1839 }
1840
1841 extern "C" void *
run_start_clients_C(void * me)1842 run_start_clients_C(void * me)
1843 {
1844 ((TransporterRegistry*) me)->start_clients_thread();
1845 return 0;
1846 }
1847
1848 /**
1849 * This method is used to initiate connection, called from the TRPMAN block.
1850 *
1851 * This works asynchronously, no actions are taken directly in the calling
1852 * thread.
1853 */
1854 void
do_connect(NodeId node_id)1855 TransporterRegistry::do_connect(NodeId node_id)
1856 {
1857 PerformState &curr_state = performStates[node_id];
1858 switch(curr_state){
1859 case DISCONNECTED:
1860 break;
1861 case CONNECTED:
1862 return;
1863 case CONNECTING:
1864 return;
1865 case DISCONNECTING:
1866 /**
1867 * NOTE (Need future work)
1868 * Going directly from DISCONNECTION to CONNECTING creates
1869 * a possile race with ::update_connections(): It will
1870 * see either of the *ING states, and bring the connection
1871 * into CONNECTED or *DISCONNECTED* state. Furthermore, the
1872 * state may be overwritten to CONNECTING by this method.
1873 * We should probably have waited for DISCONNECTED state,
1874 * before allowing reCONNECTING ....
1875 */
1876 break;
1877 }
1878 DBUG_ENTER("TransporterRegistry::do_connect");
1879 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
1880
1881 /*
1882 No one else should be using the transporter now, reset
1883 its send buffer
1884 */
1885 callbackObj->reset_send_buffer(node_id);
1886
1887 Transporter * t = theTransporters[node_id];
1888 if (t != NULL)
1889 t->resetBuffers();
1890
1891 curr_state= CONNECTING;
1892 DBUG_VOID_RETURN;
1893 }
1894
1895 /**
1896 * This method is used to initiate disconnect from TRPMAN. It is also called
1897 * from the TCP transporter in case of an I/O error on the socket.
1898 *
1899 * This works asynchronously, similar to do_connect().
1900 */
1901 void
do_disconnect(NodeId node_id,int errnum)1902 TransporterRegistry::do_disconnect(NodeId node_id, int errnum)
1903 {
1904 PerformState &curr_state = performStates[node_id];
1905 switch(curr_state){
1906 case DISCONNECTED:
1907 return;
1908 case CONNECTED:
1909 break;
1910 case CONNECTING:
1911 break;
1912 case DISCONNECTING:
1913 return;
1914 }
1915 DBUG_ENTER("TransporterRegistry::do_disconnect");
1916 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
1917 curr_state= DISCONNECTING;
1918 m_disconnect_errnum[node_id] = errnum;
1919 DBUG_VOID_RETURN;
1920 }
1921
1922 void
report_connect(TransporterReceiveHandle & recvdata,NodeId node_id)1923 TransporterRegistry::report_connect(TransporterReceiveHandle& recvdata,
1924 NodeId node_id)
1925 {
1926 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1927 assert(recvdata.m_transporters.get(node_id));
1928
1929 DBUG_ENTER("TransporterRegistry::report_connect");
1930 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
1931
1932 /*
1933 The send buffers was reset when this connection
1934 was set to CONNECTING. In order to make sure no stray
1935 signals has been written to the send buffer since then
1936 call 'reset_send_buffer' with the "should_be_empty" flag
1937 set
1938 */
1939 callbackObj->reset_send_buffer(node_id, true);
1940
1941 if (recvdata.epoll_add((TCP_Transporter*)theTransporters[node_id]))
1942 {
1943 performStates[node_id] = CONNECTED;
1944 recvdata.reportConnect(node_id);
1945 DBUG_VOID_RETURN;
1946 }
1947
1948 /**
1949 * Failed to add to epoll_set...
1950 * disconnect it (this is really really bad)
1951 */
1952 performStates[node_id] = DISCONNECTING;
1953 DBUG_VOID_RETURN;
1954 }
1955
1956 void
report_disconnect(TransporterReceiveHandle & recvdata,NodeId node_id,int errnum)1957 TransporterRegistry::report_disconnect(TransporterReceiveHandle& recvdata,
1958 NodeId node_id, int errnum)
1959 {
1960 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1961 assert(recvdata.m_transporters.get(node_id));
1962
1963 DBUG_ENTER("TransporterRegistry::report_disconnect");
1964 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
1965
1966 #ifdef ERROR_INSERT
1967 if (m_blocked.get(node_id))
1968 {
1969 /* We are simulating real latency, so control events experience
1970 * it too
1971 */
1972 m_blocked_disconnected.set(node_id);
1973 m_disconnect_errors[node_id] = errnum;
1974 DBUG_VOID_RETURN;
1975 }
1976 #endif
1977
1978 performStates[node_id] = DISCONNECTED;
1979 recvdata.m_recv_transporters.clear(node_id);
1980 recvdata.m_has_data_transporters.clear(node_id);
1981 recvdata.m_handled_transporters.clear(node_id);
1982 recvdata.m_bad_data_transporters.clear(node_id);
1983 recvdata.reportDisconnect(node_id, errnum);
1984 DBUG_VOID_RETURN;
1985 }
1986
1987 /**
1988 * We only call TransporterCallback::reportError() from
1989 * TransporterRegistry::update_connections().
1990 *
1991 * In other places we call this method to enqueue the error that will later be
1992 * picked up by update_connections().
1993 */
1994 void
report_error(NodeId nodeId,TransporterError errorCode,const char * errorInfo)1995 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
1996 const char *errorInfo)
1997 {
1998 if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
1999 m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
2000 {
2001 m_error_states[nodeId].m_code = errorCode;
2002 m_error_states[nodeId].m_info = errorInfo;
2003 }
2004 }
2005
2006 /**
2007 * update_connections(), together with the thread running in
2008 * start_clients_thread(), handle the state changes for transporters as they
2009 * connect and disconnect.
2010 *
2011 * update_connections on a specific set of recvdata *must not* be run
2012 * concurrently with :performReceive() on the same recvdata. Thus,
2013 * it must either be called from the same (receive-)thread as
2014 * performReceive(), or protected by aquiring the (client) poll rights.
2015 */
2016 void
update_connections(TransporterReceiveHandle & recvdata)2017 TransporterRegistry::update_connections(TransporterReceiveHandle& recvdata)
2018 {
2019 TransporterReceiveWatchdog guard(recvdata);
2020 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2021
2022 for (int i= 0, n= 0; n < nTransporters; i++){
2023 Transporter * t = theTransporters[i];
2024 if (!t)
2025 continue;
2026 n++;
2027
2028 const NodeId nodeId = t->getRemoteNodeId();
2029 if (!recvdata.m_transporters.get(nodeId))
2030 continue;
2031
2032 TransporterError code = m_error_states[nodeId].m_code;
2033 const char *info = m_error_states[nodeId].m_info;
2034 if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
2035 {
2036 recvdata.reportError(nodeId, code, info);
2037 m_error_states[nodeId].m_code = TE_NO_ERROR;
2038 m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
2039 }
2040
2041 switch(performStates[nodeId]){
2042 case CONNECTED:
2043 case DISCONNECTED:
2044 break;
2045 case CONNECTING:
2046 if(t->isConnected())
2047 report_connect(recvdata, nodeId);
2048 break;
2049 case DISCONNECTING:
2050 if(!t->isConnected())
2051 report_disconnect(recvdata, nodeId, m_disconnect_errnum[nodeId]);
2052 break;
2053 }
2054 }
2055 }
2056
/**
 * Run as own thread
 * Possible blocking parts of transporter connect and disconnect
 * are supposed to be handled here.
 */
2062 void
start_clients_thread()2063 TransporterRegistry::start_clients_thread()
2064 {
2065 int persist_mgm_count= 0;
2066 DBUG_ENTER("TransporterRegistry::start_clients_thread");
2067 while (m_run_start_clients_thread) {
2068 NdbSleep_MilliSleep(100);
2069 persist_mgm_count++;
2070 if(persist_mgm_count==50)
2071 {
2072 ndb_mgm_check_connection(m_mgm_handle);
2073 persist_mgm_count= 0;
2074 }
2075 for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
2076 Transporter * t = theTransporters[i];
2077 if (!t)
2078 continue;
2079 n++;
2080
2081 const NodeId nodeId = t->getRemoteNodeId();
2082 switch(performStates[nodeId]){
2083 case CONNECTING:
2084 if(!t->isConnected() && !t->isServer) {
2085 if (get_and_clear_node_up_indicator(nodeId))
2086 {
2087 // Other node have indicated that node nodeId is up, try connect
2088 // now and restart backoff sequence
2089 backoff_reset_connecting_time(nodeId);
2090 }
2091 if (!backoff_update_and_check_time_for_connect(nodeId))
2092 {
2093 // Skip connect this time
2094 continue;
2095 }
2096
2097 bool connected= false;
2098 /**
2099 * First, we try to connect (if we have a port number).
2100 */
2101
2102 if (t->get_s_port())
2103 {
2104 DBUG_PRINT("info", ("connecting to node %d using port %d",
2105 nodeId, t->get_s_port()));
2106 connected= t->connect_client();
2107 }
2108
2109 /**
2110 * If dynamic, get the port for connecting from the management server
2111 */
2112 if( !connected && t->get_s_port() <= 0) { // Port is dynamic
2113 int server_port= 0;
2114 struct ndb_mgm_reply mgm_reply;
2115
2116 DBUG_PRINT("info", ("connection to node %d should use "
2117 "dynamic port",
2118 nodeId));
2119
2120 if(!ndb_mgm_is_connected(m_mgm_handle))
2121 ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
2122
2123 if(ndb_mgm_is_connected(m_mgm_handle))
2124 {
2125 DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
2126 nodeId));
2127
2128 const int res=
2129 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
2130 t->getRemoteNodeId(),
2131 t->getLocalNodeId(),
2132 CFG_CONNECTION_SERVER_PORT,
2133 &server_port,
2134 &mgm_reply);
2135 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
2136 server_port,t->getRemoteNodeId(),
2137 t->getLocalNodeId(),res));
2138 if( res >= 0 )
2139 {
2140 DBUG_PRINT("info", ("got port %d to use for connection to %d",
2141 server_port, nodeId));
2142
2143 if (server_port != 0)
2144 {
2145 if (t->get_s_port() != server_port)
2146 {
2147 // Got a different port number, reset backoff
2148 backoff_reset_connecting_time(nodeId);
2149 }
2150 // Save the new port number
2151 t->set_s_port(server_port);
2152 }
2153 else
2154 {
2155 // Got port number 0, port is not known. Keep the old.
2156 }
2157 }
2158 else if(ndb_mgm_is_connected(m_mgm_handle))
2159 {
2160 DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
2161 res));
2162 g_eventLogger->info("Failed to get dynamic port, res: %d",
2163 res);
2164 ndb_mgm_disconnect(m_mgm_handle);
2165 }
2166 else
2167 {
2168 DBUG_PRINT("info", ("mgmd close connection early"));
2169 g_eventLogger->info
2170 ("Management server closed connection early. "
2171 "It is probably being shut down (or has problems). "
2172 "We will retry the connection. %d %s %s line: %d",
2173 ndb_mgm_get_latest_error(m_mgm_handle),
2174 ndb_mgm_get_latest_error_desc(m_mgm_handle),
2175 ndb_mgm_get_latest_error_msg(m_mgm_handle),
2176 ndb_mgm_get_latest_error_line(m_mgm_handle)
2177 );
2178 }
2179 }
2180 /** else
2181 * We will not be able to get a new port unless
2182 * the m_mgm_handle is connected. Note that not
2183 * being connected is an ok state, just continue
2184 * until it is able to connect. Continue using the
2185 * old port until we can connect again and get a
2186 * new port.
2187 */
2188 }
2189 }
2190 break;
2191 case DISCONNECTING:
2192 if(t->isConnected())
2193 t->doDisconnect();
2194 break;
2195 case DISCONNECTED:
2196 {
2197 if (t->isConnected())
2198 {
2199 g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
2200 " while being connected, disconnecting!",
2201 t->getRemoteNodeId());
2202 t->doDisconnect();
2203 }
2204 break;
2205 }
2206 default:
2207 break;
2208 }
2209 }
2210 }
2211 DBUG_VOID_RETURN;
2212 }
2213
2214 struct NdbThread*
start_clients()2215 TransporterRegistry::start_clients()
2216 {
2217 m_run_start_clients_thread= true;
2218 m_start_clients_thread= NdbThread_Create(run_start_clients_C,
2219 (void**)this,
2220 0, // default stack size
2221 "ndb_start_clients",
2222 NDB_THREAD_PRIO_LOW);
2223 if (m_start_clients_thread == 0)
2224 {
2225 m_run_start_clients_thread= false;
2226 }
2227 return m_start_clients_thread;
2228 }
2229
2230 bool
stop_clients()2231 TransporterRegistry::stop_clients()
2232 {
2233 if (m_start_clients_thread) {
2234 m_run_start_clients_thread= false;
2235 void* status;
2236 NdbThread_WaitFor(m_start_clients_thread, &status);
2237 NdbThread_Destroy(&m_start_clients_thread);
2238 }
2239 return true;
2240 }
2241
2242 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)2243 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
2244 const char *interf,
2245 int s_port)
2246 {
2247 DBUG_ENTER("TransporterRegistry::add_transporter_interface");
2248 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
2249 if (interf && strlen(interf) == 0)
2250 interf= 0;
2251
2252 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
2253 {
2254 Transporter_interface &tmp= m_transporter_interface[i];
2255 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
2256 continue;
2257 if (interf != 0 && tmp.m_interface != 0 &&
2258 strcmp(interf, tmp.m_interface) == 0)
2259 {
2260 DBUG_VOID_RETURN; // found match, no need to insert
2261 }
2262 if (interf == 0 && tmp.m_interface == 0)
2263 {
2264 DBUG_VOID_RETURN; // found match, no need to insert
2265 }
2266 }
2267 Transporter_interface t;
2268 t.m_remote_nodeId= remoteNodeId;
2269 t.m_s_service_port= s_port;
2270 t.m_interface= interf;
2271 m_transporter_interface.push_back(t);
2272 DBUG_PRINT("exit",("interface and port added"));
2273 DBUG_VOID_RETURN;
2274 }
2275
2276 bool
start_service(SocketServer & socket_server)2277 TransporterRegistry::start_service(SocketServer& socket_server)
2278 {
2279 DBUG_ENTER("TransporterRegistry::start_service");
2280 if (m_transporter_interface.size() > 0 &&
2281 localNodeId == 0)
2282 {
2283 g_eventLogger->error("INTERNAL ERROR: not initialized");
2284 DBUG_RETURN(false);
2285 }
2286
2287 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
2288 {
2289 Transporter_interface &t= m_transporter_interface[i];
2290
2291 unsigned short port= (unsigned short)t.m_s_service_port;
2292 if(t.m_s_service_port<0)
2293 port= -t.m_s_service_port; // is a dynamic port
2294 TransporterService *transporter_service =
2295 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
2296 if(!socket_server.setup(transporter_service,
2297 &port, t.m_interface))
2298 {
2299 DBUG_PRINT("info", ("Trying new port"));
2300 port= 0;
2301 if(t.m_s_service_port>0
2302 || !socket_server.setup(transporter_service,
2303 &port, t.m_interface))
2304 {
2305 /*
2306 * If it wasn't a dynamically allocated port, or
2307 * our attempts at getting a new dynamic port failed
2308 */
2309 g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
2310 "Please check if the port is already used,\n"
2311 "(perhaps the node is already running)",
2312 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
2313 delete transporter_service;
2314 DBUG_RETURN(false);
2315 }
2316 }
2317 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
2318 DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
2319 transporter_service->setTransporterRegistry(this);
2320 }
2321 DBUG_RETURN(true);
2322 }
2323
#ifdef NDB_SHM_TRANSPORTER
/**
 * Signal handler that TransporterRegistry::startReceiving() installs for
 * g_ndb_shm_signum. It only increments g_shm_counter.
 */
extern "C"
void
shm_sig_handler(int signo)
{
  (void)signo; // The signal number is not used
  ++g_shm_counter;
}
#endif
2332
2333 void
startReceiving()2334 TransporterRegistry::startReceiving()
2335 {
2336 DBUG_ENTER("TransporterRegistry::startReceiving");
2337
2338 #ifdef NDB_SHM_TRANSPORTER
2339 m_shm_own_pid = getpid();
2340 if (g_ndb_shm_signum)
2341 {
2342 DBUG_PRINT("info",("Install signal handler for signum %d",
2343 g_ndb_shm_signum));
2344 struct sigaction sa;
2345 NdbThread_set_shm_sigmask(FALSE);
2346 sigemptyset(&sa.sa_mask);
2347 sa.sa_handler = shm_sig_handler;
2348 sa.sa_flags = 0;
2349 int ret;
2350 while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR)
2351 ;
2352 if(ret != 0)
2353 {
2354 DBUG_PRINT("error",("Install failed"));
2355 g_eventLogger->error("Failed to install signal handler for"
2356 " SHM transporter, signum %d, errno: %d (%s)",
2357 g_ndb_shm_signum, errno, strerror(errno));
2358 }
2359 }
2360 #endif // NDB_SHM_TRANSPORTER
2361 DBUG_VOID_RETURN;
2362 }
2363
2364 void
stopReceiving()2365 TransporterRegistry::stopReceiving(){
2366 }
2367
2368 void
startSending()2369 TransporterRegistry::startSending(){
2370 }
2371
2372 void
stopSending()2373 TransporterRegistry::stopSending(){
2374 }
2375
operator <<(NdbOut & out,SignalHeader & sh)2376 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
2377 out << "-- Signal Header --" << endl;
2378 out << "theLength: " << sh.theLength << endl;
2379 out << "gsn: " << sh.theVerId_signalNumber << endl;
2380 out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
2381 out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
2382 out << "sendersSig: " << sh.theSendersSignalId << endl;
2383 out << "theSignalId: " << sh.theSignalId << endl;
2384 out << "trace: " << (int)sh.theTrace << endl;
2385 return out;
2386 }
2387
2388 Transporter*
get_transporter(NodeId nodeId)2389 TransporterRegistry::get_transporter(NodeId nodeId) {
2390 assert(nodeId < maxTransporters);
2391 return theTransporters[nodeId];
2392 }
2393
2394
connect_client(NdbMgmHandle * h)2395 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
2396 {
2397 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
2398
2399 Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
2400
2401 if(!mgm_nodeid)
2402 {
2403 g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2404 return false;
2405 }
2406 Transporter * t = theTransporters[mgm_nodeid];
2407 if (!t)
2408 {
2409 g_eventLogger->error("%s: %d", __FILE__, __LINE__);
2410 return false;
2411 }
2412
2413 bool res = t->connect_client(connect_ndb_mgmd(h));
2414 if (res == true)
2415 {
2416 performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
2417 }
2418 DBUG_RETURN(res);
2419 }
2420
2421
report_dynamic_ports(NdbMgmHandle h) const2422 bool TransporterRegistry::report_dynamic_ports(NdbMgmHandle h) const
2423 {
2424 // Fill array of nodeid/port pairs for those ports which are dynamic
2425 unsigned num_ports = 0;
2426 ndb_mgm_dynamic_port ports[MAX_NODES];
2427 for(unsigned i = 0; i < m_transporter_interface.size(); i++)
2428 {
2429 const Transporter_interface& ti = m_transporter_interface[i];
2430 if (ti.m_s_service_port >= 0)
2431 continue; // Not a dynamic port
2432
2433 assert(num_ports < NDB_ARRAY_SIZE(ports));
2434 ports[num_ports].nodeid = ti.m_remote_nodeId;
2435 ports[num_ports].port = ti.m_s_service_port;
2436 num_ports++;
2437 }
2438
2439 if (num_ports == 0)
2440 {
2441 // No dynamic ports in use, nothing to report
2442 return true;
2443 }
2444
2445 // Send array of nodeid/port pairs to mgmd
2446 if (ndb_mgm_set_dynamic_ports(h, localNodeId,
2447 ports, num_ports) < 0)
2448 {
2449 g_eventLogger->error("Failed to register dynamic ports, error: %d - '%s'",
2450 ndb_mgm_get_latest_error(h),
2451 ndb_mgm_get_latest_error_desc(h));
2452 return false;
2453 }
2454
2455 return true;
2456 }
2457
2458
2459 /**
2460 * Given a connected NdbMgmHandle, turns it into a transporter
2461 * and returns the socket.
2462 */
connect_ndb_mgmd(NdbMgmHandle * h)2463 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
2464 {
2465 NDB_SOCKET_TYPE sockfd;
2466 my_socket_invalidate(&sockfd);
2467
2468 DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
2469
2470 if ( h==NULL || *h == NULL )
2471 {
2472 g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
2473 DBUG_RETURN(sockfd);
2474 }
2475
2476 if (!report_dynamic_ports(*h))
2477 {
2478 ndb_mgm_destroy_handle(h);
2479 DBUG_RETURN(sockfd);
2480 }
2481
2482 /**
2483 * convert_to_transporter also disposes of the handle (i.e. we don't leak
2484 * memory here.
2485 */
2486 DBUG_PRINT("info", ("Converting handle to transporter"));
2487 sockfd= ndb_mgm_convert_to_transporter(h);
2488 if (!my_socket_valid(sockfd))
2489 {
2490 g_eventLogger->error("Failed to convert to transporter (%s: %d)",
2491 __FILE__, __LINE__);
2492 ndb_mgm_destroy_handle(h);
2493 }
2494 DBUG_RETURN(sockfd);
2495 }
2496
2497 /**
2498 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
2499 * and returns the socket.
2500 */
2501 NDB_SOCKET_TYPE
connect_ndb_mgmd(const char * server_name,unsigned short server_port)2502 TransporterRegistry::connect_ndb_mgmd(const char* server_name,
2503 unsigned short server_port)
2504 {
2505 NdbMgmHandle h= ndb_mgm_create_handle();
2506 NDB_SOCKET_TYPE s;
2507 my_socket_invalidate(&s);
2508
2509 DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
2510
2511 if ( h == NULL )
2512 {
2513 DBUG_RETURN(s);
2514 }
2515
2516 /**
2517 * Set connectstring
2518 */
2519 {
2520 BaseString cs;
2521 cs.assfmt("%s:%u", server_name, server_port);
2522 ndb_mgm_set_connectstring(h, cs.c_str());
2523 }
2524
2525 if(ndb_mgm_connect(h, 0, 0, 0)<0)
2526 {
2527 DBUG_PRINT("info", ("connection to mgmd failed"));
2528 ndb_mgm_destroy_handle(&h);
2529 DBUG_RETURN(s);
2530 }
2531
2532 DBUG_RETURN(connect_ndb_mgmd(&h));
2533 }
2534
2535 /**
2536 * The calls below are used by all implementations: NDB API, ndbd and
2537 * ndbmtd. The calls to handle->getWritePtr, handle->updateWritePtr
2538 * are handled by special implementations for NDB API, ndbd and
2539 * ndbmtd.
2540 */
2541
2542 Uint32 *
getWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2543 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
2544 NodeId node, Uint32 lenBytes, Uint32 prio)
2545 {
2546 Transporter *t = theTransporters[node];
2547 Uint32 *insertPtr = handle->getWritePtr(node, lenBytes, prio,
2548 t->get_max_send_buffer());
2549
2550 if (insertPtr == 0) {
2551 //-------------------------------------------------
2552 // Buffer was completely full. We have severe problems.
2553 // We will attempt to wait for a small time
2554 //-------------------------------------------------
2555 if(t->send_is_possible(10)) {
2556 //-------------------------------------------------
2557 // Send is possible after the small timeout.
2558 //-------------------------------------------------
2559 if(!handle->forceSend(node)){
2560 return 0;
2561 } else {
2562 //-------------------------------------------------
2563 // Since send was successful we will make a renewed
2564 // attempt at inserting the signal into the buffer.
2565 //-------------------------------------------------
2566 insertPtr = handle->getWritePtr(node, lenBytes, prio,
2567 t->get_max_send_buffer());
2568 }//if
2569 } else {
2570 return 0;
2571 }//if
2572 }
2573 return insertPtr;
2574 }
2575
2576 void
updateWritePtr(TransporterSendBufferHandle * handle,NodeId node,Uint32 lenBytes,Uint32 prio)2577 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
2578 NodeId node, Uint32 lenBytes, Uint32 prio)
2579 {
2580 Transporter *t = theTransporters[node];
2581
2582 Uint32 used = handle->updateWritePtr(node, lenBytes, prio);
2583 t->update_status_overloaded(used);
2584
2585 if(t->send_limit_reached(used)) {
2586 //-------------------------------------------------
2587 // Buffer is full and we are ready to send. We will
2588 // not wait since the signal is already in the buffer.
2589 // Force flag set has the same indication that we
2590 // should always send. If it is not possible to send
2591 // we will not worry since we will soon be back for
2592 // a renewed trial.
2593 //-------------------------------------------------
2594 if(t->send_is_possible(0)) {
2595 //-------------------------------------------------
2596 // Send was possible, attempt at a send.
2597 //-------------------------------------------------
2598 handle->forceSend(node);
2599 }//if
2600 }
2601 }
2602
2603 Uint32
get_bytes_to_send_iovec(NodeId node,struct iovec * dst,Uint32 max)2604 TransporterRegistry::get_bytes_to_send_iovec(NodeId node, struct iovec *dst,
2605 Uint32 max)
2606 {
2607 assert(m_use_default_send_buffer);
2608
2609 if (max == 0)
2610 return 0;
2611
2612 Uint32 count = 0;
2613 SendBuffer *b = m_send_buffers + node;
2614 SendBufferPage *page = b->m_first_page;
2615 while (page != NULL && count < max)
2616 {
2617 dst[count].iov_base = page->m_data+page->m_start;
2618 dst[count].iov_len = page->m_bytes;
2619 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2620 page = page->m_next;
2621 count++;
2622 }
2623
2624 return count;
2625 }
2626
2627 Uint32
bytes_sent(NodeId node,Uint32 bytes)2628 TransporterRegistry::bytes_sent(NodeId node, Uint32 bytes)
2629 {
2630 assert(m_use_default_send_buffer);
2631
2632 SendBuffer *b = m_send_buffers + node;
2633 Uint32 used_bytes = b->m_used_bytes;
2634
2635 if (bytes == 0)
2636 return used_bytes;
2637
2638 used_bytes -= bytes;
2639 b->m_used_bytes = used_bytes;
2640
2641 SendBufferPage *page = b->m_first_page;
2642 while (bytes && bytes >= page->m_bytes)
2643 {
2644 SendBufferPage * tmp = page;
2645 bytes -= page->m_bytes;
2646 page = page->m_next;
2647 release_page(tmp);
2648 }
2649
2650 if (used_bytes == 0)
2651 {
2652 b->m_first_page = 0;
2653 b->m_last_page = 0;
2654 }
2655 else
2656 {
2657 page->m_start += bytes;
2658 page->m_bytes -= bytes;
2659 assert(page->m_start + page->m_bytes <= page->max_data_bytes());
2660 b->m_first_page = page;
2661 }
2662
2663 return used_bytes;
2664 }
2665
2666 bool
has_data_to_send(NodeId node)2667 TransporterRegistry::has_data_to_send(NodeId node)
2668 {
2669 assert(m_use_default_send_buffer);
2670
2671 SendBuffer *b = m_send_buffers + node;
2672 return (b->m_first_page != NULL && b->m_first_page->m_bytes);
2673 }
2674
2675 void
reset_send_buffer(NodeId node,bool should_be_empty)2676 TransporterRegistry::reset_send_buffer(NodeId node, bool should_be_empty)
2677 {
2678 assert(m_use_default_send_buffer);
2679
2680 // Make sure that buffer is already empty if the "should_be_empty"
2681 // flag is set. This is done to quickly catch any stray signals
2682 // written to the send buffer while not being connected
2683 if (should_be_empty && !has_data_to_send(node))
2684 return;
2685 assert(!should_be_empty);
2686
2687 SendBuffer *b = m_send_buffers + node;
2688 SendBufferPage *page = b->m_first_page;
2689 while (page != NULL)
2690 {
2691 SendBufferPage *next = page->m_next;
2692 release_page(page);
2693 page = next;
2694 }
2695 b->m_first_page = NULL;
2696 b->m_last_page = NULL;
2697 b->m_used_bytes = 0;
2698 }
2699
2700 TransporterRegistry::SendBufferPage *
alloc_page()2701 TransporterRegistry::alloc_page()
2702 {
2703 SendBufferPage *page = m_page_freelist;
2704 if (page != NULL)
2705 {
2706 m_tot_used_buffer_memory += SendBufferPage::PGSIZE;
2707 m_page_freelist = page->m_next;
2708 return page;
2709 }
2710
2711 ndbout << "ERROR: out of send buffers in kernel." << endl;
2712 return NULL;
2713 }
2714
2715 void
release_page(SendBufferPage * page)2716 TransporterRegistry::release_page(SendBufferPage *page)
2717 {
2718 assert(page != NULL);
2719 page->m_next = m_page_freelist;
2720 m_tot_used_buffer_memory -= SendBufferPage::PGSIZE;
2721 m_page_freelist = page;
2722 }
2723
2724 /**
2725 * These are the TransporterSendBufferHandle methods used by the
2726 * single-threaded ndbd.
2727 */
2728 Uint32 *
getWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio,Uint32 max_use)2729 TransporterRegistry::getWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio,
2730 Uint32 max_use)
2731 {
2732 assert(m_use_default_send_buffer);
2733
2734 SendBuffer *b = m_send_buffers + node;
2735
2736 /* First check if we have room in already allocated page. */
2737 SendBufferPage *page = b->m_last_page;
2738 if (page != NULL && page->m_bytes + page->m_start + lenBytes <= page->max_data_bytes())
2739 {
2740 return (Uint32 *)(page->m_data + page->m_start + page->m_bytes);
2741 }
2742
2743 if (b->m_used_bytes + lenBytes > max_use)
2744 return NULL;
2745
2746 /* Allocate a new page. */
2747 page = alloc_page();
2748 if (page == NULL)
2749 return NULL;
2750 page->m_next = NULL;
2751 page->m_bytes = 0;
2752 page->m_start = 0;
2753
2754 if (b->m_last_page == NULL)
2755 {
2756 b->m_first_page = page;
2757 b->m_last_page = page;
2758 }
2759 else
2760 {
2761 assert(b->m_first_page != NULL);
2762 b->m_last_page->m_next = page;
2763 b->m_last_page = page;
2764 }
2765 return (Uint32 *)(page->m_data);
2766 }
2767
2768 /**
2769 * This is used by the ndbd, so here only one thread is using this, so
2770 * values will always be consistent.
2771 */
2772 void
getSendBufferLevel(NodeId node,SB_LevelType & level)2773 TransporterRegistry::getSendBufferLevel(NodeId node, SB_LevelType &level)
2774 {
2775 SendBuffer *b = m_send_buffers + node;
2776 calculate_send_buffer_level(b->m_used_bytes,
2777 m_tot_send_buffer_memory,
2778 m_tot_used_buffer_memory,
2779 0,
2780 level);
2781 return;
2782 }
2783
2784 Uint32
updateWritePtr(NodeId node,Uint32 lenBytes,Uint32 prio)2785 TransporterRegistry::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
2786 {
2787 assert(m_use_default_send_buffer);
2788
2789 SendBuffer *b = m_send_buffers + node;
2790 SendBufferPage *page = b->m_last_page;
2791 assert(page != NULL);
2792 assert(page->m_bytes + lenBytes <= page->max_data_bytes());
2793 page->m_bytes += lenBytes;
2794 b->m_used_bytes += lenBytes;
2795 return b->m_used_bytes;
2796 }
2797
2798 bool
forceSend(NodeId node)2799 TransporterRegistry::forceSend(NodeId node)
2800 {
2801 Transporter *t = get_transporter(node);
2802 if (t)
2803 return t->doSend();
2804 else
2805 return false;
2806 }
2807
2808
2809 void
print_transporters(const char * where,NdbOut & out)2810 TransporterRegistry::print_transporters(const char* where, NdbOut& out)
2811 {
2812 out << where << " >>" << endl;
2813
2814 for(unsigned i = 0; i < maxTransporters; i++){
2815 if(theTransporters[i] == NULL)
2816 continue;
2817
2818 const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
2819 struct in_addr conn_addr = get_connect_address(remoteNodeId);
2820 char addr_buf[NDB_ADDR_STRLEN];
2821 char *addr_str = Ndb_inet_ntop(AF_INET,
2822 static_cast<void*>(&conn_addr),
2823 addr_buf,
2824 (socklen_t)sizeof(addr_buf));
2825
2826 out << i << " "
2827 << getPerformStateString(remoteNodeId) << " to node: "
2828 << remoteNodeId << " at " << addr_str << endl;
2829 }
2830
2831 out << "<<" << endl;
2832
2833 for (unsigned i= 0; i < m_transporter_interface.size(); i++){
2834 Transporter_interface tf= m_transporter_interface[i];
2835
2836 out << i
2837 << " remote node: " << tf.m_remote_nodeId
2838 << " port: " << tf.m_s_service_port
2839 << " interface: " << tf.m_interface << endl;
2840 }
2841 }
2842
2843 void
inc_overload_count(Uint32 nodeId)2844 TransporterRegistry::inc_overload_count(Uint32 nodeId)
2845 {
2846 assert(nodeId < MAX_NODES);
2847 assert(theTransporters[nodeId] != NULL);
2848 theTransporters[nodeId]->inc_overload_count();
2849 }
2850
2851 void
inc_slowdown_count(Uint32 nodeId)2852 TransporterRegistry::inc_slowdown_count(Uint32 nodeId)
2853 {
2854 assert(nodeId < MAX_NODES);
2855 assert(theTransporters[nodeId] != NULL);
2856 theTransporters[nodeId]->inc_slowdown_count();
2857 }
2858
2859 Uint32
get_overload_count(Uint32 nodeId)2860 TransporterRegistry::get_overload_count(Uint32 nodeId)
2861 {
2862 assert(nodeId < MAX_NODES);
2863 assert(theTransporters[nodeId] != NULL);
2864 return theTransporters[nodeId]->get_overload_count();
2865 }
2866
2867 Uint32
get_slowdown_count(Uint32 nodeId)2868 TransporterRegistry::get_slowdown_count(Uint32 nodeId)
2869 {
2870 assert(nodeId < MAX_NODES);
2871 assert(theTransporters[nodeId] != NULL);
2872 return theTransporters[nodeId]->get_slowdown_count();
2873 }
2874
2875 Uint32
get_connect_count(Uint32 nodeId)2876 TransporterRegistry::get_connect_count(Uint32 nodeId)
2877 {
2878 assert(nodeId < MAX_NODES);
2879 assert(theTransporters[nodeId] != NULL);
2880 return theTransporters[nodeId]->get_connect_count();
2881 }
2882
2883 /**
2884 * We calculate the risk level for a send buffer.
2885 * The primary instrument is the current size of
2886 * the node send buffer. However if the total
2887 * buffer for all send buffers is also close to
2888 * empty, then we will adjust the node send
2889 * buffer size for this. In this manner a very
2890 * contested total buffer will also slow down
2891 * the entire node operation.
2892 */
2893 void
calculate_send_buffer_level(Uint64 node_send_buffer_size,Uint64 total_send_buffer_size,Uint64 total_used_send_buffer_size,Uint32 num_threads,SB_LevelType & level)2894 calculate_send_buffer_level(Uint64 node_send_buffer_size,
2895 Uint64 total_send_buffer_size,
2896 Uint64 total_used_send_buffer_size,
2897 Uint32 num_threads,
2898 SB_LevelType &level)
2899 {
2900 Uint64 percentage =
2901 (total_used_send_buffer_size * 100) / total_send_buffer_size;
2902
2903 if (percentage < 90)
2904 {
2905 ;
2906 }
2907 else if (percentage < 95)
2908 {
2909 node_send_buffer_size *= 2;
2910 }
2911 else if (percentage < 97)
2912 {
2913 node_send_buffer_size *= 4;
2914 }
2915 else if (percentage < 98)
2916 {
2917 node_send_buffer_size *= 8;
2918 }
2919 else if (percentage < 99)
2920 {
2921 node_send_buffer_size *= 16;
2922 }
2923 else
2924 {
2925 level = SB_CRITICAL_LEVEL;
2926 return;
2927 }
2928
2929 if (node_send_buffer_size < 128 * 1024)
2930 {
2931 level = SB_NO_RISK_LEVEL;
2932 return;
2933 }
2934 else if (node_send_buffer_size < 256 * 1024)
2935 {
2936 level = SB_LOW_LEVEL;
2937 return;
2938 }
2939 else if (node_send_buffer_size < 384 * 1024)
2940 {
2941 level = SB_MEDIUM_LEVEL;
2942 return;
2943 }
2944 else if (node_send_buffer_size < 1024 * 1024)
2945 {
2946 level = SB_HIGH_LEVEL;
2947 return;
2948 }
2949 else if (node_send_buffer_size < 2 * 1024 * 1024)
2950 {
2951 level = SB_RISK_LEVEL;
2952 return;
2953 }
2954 else
2955 {
2956 level = SB_CRITICAL_LEVEL;
2957 return;
2958 }
2959 }
2960
// Explicit instantiation of the Vector template for the element type of
// m_transporter_interface.
template class Vector<TransporterRegistry::Transporter_interface>;
2962