1 /*
2 Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
#include <ndb_global.h>

#include <new>
26
27 #include <TransporterRegistry.hpp>
28 #include "TransporterInternalDefinitions.hpp"
29
30 #include "Transporter.hpp"
31 #include <SocketAuthenticator.hpp>
32 #include "BlockNumbers.h"
33
34 #include "TCP_Transporter.hpp"
35 #include "Multi_Transporter.hpp"
36 #include "Loopback_Transporter.hpp"
37
38 #ifndef WIN32
39 #include "SHM_Transporter.hpp"
40 #endif
41
42 #include "NdbOut.hpp"
43 #include <NdbSleep.h>
44 #include <NdbMutex.h>
45 #include <NdbSpin.h>
46 #include <InputStream.hpp>
47 #include <OutputStream.hpp>
48 #include <socket_io.h>
49
50 #include <mgmapi/mgmapi.h>
51 #include <mgmapi_internal.h>
52 #include <mgmapi/mgmapi_debug.h>
53
54 #include <EventLogger.hpp>
55 extern EventLogger * g_eventLogger;
56
/**
 * Debug tracing helper.
 *
 * Usage: DEBUG_FPRINTF((stderr, "fmt", args...)); - note the double
 * parentheses, the whole argument list is forwarded to fprintf.
 * The macro expands to nothing unless the '#if 0' below is changed
 * to '#if 1'.
 */
#if 0
#define DEBUG_FPRINTF(arglist) do { fprintf arglist ; } while (0)
#else
#define DEBUG_FPRINTF(a)
#endif
62
63 /**
64 * There is a requirement in the Transporter design that
65 * ::performReceive() and ::update_connections()
66 * on the same 'TransporterReceiveHandle' should not be
67 * run concurrently. class TransporterReceiveWatchdog provides a
68 * simple mechanism to assert that this rule is followed.
69 * Does nothing if NDEBUG is defined (in production code)
70 */
71 class TransporterReceiveWatchdog
72 {
73 public:
74 #ifdef NDEBUG
TransporterReceiveWatchdog(TransporterReceiveHandle & recvdata)75 TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
76 {}
77
78 #else
79 TransporterReceiveWatchdog(TransporterReceiveHandle& recvdata)
80 : m_recvdata(recvdata)
81 {
82 assert(m_recvdata.m_active == false);
83 m_recvdata.m_active = true;
84 }
85
86 ~TransporterReceiveWatchdog()
87 {
88 assert(m_recvdata.m_active == true);
89 m_recvdata.m_active = false;
90 }
91
92 private:
93 TransporterReceiveHandle& m_recvdata;
94 #endif
95 };
96
97
98 struct in_addr
get_connect_address(NodeId node_id) const99 TransporterRegistry::get_connect_address(NodeId node_id) const
100 {
101 return theNodeIdTransporters[node_id]->m_connect_address;
102 }
103
104 Uint64
get_bytes_sent(NodeId node_id) const105 TransporterRegistry::get_bytes_sent(NodeId node_id) const
106 {
107 return theNodeIdTransporters[node_id]->get_bytes_sent();
108 }
109
110 Uint64
get_bytes_received(NodeId node_id) const111 TransporterRegistry::get_bytes_received(NodeId node_id) const
112 {
113 return theNodeIdTransporters[node_id]->get_bytes_received();
114 }
115
newSession(NDB_SOCKET_TYPE sockfd)116 SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
117 {
118 DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
119 DEBUG_FPRINTF((stderr, "New session created\n"));
120 if (m_auth && !m_auth->server_authenticate(sockfd))
121 {
122 DEBUG_FPRINTF((stderr, "Failed to authenticate new session\n"));
123 ndb_socket_close_with_reset(sockfd, true); // Close with reset
124 DBUG_RETURN(0);
125 }
126
127 BaseString msg;
128 bool close_with_reset = true;
129 bool log_failure = false;
130 if (!m_transporter_registry->connect_server(sockfd,
131 msg,
132 close_with_reset,
133 log_failure))
134 {
135 DEBUG_FPRINTF((stderr, "New session failed in connect_server\n"));
136 ndb_socket_close_with_reset(sockfd, close_with_reset);
137 if (log_failure)
138 {
139 g_eventLogger->warning("TR : %s", msg.c_str());
140 }
141 DBUG_RETURN(0);
142 }
143
144 DBUG_RETURN(0);
145 }
146
TransporterReceiveData()147 TransporterReceiveData::TransporterReceiveData()
148 : m_transporters(),
149 m_recv_transporters(),
150 m_has_data_transporters(),
151 m_handled_transporters(),
152 m_bad_data_transporters(),
153 m_last_trp_id(0)
154 {
155 /**
156 * With multi receiver threads
157 * an interface to reassign these is needed...
158 */
159 m_transporters.set(); // Handle all
160 m_transporters.clear(Uint32(0)); // Except wakeup socket...
161
162 #if defined(HAVE_EPOLL_CREATE)
163 m_epoll_fd = -1;
164 m_epoll_events = 0;
165 #endif
166 }
167
168 bool
init(unsigned maxTransporters)169 TransporterReceiveData::init(unsigned maxTransporters)
170 {
171 maxTransporters += 1; /* wakeup socket */
172 m_spintime = 0;
173 m_total_spintime = 0;
174 #if defined(HAVE_EPOLL_CREATE)
175 m_epoll_fd = epoll_create(maxTransporters);
176 if (m_epoll_fd == -1)
177 {
178 perror("epoll_create failed... falling back to select!");
179 goto fallback;
180 }
181 m_epoll_events = new struct epoll_event[maxTransporters];
182 if (m_epoll_events == 0)
183 {
184 perror("Failed to alloc epoll-array... falling back to select!");
185 close(m_epoll_fd);
186 m_epoll_fd = -1;
187 goto fallback;
188 }
189 memset(m_epoll_events, 0, maxTransporters * sizeof(struct epoll_event));
190 return true;
191 fallback:
192 #endif
193 return m_socket_poller.set_max_count(maxTransporters);
194 }
195
196 bool
epoll_add(Transporter * t)197 TransporterReceiveData::epoll_add(Transporter *t)
198 {
199 assert(m_transporters.get(t->getTransporterIndex()));
200 #if defined(HAVE_EPOLL_CREATE)
201 if (m_epoll_fd != -1)
202 {
203 bool add = true;
204 struct epoll_event event_poll;
205 memset(&event_poll, 0, sizeof(event_poll));
206 NDB_SOCKET_TYPE sock_fd = t->getSocket();
207 int node_id = t->getRemoteNodeId();
208 int op = EPOLL_CTL_ADD;
209 int ret_val, error;
210
211 if (!ndb_socket_valid(sock_fd))
212 return FALSE;
213
214 event_poll.data.u32 = t->getTransporterIndex();
215 event_poll.events = EPOLLIN;
216 ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
217 if (!ret_val)
218 goto ok;
219 error= errno;
220 if (error == ENOENT && !add)
221 {
222 /*
223 * Could be that socket was closed premature to this call.
224 * Not a problem that this occurs.
225 */
226 goto ok;
227 }
228 if (!add || (add && (error != ENOMEM)))
229 {
230 /*
231 * Serious problems, we are either using wrong parameters,
232 * have permission problems or the socket doesn't support
233 * epoll!!
234 */
235 ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
236 " node %u to epoll-set,"
237 " errno: %u %s",
238 add ? "ADD" : "DEL",
239 m_epoll_fd,
240 MY_SOCKET_FORMAT_VALUE(sock_fd),
241 node_id,
242 error,
243 strerror(error));
244 abort();
245 }
246 ndbout << "We lacked memory to add the socket for node id ";
247 ndbout << node_id << endl;
248 return false;
249 }
250
251 ok:
252 #endif
253 return true;
254 }
255
~TransporterReceiveData()256 TransporterReceiveData::~TransporterReceiveData()
257 {
258 #if defined(HAVE_EPOLL_CREATE)
259 if (m_epoll_fd != -1)
260 {
261 close(m_epoll_fd);
262 m_epoll_fd = -1;
263 }
264
265 if (m_epoll_events)
266 {
267 delete [] m_epoll_events;
268 m_epoll_events = 0;
269 }
270 #endif
271 }
272
TransporterRegistry(TransporterCallback * callback,TransporterReceiveHandle * recvHandle,unsigned _maxTransporters)273 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
274 TransporterReceiveHandle *recvHandle,
275 unsigned _maxTransporters) :
276 callbackObj(callback),
277 receiveHandle(recvHandle),
278 m_mgm_handle(0),
279 sendCounter(1),
280 localNodeId(0),
281 maxTransporters(_maxTransporters),
282 nTransporters(0),
283 nMultiTransporters(0),
284 nTCPTransporters(0), nSHMTransporters(0),
285 connectBackoffMaxTime(0),
286 m_transp_count(1),
287 m_total_max_send_buffer(0)
288 {
289 if (receiveHandle != 0)
290 {
291 receiveHandle->nTCPTransporters = 0;
292 receiveHandle->nSHMTransporters = 0;
293 }
294 DBUG_ENTER("TransporterRegistry::TransporterRegistry");
295
296 allTransporters = new Transporter* [maxTransporters];
297 theTCPTransporters = new TCP_Transporter * [maxTransporters];
298 theSHMTransporters = new SHM_Transporter * [maxTransporters];
299 theTransporterTypes = new TransporterType [MAX_NODES];
300 theNodeIdTransporters = new Transporter * [MAX_NODES];
301 theMultiTransporters = new Multi_Transporter * [MAX_NODES];
302 performStates = new PerformState [MAX_NODES];
303 ioStates = new IOState [MAX_NODES];
304 peerUpIndicators = new bool [maxTransporters];
305 connectingTime = new Uint32 [maxTransporters];
306 m_disconnect_errnum = new int [maxTransporters];
307 m_disconnect_enomem_error = new Uint32 [maxTransporters];
308 m_error_states = new ErrorState [maxTransporters];
309
310 m_has_extra_wakeup_socket = false;
311
312 #ifdef ERROR_INSERT
313 m_blocked.clear();
314 m_blocked_trp.clear();
315 m_blocked_disconnected.clear();
316 m_sendBlocked.clear();
317
318 m_mixology_level = 0;
319 #endif
320
321 // Initialize the transporter arrays
322 ErrorState default_error_state = { TE_NO_ERROR, (const char *)~(UintPtr)0 };
323 for (unsigned i = 0; i < MAX_NODES; i++)
324 {
325 theNodeIdTransporters[i] = NULL;
326 theMultiTransporters[i] = NULL;
327 performStates[i] = DISCONNECTED;
328 ioStates[i] = NoHalt;
329 peerUpIndicators[i] = true; // Assume all nodes are up, will be
330 // cleared at first connect attempt
331 connectingTime[i] = 0;
332 m_disconnect_errnum[i]= 0;
333 m_disconnect_enomem_error[i] = 0;
334 m_error_states[i] = default_error_state;
335 }
336 for (unsigned i = 0; i < maxTransporters; i++)
337 {
338 allTransporters[i] = NULL;
339 theTCPTransporters[i] = NULL;
340 theSHMTransporters[i] = NULL;
341 }
342 theMultiTransporterMutex = NdbMutex_Create();
343 DBUG_VOID_RETURN;
344 }
345
set_mgm_handle(NdbMgmHandle h)346 void TransporterRegistry::set_mgm_handle(NdbMgmHandle h)
347 {
348 DBUG_ENTER("TransporterRegistry::set_mgm_handle");
349 if (m_mgm_handle)
350 ndb_mgm_destroy_handle(&m_mgm_handle);
351 m_mgm_handle= h;
352 ndb_mgm_set_timeout(m_mgm_handle, 5000);
353 #ifndef DBUG_OFF
354 if (h)
355 {
356 char buf[256];
357 DBUG_PRINT("info",("handle set with connectstring: %s",
358 ndb_mgm_get_connectstring(h,buf, sizeof(buf))));
359 }
360 else
361 {
362 DBUG_PRINT("info",("handle set to NULL"));
363 }
364 #endif
365 DBUG_VOID_RETURN;
366 }
367
~TransporterRegistry()368 TransporterRegistry::~TransporterRegistry()
369 {
370 DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
371
372 disconnectAll();
373 removeAll();
374
375 delete[] allTransporters;
376 delete[] theTCPTransporters;
377 delete[] theSHMTransporters;
378 delete[] theTransporterTypes;
379 delete[] theMultiTransporters;
380 delete[] theNodeIdTransporters;
381 delete[] performStates;
382 delete[] ioStates;
383 delete[] peerUpIndicators;
384 delete[] connectingTime;
385 delete[] m_disconnect_errnum;
386 delete[] m_disconnect_enomem_error;
387 delete[] m_error_states;
388
389 if (m_mgm_handle)
390 ndb_mgm_destroy_handle(&m_mgm_handle);
391
392 if (m_has_extra_wakeup_socket)
393 {
394 ndb_socket_close(m_extra_wakeup_sockets[0]);
395 ndb_socket_close(m_extra_wakeup_sockets[1]);
396 }
397 NdbMutex_Destroy(theMultiTransporterMutex);
398 DBUG_VOID_RETURN;
399 }
400
401 void
removeAll()402 TransporterRegistry::removeAll()
403 {
404 for (Uint32 i = 0; i < nTCPTransporters; i++)
405 {
406 delete theTCPTransporters[i];
407 }
408 for (Uint32 i = 0; i < nSHMTransporters; i++)
409 {
410 delete theSHMTransporters[i];
411 }
412 for (Uint32 i = 0; i < nMultiTransporters; i++)
413 {
414 delete theMultiTransporters[i];
415 }
416 nTransporters = 0;
417 nTCPTransporters = 0;
418 nSHMTransporters = 0;
419 nMultiTransporters = 0;
420 }
421
422 void
disconnectAll()423 TransporterRegistry::disconnectAll(){
424 DEBUG_FPRINTF((stderr, "(%u)doDisconnect(all), line: %d\n",
425 localNodeId, __LINE__));
426 for (Uint32 i = 0; i < nTCPTransporters; i++)
427 {
428 theTCPTransporters[i]->doDisconnect();
429 }
430 #ifndef WIN32
431 for (Uint32 i = 0; i < nSHMTransporters; i++)
432 {
433 theSHMTransporters[i]->doDisconnect();
434 }
435 #endif
436 }
437
438 bool
init(NodeId nodeId)439 TransporterRegistry::init(NodeId nodeId) {
440 DBUG_ENTER("TransporterRegistry::init");
441 assert(localNodeId == 0 ||
442 localNodeId == nodeId);
443
444 localNodeId = nodeId;
445
446 DEBUG("TransporterRegistry started node: " << localNodeId);
447
448 if (receiveHandle)
449 {
450 if (!init(* receiveHandle))
451 DBUG_RETURN(false);
452 }
453
454 DBUG_RETURN(true);
455 }
456
457 bool
init(TransporterReceiveHandle & recvhandle)458 TransporterRegistry::init(TransporterReceiveHandle& recvhandle)
459 {
460 recvhandle.nTCPTransporters = nTCPTransporters;
461 recvhandle.nSHMTransporters = nSHMTransporters;
462 return recvhandle.init(maxTransporters);
463 }
464
465 bool
connect_server(NDB_SOCKET_TYPE sockfd,BaseString & msg,bool & close_with_reset,bool & log_failure)466 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
467 BaseString & msg,
468 bool& close_with_reset,
469 bool& log_failure)
470 {
471 DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
472
473 log_failure = true;
474
475 // Read "hello" that consists of node id and other info
476 // from client
477 SocketInputStream s_input(sockfd);
478 char buf[256]; // <int> <int> <int> <int> <..expansion..>
479 if (s_input.gets(buf, sizeof(buf)) == 0) {
480 /* Could be spurious connection, need not log */
481 log_failure = false;
482 msg.assfmt("Ignored connection attempt as failed to "
483 "read 'hello' from client");
484 DBUG_PRINT("error", ("%s", msg.c_str()));
485 DEBUG_FPRINTF((stderr, "%s", msg.c_str()));
486 DBUG_RETURN(false);
487 }
488
489 int nodeId;
490 int remote_transporter_type;
491 int serverNodeId = -1;
492 int multi_transporter_instance = -1;
493 int r= sscanf(buf, "%d %d %d %d",
494 &nodeId,
495 &remote_transporter_type,
496 &serverNodeId,
497 &multi_transporter_instance);
498 switch (r) {
499 case 4:
500 /* Latest version client */
501 break;
502 case 3:
503 /* Older client, sending just nodeid, transporter type, serverNodeId */
504 break;
505 case 2:
506 /* Older client, sending just nodeid and transporter type */
507 break;
508 default:
509 /* Could be spurious connection, need not log */
510 log_failure = false;
511 msg.assfmt("Ignored connection attempt as failed to "
512 "parse 'hello' from client. >%s<", buf);
513 DBUG_PRINT("error", ("%s", msg.c_str()));
514 DEBUG_FPRINTF((stderr, "%s", msg.c_str()));
515 DBUG_RETURN(false);
516 }
517
518 DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d "
519 "server nodeid %d instance %d",
520 nodeId,
521 remote_transporter_type,
522 serverNodeId,
523 multi_transporter_instance));
524 /*
525 DEBUG_FPRINTF((stderr, "Client hello, nodeId: %d transporter type: %d "
526 "server nodeid %d instance %d",
527 nodeId,
528 remote_transporter_type,
529 serverNodeId,
530 multi_transporter_instace));
531 */
532
533 // Check that nodeid is in range before accessing the arrays
534 if (nodeId < 0 ||
535 nodeId > (int)MAX_NODES)
536 {
537 /* Strange, log it */
538 msg.assfmt("Ignored connection attempt as client "
539 "nodeid %u out of range", nodeId);
540 DBUG_PRINT("error", ("%s", msg.c_str()));
541 DBUG_RETURN(false);
542 }
543
544 lockMultiTransporters();
545 // Check that transporter is allocated
546 Transporter *t= theNodeIdTransporters[nodeId];
547 if (t == 0)
548 {
549 unlockMultiTransporters();
550 /* Strange, log it */
551 msg.assfmt("Ignored connection attempt as client "
552 "nodeid %u is undefined.",
553 nodeId);
554 DBUG_PRINT("error", ("%s", msg.c_str()));
555 DBUG_RETURN(false);
556 }
557
558 // Check transporter type
559 if (remote_transporter_type != t->m_type &&
560 t->m_type != tt_Multi_TRANSPORTER) // Checked later
561 {
562 unlockMultiTransporters();
563 /* Strange, log it */
564 msg.assfmt("Connection attempt from client node %u failed as transporter "
565 "type %u is not as expected %u.",
566 nodeId,
567 remote_transporter_type,
568 t->m_type);
569 DBUG_RETURN(false);
570 }
571
572 // Check that the serverNodeId is correct
573 if (serverNodeId != -1)
574 {
575 /* Check that incoming connection was meant for us */
576 if (serverNodeId != t->getLocalNodeId())
577 {
578 unlockMultiTransporters();
579 /* Strange, log it */
580 msg.assfmt("Ignored connection attempt as client "
581 "node %u attempting to connect to node %u, "
582 "but this is node %u.",
583 nodeId,
584 serverNodeId,
585 t->getLocalNodeId());
586 DBUG_PRINT("error", ("%s", msg.c_str()));
587 DBUG_RETURN(false);
588 }
589 }
590
591 bool correct_state = false;
592 Multi_Transporter *multi_trp =
593 get_node_multi_transporter(nodeId);
594 if (multi_trp &&
595 multi_transporter_instance > 0)
596 {
597 /* Specific path for non-zero multi transporter instances */
598 DEBUG_FPRINTF((stderr, "connect_server multi trp, node %u instance %u\n",
599 t->getRemoteNodeId(),
600 multi_transporter_instance));
601
602 if (performStates[nodeId] == TransporterRegistry::CONNECTED)
603 {
604 /**
605 * The connection is ok if the following is true:
606 * 1) The transporter is a Multi_Transporter object
607 * 2) The instance exists as inactive transporter
608 * 3) The inactive instance referred to isn't already
609 * connected
610 *
611 * If this all are true we will replace the multi transporter
612 * instance with the transporter instance to connect in this
613 * instance.
614 */
615 if ((multi_transporter_instance - 1) <
616 int(multi_trp->get_num_inactive_transporters()))
617 {
618 Transporter *inst_trp =
619 multi_trp->get_inactive_transporter(multi_transporter_instance - 1);
620
621 /* Check type now that we have the actual instance */
622 if (remote_transporter_type == inst_trp->m_type)
623 {
624 if (!inst_trp->isConnected())
625 {
626 /**
627 * Continue connection setup with
628 * multi-transporter specific instance
629 */
630 correct_state = true;
631 t = inst_trp;
632 }
633 else
634 {
635 /* Strange, log it */
636 msg.assfmt("Ignored connection attempt from node %u as multi "
637 "transporter instance %u already connected.",
638 nodeId,
639 multi_transporter_instance);
640 }
641 }
642 else
643 {
644 /* Strange, log it */
645 msg.assfmt("Ignored multi transporter connection attempt "
646 "from node %u instance %u as transporter "
647 "type %u is not as expected %u",
648 nodeId,
649 multi_transporter_instance,
650 remote_transporter_type,
651 inst_trp->m_type);
652 }
653 }
654 else
655 {
656 /* Strange, log it */
657 msg.assfmt("Ignored connection attempt from node %u as multi "
658 "transporter instance %u is not in range.",
659 nodeId,
660 multi_transporter_instance);
661 }
662 }
663 }
664 else
665 {
666 /**
667 * Normal connection setup
668 * Sub cases :
669 * Normal setup for non-multi trp : multi = 0, instance = 0
670 * Normal setup from non-multi trp from
671 * old version : multi = 0, instance = -1
672 * Normal setup for multi trp instance 0 : multi = 1, instance = 0
673 * Normal setup from for multi trp
674 * from old version : multi = 1, instance = -1
675 *
676 * Not supported :
677 * multi = 0, instance > 0 : Invalid
678 */
679 if (!multi_trp)
680 {
681 if (multi_transporter_instance > 0)
682 {
683 unlockMultiTransporters();
684 /* Strange, log it */
685 msg.assfmt("Ignored connection attempt from node %u as multi "
686 "transporter instance %d specified for non multi-transporter",
687 nodeId, multi_transporter_instance);
688 DBUG_RETURN(false);
689 }
690 }
691 else
692 {
693 /* multi_trp */
694 require(multi_transporter_instance <= 0);
695
696 /* Continue connection setup with specific instance 0 */
697 require(multi_trp->get_num_active_transporters() == 1);
698 t = multi_trp->get_active_transporter(0);
699 }
700
701 /**
702 * Normal connection setup requires state to be in CONNECTING
703 */
704 if (performStates[nodeId] == TransporterRegistry::CONNECTING)
705 {
706 correct_state = true;
707 }
708 else
709 {
710 msg.assfmt("Ignored connection attempt as this node "
711 "is not expecting a connection from node %u. "
712 "State %u",
713 nodeId,
714 performStates[nodeId]);
715
716 /**
717 * This is expected if the transporter state is DISCONNECTED,
718 * otherwise it's a bit strange
719 */
720 log_failure = (performStates[nodeId] != TransporterRegistry::DISCONNECTED);
721
722 DEBUG_FPRINTF((stderr, "%s", msg.c_str()));
723 }
724 }
725 unlockMultiTransporters();
726
727 // Check that the transporter should be connecting
728 if (!correct_state)
729 {
730 DBUG_PRINT("error", ("Transporter for node id %d in wrong state %s",
731 nodeId, msg.c_str()));
732 /*
733 DEBUG_FPRINTF((stderr, "Transporter for node id %d in wrong state %s\n",
734 nodeId, msg.c_str()));
735 */
736
737 // Avoid TIME_WAIT on server by requesting client to close connection
738 SocketOutputStream s_output(sockfd);
739 if (s_output.println("BYE") < 0)
740 {
741 // Failed to request client close
742 DBUG_PRINT("error", ("Failed to send client BYE"));
743 DBUG_RETURN(false);
744 }
745
746 // Wait for to close connection by reading EOF(i.e read returns 0)
747 const int read_eof_timeout = 1000; // Fairly short timeout
748 if (read_socket(sockfd, read_eof_timeout,
749 buf, sizeof(buf)) == 0)
750 {
751 // Client gracefully closed connection, turn off close_with_reset
752 close_with_reset = false;
753 DBUG_RETURN(false);
754 }
755
756 // Failed to request client close
757 DBUG_RETURN(false);
758 }
759
760 // Send reply to client
761 SocketOutputStream s_output(sockfd);
762 if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
763 {
764 /* Strange, log it */
765 msg.assfmt("Connection attempt failed due to error sending "
766 "reply to client node %u",
767 nodeId);
768 DBUG_PRINT("error", ("%s", msg.c_str()));
769 DBUG_RETURN(false);
770 }
771
772 // Setup transporter (transporter responsible for closing sockfd)
773 DEBUG_FPRINTF((stderr, "connect_server for trp_id %u\n",
774 t->getTransporterIndex()));
775 DBUG_RETURN(t->connect_server(sockfd, msg));
776 }
777
778 void
insert_allTransporters(Transporter * t)779 TransporterRegistry::insert_allTransporters(Transporter *t)
780 {
781 TrpId trp_id = t->getTransporterIndex();
782 if (trp_id == 0)
783 {
784 nTransporters++;
785 require(allTransporters[nTransporters] == 0);
786 allTransporters[nTransporters] = t;
787 t->setTransporterIndex(nTransporters);
788 }
789 else
790 {
791 require(allTransporters[trp_id] == 0);
792 allTransporters[trp_id] = t;
793 }
794 }
795
796 void
remove_allTransporters(Transporter * t)797 TransporterRegistry::remove_allTransporters(Transporter *t)
798 {
799 TrpId trp_id = t->getTransporterIndex();
800 if (trp_id == 0)
801 {
802 return;
803 }
804 else if (t == allTransporters[trp_id])
805 {
806 DEBUG_FPRINTF((stderr,
807 "remove trp_id %u for node %u from allTransporters\n",
808 trp_id,
809 t->getRemoteNodeId()));
810 allTransporters[trp_id] = 0;
811 }
812 }
813
814 void
insert_node_transporter(NodeId node_id,Transporter * t)815 TransporterRegistry::insert_node_transporter(NodeId node_id, Transporter *t)
816 {
817 theNodeIdTransporters[node_id] = t;
818 }
819
820 void
lockMultiTransporters()821 TransporterRegistry::lockMultiTransporters()
822 {
823 NdbMutex_Lock(theMultiTransporterMutex);
824 }
825
826 void
unlockMultiTransporters()827 TransporterRegistry::unlockMultiTransporters()
828 {
829 NdbMutex_Unlock(theMultiTransporterMutex);
830 }
831
832 Uint32
get_num_multi_transporters()833 TransporterRegistry::get_num_multi_transporters()
834 {
835 return nMultiTransporters;
836 }
837
838 Multi_Transporter*
get_multi_transporter(Uint32 index)839 TransporterRegistry::get_multi_transporter(Uint32 index)
840 {
841 require(index < nMultiTransporters);
842 return theMultiTransporters[index];
843 }
844
845 bool
configureTransporter(TransporterConfiguration * config)846 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
847 {
848 NodeId remoteNodeId = config->remoteNodeId;
849
850 assert(localNodeId);
851 assert(config->localNodeId == localNodeId);
852
853 if (remoteNodeId > MAX_NODES)
854 return false;
855
856 Transporter* t = theNodeIdTransporters[remoteNodeId];
857 if(t != NULL)
858 {
859 // Transporter already exist, try to reconfigure it
860 require(!t->isMultiTransporter());
861 require(!t->isPartOfMultiTransporter());
862 return t->configure(config);
863 }
864
865 DEBUG("Configuring transporter from " << localNodeId
866 << " to " << remoteNodeId);
867
868 switch (config->type){
869 case tt_TCP_TRANSPORTER:
870 return createTCPTransporter(config);
871 case tt_SHM_TRANSPORTER:
872 return createSHMTransporter(config);
873 default:
874 abort();
875 break;
876 }
877 return false;
878 }
879
880 bool
createMultiTransporter(Uint32 node_id,Uint32 num_trps)881 TransporterRegistry::createMultiTransporter(Uint32 node_id, Uint32 num_trps)
882 {
883 Multi_Transporter *multi_trp = 0;
884 lockMultiTransporters();
885 Transporter *base_trp = theNodeIdTransporters[node_id];
886 require(!base_trp->isMultiTransporter());
887 require(!base_trp->isPartOfMultiTransporter());
888 multi_trp = new Multi_Transporter(*this, base_trp);
889 theMultiTransporters[nMultiTransporters] = multi_trp;
890 nMultiTransporters++;
891 Uint32 nodeId = base_trp->getRemoteNodeId();
892 TransporterType type = theTransporterTypes[nodeId];
893 for (Uint32 i = 0; i < num_trps; i++)
894 {
895 if (type == tt_TCP_TRANSPORTER)
896 {
897 TCP_Transporter *tcp_trp = (TCP_Transporter*)base_trp;
898 TCP_Transporter *new_trp = new TCP_Transporter(*this, tcp_trp);
899 require(new_trp->initTransporter());
900 multi_trp->add_not_used_trp(new_trp);
901 new_trp->set_multi_transporter_instance(i + 1);
902 theTCPTransporters[nTCPTransporters++] = new_trp;
903 }
904 #ifndef WIN32
905 else if (type == tt_SHM_TRANSPORTER)
906 {
907 SHM_Transporter *shm_trp = (SHM_Transporter*)base_trp;
908 SHM_Transporter *new_trp = new SHM_Transporter(*this, shm_trp);
909 require(new_trp->initTransporter());
910 multi_trp->add_not_used_trp(new_trp);
911 new_trp->set_multi_transporter_instance(i + 1);
912 theSHMTransporters[nSHMTransporters++] = new_trp;
913 }
914 #endif
915 else
916 {
917 require(false);
918 }
919 }
920 multi_trp->add_active_trp(base_trp);
921 unlockMultiTransporters();
922 return true;
923 }
924
925 bool
createTCPTransporter(TransporterConfiguration * config)926 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
927
928 TCP_Transporter * t = 0;
929 /* Don't use index 0, special use case for extra transporters */
930 config->transporterIndex = nTransporters + 1;
931 if (config->remoteNodeId == config->localNodeId)
932 {
933 t = new Loopback_Transporter(* this, config);
934 }
935 else
936 {
937 t = new TCP_Transporter(*this, config);
938 }
939
940 if (t == NULL)
941 return false;
942 else if (!t->initTransporter())
943 {
944 delete t;
945 return false;
946 }
947
948 // Put the transporter in the transporter arrays
949 nTransporters++;
950 allTransporters[nTransporters] = t;
951 theTCPTransporters[nTCPTransporters] = t;
952 theNodeIdTransporters[t->getRemoteNodeId()] = t;
953 theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
954 performStates[t->getRemoteNodeId()] = DISCONNECTED;
955 nTCPTransporters++;
956 m_total_max_send_buffer += t->get_max_send_buffer();
957 return true;
958 }
959
960 bool
createSHMTransporter(TransporterConfiguration * config)961 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config)
962 {
963 #ifndef WIN32
964 DBUG_ENTER("TransporterRegistry::createTransporter SHM");
965
966 /* Don't use index 0, special use case for extra transporters */
967 config->transporterIndex = nTransporters + 1;
968
969 SHM_Transporter * t = new SHM_Transporter(*this,
970 config->transporterIndex,
971 config->localHostName,
972 config->remoteHostName,
973 config->s_port,
974 config->isMgmConnection,
975 localNodeId,
976 config->remoteNodeId,
977 config->serverNodeId,
978 config->checksum,
979 config->signalId,
980 config->shm.shmKey,
981 config->shm.shmSize,
982 config->preSendChecksum,
983 config->shm.shmSpintime,
984 config->shm.sendBufferSize);
985 if (t == NULL)
986 return false;
987
988 // Put the transporter in the transporter arrays
989 nTransporters++;
990 allTransporters[nTransporters] = t;
991 theSHMTransporters[nSHMTransporters] = t;
992 theNodeIdTransporters[t->getRemoteNodeId()] = t;
993 theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
994 performStates[t->getRemoteNodeId()] = DISCONNECTED;
995
996 nSHMTransporters++;
997 m_total_max_send_buffer += t->get_max_send_buffer();
998
999 DBUG_RETURN(true);
1000 #else
1001 return false;
1002 #endif
1003 }
1004
1005 /**
1006 * prepareSend() - queue a signal for later asynchronous sending.
1007 *
 * A successful prepareSend() only guarantees that the signal has been
1009 * stored in some send buffers. Normally it will later be sent, but could
1010 * also be discarded if the transporter *later* disconnects.
1011 *
1012 * Signal memory is allocated with the implementation dependent
1013 * ::getWritePtr(). On multithreaded implementations, allocation may
1014 * take place in thread-local buffer pools which is later 'flushed'
1015 * to a global send buffer.
1016 *
1017 * Asynchronous to prepareSend() there may be Transporters
1018 * (dis)connecting which are signaled to the upper layers by calling
1019 * disable_/enable_send_buffers().
1020 *
1021 * The 'sendHandle' interface has the method ::isSendEnabled() which
1022 * provides us with a way to check whether communication with a node
1023 * is possible. Depending on the sendHandle implementation,
1024 * isSendEnabled() may either have a 'synchronized' or 'optimistic'
1025 * implementation:
 *  - A (thread-) 'synchronized' implementation guarantees that the
1027 * send buffers really are enabled, both thread local and global,
1028 * at the time of send buffer allocation. (May disconnect later though)
1029 * - An 'optimistic' implementation does not really know whether the
1030 * send buffers are (globally) enabled. Send buffers may always be
1031 * allocated and possibly silently discarded later.
1032 * (SEND_DISCONNECTED will never be returned)
1033 *
1034 * The trp_client implementation is 'synchronized', while the mt-/non-mt
1035 * data node implementation is not. Note that a 'SEND_DISCONNECTED'
1036 * and 'SEND_BLOCKED' return has always been handled as an 'OK' on
1037 * the data nodes. So not being able to detect 'SEND_DISCONNECTED'
1038 * should not matter.
1039 *
1040 * Note that sending behaves differently wrt disconnect / reconnect
 * synching compared to 'receive'. Receiver side *is* synchronized with
1042 * the receiver transporter disconnect / reconnect by both requiring the
1043 * 'poll-right'. Thus receiver logic may check Transporter::isConnected()
1044 * directly.
1045 *
1046 * See further comments as part of ::performReceive().
1047 */
1048 template <typename AnySectionArg>
1049 SendStatus
prepareSendTemplate(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,TrpId & trp_id,AnySectionArg section)1050 TransporterRegistry::prepareSendTemplate(
1051 TransporterSendBufferHandle *sendHandle,
1052 const SignalHeader * signalHeader,
1053 Uint8 prio,
1054 const Uint32 * signalData,
1055 NodeId nodeId,
1056 TrpId &trp_id,
1057 AnySectionArg section)
1058 {
1059 Transporter *node_trp = theNodeIdTransporters[nodeId];
1060 if (unlikely(node_trp == NULL))
1061 {
1062 DEBUG("Discarding message to unknown node: " << nodeId);
1063 return SEND_UNKNOWN_NODE;
1064 }
1065 assert(!node_trp->isPartOfMultiTransporter());
1066 Transporter *t;
1067 t = node_trp->get_send_transporter(signalHeader->theReceiversBlockNumber,
1068 signalHeader->theSendersBlockRef);
1069 assert(!t->isMultiTransporter());
1070 trp_id = t->getTransporterIndex();
1071 if (unlikely(trp_id == 0))
1072 {
1073 /**
1074 * Can happen in disconnect situations, node is disconnected, so send
1075 * to it is successful since the node won't be there to receive the
1076 * message.
1077 */
1078 DEBUG("Discarding message due to trp_id = 0");
1079 return SEND_OK;
1080 }
1081 if(
1082 likely((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) ||
1083 (signalHeader->theReceiversBlockNumber == QMGR) ||
1084 (signalHeader->theReceiversBlockNumber == API_CLUSTERMGR))
1085 {
1086 if (likely(sendHandle->isSendEnabled(nodeId)))
1087 {
1088 const Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, section.m_ptr);
1089 if (likely(lenBytes <= MAX_SEND_MESSAGE_BYTESIZE))
1090 {
1091 SendStatus error = SEND_OK;
1092 Uint32 *insertPtr = getWritePtr(sendHandle,
1093 t,
1094 trp_id,
1095 lenBytes,
1096 prio,
1097 &error);
1098 if (likely(insertPtr != nullptr))
1099 {
1100 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, section);
1101 updateWritePtr(sendHandle, t, trp_id, lenBytes, prio);
1102 return SEND_OK;
1103 }
1104 if (unlikely(error == SEND_MESSAGE_TOO_BIG))
1105 {
1106 g_eventLogger->info("Send message too big");
1107 return SEND_MESSAGE_TOO_BIG;
1108 }
1109 set_status_overloaded(nodeId, true);
1110 const int sleepTime = 2;
1111
1112 /**
1113 * @note: on linux/i386 the granularity is 10ms
1114 * so sleepTime = 2 generates a 10 ms sleep.
1115 */
1116 for (int i = 0; i < 100; i++)
1117 {
1118 NdbSleep_MilliSleep(sleepTime);
1119 /* FC : Consider counting sleeps here */
1120 insertPtr = getWritePtr(sendHandle,
1121 t,
1122 trp_id,
1123 lenBytes,
1124 prio,
1125 &error);
1126 if (likely(insertPtr != nullptr))
1127 {
1128 t->m_packer.pack(insertPtr, prio, signalHeader, signalData, section);
1129 updateWritePtr(sendHandle, t, trp_id, lenBytes, prio);
1130 DEBUG_FPRINTF((stderr, "TE_SEND_BUFFER_FULL\n"));
1131 /**
1132 * Send buffer full, but resend works
1133 */
1134 report_error(nodeId, TE_SEND_BUFFER_FULL);
1135 return SEND_OK;
1136 }
1137 if (unlikely(error == SEND_MESSAGE_TOO_BIG))
1138 {
1139 g_eventLogger->info("Send message too big");
1140 return SEND_MESSAGE_TOO_BIG;
1141 }
1142 }
1143
1144 WARNING("Signal to " << nodeId << " lost(buffer)");
1145 DEBUG_FPRINTF((stderr, "TE_SIGNAL_LOST_SEND_BUFFER_FULL\n"));
1146 report_error(nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
1147 return SEND_BUFFER_FULL;
1148 }
1149 else
1150 {
1151 g_eventLogger->info("Send message too big: length %u", lenBytes);
1152 return SEND_MESSAGE_TOO_BIG;
1153 }
1154 }
1155 else
1156 {
1157 #ifdef ERROR_INSERT
1158 if (m_blocked.get(nodeId))
1159 {
1160 /* Looks like it disconnected while blocked. We'll pretend
1161 * not to notice for now
1162 */
1163 WARNING("Signal to " << nodeId << " discarded as node blocked + disconnected");
1164 return SEND_OK;
1165 }
1166 #endif
1167 DEBUG("Signal to " << nodeId << " lost(disconnect) ");
1168 return SEND_DISCONNECTED;
1169 }
1170 }
1171 else
1172 {
1173 DEBUG("Discarding message to block: "
1174 << signalHeader->theReceiversBlockNumber
1175 << " node: " << nodeId);
1176
1177 return SEND_BLOCKED;
1178 }
1179 }
1180
1181
1182 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,TrpId & trp_id,const LinearSectionPtr ptr[3])1183 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1184 const SignalHeader *signalHeader,
1185 Uint8 prio,
1186 const Uint32 *signalData,
1187 NodeId nodeId,
1188 TrpId &trp_id,
1189 const LinearSectionPtr ptr[3])
1190 {
1191 const Packer::LinearSectionArg section(ptr);
1192 return prepareSendTemplate(sendHandle,
1193 signalHeader,
1194 prio,
1195 signalData,
1196 nodeId,
1197 trp_id,
1198 section);
1199 }
1200
1201
1202 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,TrpId & trp_id,class SectionSegmentPool & thePool,const SegmentedSectionPtr ptr[3])1203 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1204 const SignalHeader *signalHeader,
1205 Uint8 prio,
1206 const Uint32 *signalData,
1207 NodeId nodeId,
1208 TrpId &trp_id,
1209 class SectionSegmentPool &thePool,
1210 const SegmentedSectionPtr ptr[3])
1211 {
1212 const Packer::SegmentedSectionArg section(thePool,ptr);
1213 return prepareSendTemplate(sendHandle,
1214 signalHeader,
1215 prio,
1216 signalData,
1217 nodeId,
1218 trp_id,
1219 section);
1220 }
1221
1222
1223 SendStatus
prepareSend(TransporterSendBufferHandle * sendHandle,const SignalHeader * signalHeader,Uint8 prio,const Uint32 * signalData,NodeId nodeId,const GenericSectionPtr ptr[3])1224 TransporterRegistry::prepareSend(TransporterSendBufferHandle *sendHandle,
1225 const SignalHeader *signalHeader,
1226 Uint8 prio,
1227 const Uint32 *signalData,
1228 NodeId nodeId,
1229 const GenericSectionPtr ptr[3])
1230 {
1231 TrpId trp_id = 0;
1232 const Packer::GenericSectionArg section(ptr);
1233 return prepareSendTemplate(sendHandle,
1234 signalHeader,
1235 prio,
1236 signalData,
1237 nodeId,
1238 trp_id,
1239 section);
1240 }
1241
1242 bool
setup_wakeup_socket(TransporterReceiveHandle & recvdata)1243 TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
1244 {
1245 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1246
1247 if (m_has_extra_wakeup_socket)
1248 {
1249 return true;
1250 }
1251
1252 assert(!recvdata.m_transporters.get(0));
1253
1254 if (ndb_socketpair(m_extra_wakeup_sockets))
1255 {
1256 perror("socketpair failed!");
1257 return false;
1258 }
1259
1260 if (!TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[0]) ||
1261 !TCP_Transporter::setSocketNonBlocking(m_extra_wakeup_sockets[1]))
1262 {
1263 goto err;
1264 }
1265
1266 #if defined(HAVE_EPOLL_CREATE)
1267 if (recvdata.m_epoll_fd != -1)
1268 {
1269 int sock = m_extra_wakeup_sockets[0].fd;
1270 struct epoll_event event_poll;
1271 memset(&event_poll, 0, sizeof(event_poll));
1272 event_poll.data.u32 = 0;
1273 event_poll.events = EPOLLIN;
1274 int ret_val = epoll_ctl(recvdata.m_epoll_fd, EPOLL_CTL_ADD, sock,
1275 &event_poll);
1276 if (ret_val != 0)
1277 {
1278 int error= errno;
1279 fprintf(stderr, "Failed to add extra sock %u to epoll-set: %u\n",
1280 sock, error);
1281 fflush(stderr);
1282 goto err;
1283 }
1284 }
1285 #endif
1286 m_has_extra_wakeup_socket = true;
1287 recvdata.m_transporters.set(Uint32(0));
1288 return true;
1289
1290 err:
1291 ndb_socket_close(m_extra_wakeup_sockets[0]);
1292 ndb_socket_close(m_extra_wakeup_sockets[1]);
1293 ndb_socket_invalidate(m_extra_wakeup_sockets+0);
1294 ndb_socket_invalidate(m_extra_wakeup_sockets+1);
1295 return false;
1296 }
1297
1298 void
wakeup()1299 TransporterRegistry::wakeup()
1300 {
1301 if (m_has_extra_wakeup_socket)
1302 {
1303 static char c = 37;
1304 ndb_send(m_extra_wakeup_sockets[1], &c, 1, 0);
1305 }
1306 }
1307
1308 Uint32
check_TCP(TransporterReceiveHandle & recvdata,Uint32 timeOutMillis)1309 TransporterRegistry::check_TCP(TransporterReceiveHandle& recvdata,
1310 Uint32 timeOutMillis)
1311 {
1312 Uint32 retVal = 0;
1313 #if defined(HAVE_EPOLL_CREATE)
1314 if (likely(recvdata.m_epoll_fd != -1))
1315 {
1316 int tcpReadSelectReply = 0;
1317 Uint32 num_trps = nTCPTransporters + nSHMTransporters +
1318 (m_has_extra_wakeup_socket ? 1 : 0);
1319
1320 if (num_trps)
1321 {
1322 tcpReadSelectReply = epoll_wait(recvdata.m_epoll_fd,
1323 recvdata.m_epoll_events,
1324 num_trps, timeOutMillis);
1325 retVal = tcpReadSelectReply;
1326 }
1327
1328 int num_socket_events = tcpReadSelectReply;
1329 if (num_socket_events > 0)
1330 {
1331 for (int i = 0; i < num_socket_events; i++)
1332 {
1333 const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
1334 /**
1335 * check that it's assigned to "us"
1336 */
1337 assert(recvdata.m_transporters.get(trpid));
1338
1339 recvdata.m_recv_transporters.set(trpid);
1340 }
1341 }
1342 else if (num_socket_events < 0)
1343 {
1344 assert(errno == EINTR);
1345 }
1346 }
1347 else
1348 #endif
1349 {
1350 retVal = poll_TCP(timeOutMillis, recvdata);
1351 }
1352 return retVal;
1353 }
1354
1355 Uint32
poll_SHM(TransporterReceiveHandle & recvdata,NDB_TICKS start_poll,Uint32 micros_to_poll)1356 TransporterRegistry::poll_SHM(TransporterReceiveHandle& recvdata,
1357 NDB_TICKS start_poll,
1358 Uint32 micros_to_poll)
1359 {
1360 Uint32 res;
1361 Uint64 micros_passed;
1362 do
1363 {
1364 bool any_connected = false;
1365 res = poll_SHM(recvdata, any_connected);
1366 if (res || !any_connected)
1367 {
1368 /**
1369 * If data found or no SHM transporter connected there is no
1370 * reason to continue spinning.
1371 */
1372 break;
1373 }
1374 NDB_TICKS now = NdbTick_getCurrentTicks();
1375 micros_passed =
1376 NdbTick_Elapsed(start_poll, now).microSec();
1377 } while (micros_passed < Uint64(micros_to_poll));
1378 return res;
1379 }
1380
1381 Uint32
poll_SHM(TransporterReceiveHandle & recvdata,bool & any_connected)1382 TransporterRegistry::poll_SHM(TransporterReceiveHandle& recvdata,
1383 bool &any_connected)
1384 {
1385 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1386
1387 Uint32 retVal = 0;
1388 any_connected = false;
1389 #ifndef WIN32
1390 for (Uint32 i = 0; i < recvdata.nSHMTransporters; i++)
1391 {
1392 SHM_Transporter * t = theSHMTransporters[i];
1393 Uint32 node_id = t->getRemoteNodeId();
1394 Uint32 trp_id = t->getTransporterIndex();
1395
1396 if (!recvdata.m_transporters.get(trp_id))
1397 continue;
1398
1399 if (t->isConnected() && is_connected(node_id))
1400 {
1401 any_connected = true;
1402 if (t->hasDataToRead())
1403 {
1404 recvdata.m_has_data_transporters.set(trp_id);
1405 retVal = 1;
1406 }
1407 }
1408 }
1409 #endif
1410 return retVal;
1411 }
1412
1413 Uint32
spin_check_transporters(TransporterReceiveHandle & recvdata)1414 TransporterRegistry::spin_check_transporters(
1415 TransporterReceiveHandle& recvdata)
1416 {
1417 Uint32 res = 0;
1418 #ifndef WIN32
1419 Uint64 micros_passed = 0;
1420 bool any_connected = false;
1421 Uint64 spintime = Uint64(recvdata.m_spintime);
1422
1423 if (spintime == 0)
1424 {
1425 return res;
1426 }
1427 NDB_TICKS start = NdbTick_getCurrentTicks();
1428 do
1429 {
1430 {
1431 res = poll_SHM(recvdata, any_connected);
1432 if (res || !any_connected)
1433 break;
1434 }
1435 if (res || !any_connected)
1436 break;
1437 res = check_TCP(recvdata, 0);
1438 if (res)
1439 break;
1440 NdbSpin();
1441 NDB_TICKS now = NdbTick_getCurrentTicks();
1442 micros_passed =
1443 NdbTick_Elapsed(start, now).microSec();
1444 } while (micros_passed < Uint64(recvdata.m_spintime));
1445 recvdata.m_total_spintime += micros_passed;
1446 #endif
1447 return res;
1448 }
1449
1450 Uint32
pollReceive(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1451 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
1452 TransporterReceiveHandle& recvdata)
1453 {
1454 bool sleep_state_set = false;
1455 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1456
1457 Uint32 retVal = 0;
1458 recvdata.m_recv_transporters.clear();
1459
1460 /**
1461 * If any transporters have left-over data that was not fully executed in
1462 * last loop, don't wait and return 'data available' even if nothing new
1463 */
1464 if (!recvdata.m_has_data_transporters.isclear())
1465 {
1466 timeOutMillis = 0;
1467 retVal = 1;
1468 }
1469 #ifndef WIN32
1470 if (recvdata.nSHMTransporters > 0)
1471 {
1472 /**
1473 * We start by checking shared memory transporters without
1474 * any mutexes or other protection. If we find something to
1475 * read we will set timeout to 0 and check the TCP transporters
1476 * before returning.
1477 */
1478 bool any_connected = false;
1479 Uint32 res = poll_SHM(recvdata, any_connected);
1480 if(res)
1481 {
1482 retVal |= res;
1483 timeOutMillis = 0;
1484 }
1485 else if (timeOutMillis > 0 && any_connected)
1486 {
1487 /**
1488 * We are preparing to wait for socket events. We will start by
1489 * polling for a configurable amount of microseconds before we
1490 * go to sleep. We will check both shared memory transporters and
1491 * TCP transporters in this period. We will check shared memory
1492 * transporter four times and then check TCP transporters in a
1493 * loop.
1494 *
1495 * After this polling period, if we are still waiting for data
1496 * we will prepare to go to sleep by informing the other side
1497 * that we are going to sleep.
1498 *
1499 * To do this we first grab the mutex used by the sender
1500 * to check for sleep/awake state, next we poll the shared
1501 * memory holding this mutex. If this check also returns
1502 * without finding any data we set the state to sleep,
1503 * release the mutex and go to sleep (on an epoll/poll)
1504 * that can be woken up by incoming data or a wakeup byte
1505 * sent to SHM transporter.
1506 */
1507 res = spin_check_transporters(recvdata);
1508 if (res)
1509 {
1510 retVal |= res;
1511 timeOutMillis = 0;
1512 }
1513 else
1514 {
1515 int res = reset_shm_awake_state(recvdata, sleep_state_set);
1516 if (res || !sleep_state_set)
1517 {
1518 /**
1519 * If sleep_state_set is false here it means that the
1520 * all SHM transporters were disconnected. Alternatively
1521 * there was data available on all the connected SHM
1522 * transporters. Read the data from TCP transporters and
1523 * return.
1524 */
1525 retVal |= 1;
1526 timeOutMillis = 0;
1527 }
1528 }
1529 }
1530 }
1531 #endif
1532 retVal |= check_TCP(recvdata, timeOutMillis);
1533 #ifndef WIN32
1534 if (recvdata.nSHMTransporters > 0)
1535 {
1536 /**
1537 * If any SHM transporter was put to sleep above we will
1538 * set all connected SHM transporters to awake now.
1539 */
1540 if (sleep_state_set)
1541 {
1542 set_shm_awake_state(recvdata);
1543 }
1544 bool any_connected = false;
1545 int res = poll_SHM(recvdata, any_connected);
1546 retVal |= res;
1547 }
1548 #endif
1549 return retVal;
1550 }
1551
1552 /**
1553 * Every time a SHM transporter is sending data and wants the other side
1554 * to wake up to handle the data, it follows this procedure.
1555 * 1) Write the data in shared memory
1556 * 2) Acquire the mutex protecting the awake flag on the receive side.
1557 * 3) Read flag
1558 * 4) Release mutex
1559 * 5.1) If flag says that receiver is awake we're done
1560 * 5.2) If flag says that receiver is asleep we will send a byte on the
1561 * transporter socket for the SHM transporter to wake up the other
1562 * side.
1563 *
1564 * The reset_shm_awake_state is called right before we are going to go
1565 * to sleep. To ensure that we don't miss any signals from the other
1566 * side we will first check if there is data available on shared memory.
1567 * We first grab the mutex before checking this. If no data is available
1568 * we can proceed to go to sleep after setting the flag to indicate that
1569 * we are asleep. The above procedure used when sending means that we
1570 * know that the sender will always know the correct state. The only
1571 * error in this is that the sender might think that we are asleep when
1572 * we actually is still on our way to go to sleep. In this case no harm
1573 * has been done since the only thing that have happened is that one byte
1574 * is sent on the SHM socket that wasn't absolutely needed.
1575 */
1576 int
reset_shm_awake_state(TransporterReceiveHandle & recvdata,bool & sleep_state_set)1577 TransporterRegistry::reset_shm_awake_state(TransporterReceiveHandle& recvdata,
1578 bool& sleep_state_set)
1579 {
1580 int res = 0;
1581 #ifndef WIN32
1582 for (Uint32 i = 0; i < recvdata.nSHMTransporters; i++)
1583 {
1584 SHM_Transporter * t = theSHMTransporters[i];
1585 Uint32 node_id = t->getRemoteNodeId();
1586 Uint32 trp_id = t->getTransporterIndex();
1587
1588 if (!recvdata.m_transporters.get(trp_id))
1589 continue;
1590
1591 if (t->isConnected())
1592 {
1593 t->lock_mutex();
1594 if (is_connected(node_id))
1595 {
1596 if (t->hasDataToRead())
1597 {
1598 recvdata.m_has_data_transporters.set(trp_id);
1599 res = 1;
1600 }
1601 else
1602 {
1603 sleep_state_set = true;
1604 t->set_awake_state(0);
1605 }
1606 }
1607 t->unlock_mutex();
1608 }
1609 }
1610 #endif
1611 return res;
1612 }
1613
1614 /**
1615 * We have been sleeping for a while, before proceeding we need to set
1616 * the flag to awake in the shared memory. This will flag to all other
1617 * nodes using shared memory to communicate that we're awake and don't
1618 * need any socket communication to wake up.
1619 */
1620 void
set_shm_awake_state(TransporterReceiveHandle & recvdata)1621 TransporterRegistry::set_shm_awake_state(TransporterReceiveHandle& recvdata)
1622 {
1623 #ifndef WIN32
1624 for (Uint32 i = 0; i < recvdata.nSHMTransporters; i++)
1625 {
1626 SHM_Transporter * t = theSHMTransporters[i];
1627 Uint32 id = t->getTransporterIndex();
1628
1629 if (!recvdata.m_transporters.get(id))
1630 continue;
1631 if (t->isConnected())
1632 {
1633 t->lock_mutex();
1634 t->set_awake_state(1);
1635 t->unlock_mutex();
1636 }
1637 }
1638 #endif
1639 }
1640
1641 /**
1642 * We do not want to hold any transporter locks during select(), so there
1643 * is no protection against a disconnect closing the socket during this call.
1644 *
1645 * That does not matter, at most we will get a spurious wakeup on the wrong
1646 * socket, which will be handled correctly in performReceive() (which _is_
1647 * protected by transporter locks on upper layer).
1648 */
1649 Uint32
poll_TCP(Uint32 timeOutMillis,TransporterReceiveHandle & recvdata)1650 TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
1651 TransporterReceiveHandle& recvdata)
1652 {
1653 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1654
1655 recvdata.m_socket_poller.clear();
1656
1657 const bool extra_socket = m_has_extra_wakeup_socket;
1658 if (extra_socket && recvdata.m_transporters.get(0))
1659 {
1660 const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
1661 assert(&recvdata == receiveHandle); // not used by ndbmtd...
1662
1663 // Poll the wakup-socket for read
1664 recvdata.m_socket_poller.add(socket, true, false, false);
1665 }
1666
1667 Uint16 idx[MAX_NTRANSPORTERS];
1668 Uint32 i = 0;
1669 for (; i < recvdata.nTCPTransporters; i++)
1670 {
1671 TCP_Transporter * t = theTCPTransporters[i];
1672 const NDB_SOCKET_TYPE socket = t->getSocket();
1673 Uint32 node_id = t->getRemoteNodeId();
1674 Uint32 trp_id = t->getTransporterIndex();
1675
1676 idx[i] = maxTransporters + 1;
1677 if (!recvdata.m_transporters.get(trp_id))
1678 continue;
1679
1680 if (is_connected(node_id) && t->isConnected() && ndb_socket_valid(socket))
1681 {
1682 idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
1683 }
1684 }
1685
1686 #ifndef WIN32
1687 for (Uint32 j = 0; j < recvdata.nSHMTransporters; j++)
1688 {
1689 /**
1690 * We need to listen to socket also for shared memory transporters.
1691 * These sockets are used as a wakeup mechanism, so we're not sending
1692 * any data in it. But we need a socket to be able to wake up things
1693 * when the receiver is not awake and we've only sent data on shared
1694 * memory transporter.
1695 */
1696 SHM_Transporter * t = theSHMTransporters[j];
1697 const NDB_SOCKET_TYPE socket = t->getSocket();
1698 Uint32 node_id = t->getRemoteNodeId();
1699 Uint32 trp_id = t->getTransporterIndex();
1700 idx[i] = maxTransporters + 1;
1701 if (!recvdata.m_transporters.get(trp_id))
1702 {
1703 i++;
1704 continue;
1705 }
1706 if (is_connected(node_id) && t->isConnected() && ndb_socket_valid(socket))
1707 {
1708 idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
1709 }
1710 i++;
1711 }
1712 #endif
1713 int tcpReadSelectReply = recvdata.m_socket_poller.poll_unsafe(timeOutMillis);
1714
1715 if (tcpReadSelectReply > 0)
1716 {
1717 if (extra_socket)
1718 {
1719 if (recvdata.m_socket_poller.has_read(0))
1720 {
1721 assert(recvdata.m_transporters.get(0));
1722 recvdata.m_recv_transporters.set((Uint32)0);
1723 }
1724 }
1725 i = 0;
1726 for (; i < recvdata.nTCPTransporters; i++)
1727 {
1728 TCP_Transporter * t = theTCPTransporters[i];
1729 if (idx[i] != maxTransporters + 1)
1730 {
1731 Uint32 trp_id = t->getTransporterIndex();
1732 if (recvdata.m_socket_poller.has_read(idx[i]))
1733 {
1734 recvdata.m_recv_transporters.set(trp_id);
1735 }
1736 }
1737 }
1738 #ifndef WIN32
1739 for (Uint32 j = 0; j < recvdata.nSHMTransporters; j++)
1740 {
1741 /**
1742 * If a shared memory transporter have data on its socket we
1743 * will get it now, the data is only an indication for us to
1744 * wake up, so we're not interested in the data as such.
1745 * But to integrate it with epoll handling we will read it
1746 * in performReceive still.
1747 */
1748 SHM_Transporter * t = theSHMTransporters[j];
1749 if (idx[i] != maxTransporters + 1)
1750 {
1751 Uint32 trp_id = t->getTransporterIndex();
1752 if (recvdata.m_socket_poller.has_read(idx[i]))
1753 recvdata.m_recv_transporters.set(trp_id);
1754 }
1755 i++;
1756 }
1757 #endif
1758 }
1759
1760 return tcpReadSelectReply;
1761 }
1762
1763 void
set_recv_thread_idx(Transporter * t,Uint32 recv_thread_idx)1764 TransporterRegistry::set_recv_thread_idx(Transporter *t,
1765 Uint32 recv_thread_idx)
1766 {
1767 t->set_recv_thread_idx(recv_thread_idx);
1768 }
1769
1770 /**
1771 * Receive from the set of transporters in the bitmask
1772 * 'recvdata.m_transporters'. These has been polled by
1773 * ::pollReceive() which recorded transporters with
1774 * available data in the subset 'recvdata.m_recv_transporters'.
1775 *
1776 * In multi-threaded datanodes, there might be multiple
1777 * receiver threads, each serving a disjunct set of
1778 * 'm_transporters'.
1779 *
1780 * Single-threaded datanodes does all ::performReceive
1781 * from the scheduler main-loop, and thus it will handle
1782 * all 'm_transporters'.
1783 *
 * Clients have to acquire a 'poll right' (see TransporterFacade)
 * which gives them the right to temporarily act as a receive
 * thread with the right to poll *all* transporters.
1787 *
1788 * Reception takes place on a set of transporters knowing to be in a
1789 * 'CONNECTED' state. Transporters can (asynch) become 'DISCONNECTING'
1790 * while we performReceive(). There is *no* mutex lock protecting
1791 * 'disconnecting' from being started while we are in the receive-loop!
1792 * However, the contents of the buffers++ should still be in a
1793 * consistent state, such that the current receive can complete
1794 * without failures.
1795 *
 * With regular intervals we have to ::update_connections()
 * in order to bring DISCONNECTING transporters into
 * a DISCONNECTED state. At the earliest at this point, resources
 * used by performReceive() may be reset or released.
 * A transporter should be brought to the DISCONNECTED state
 * before it can reconnect again. (Note: There is a break of
 * this rule in ::do_connect, see own note here)
1803 *
 * To not interfere with ::poll- or ::performReceive(),
 * ::update_connections() has to be synched with these
 * methods. Either by being run within the same
 * receive thread (dataNodes), or protected by the 'poll rights'.
1808 *
 * Returns 0 when receive succeeded from all Transporters having data,
 * else 1, e.g. in case we were unable to receive due to job buffers
 * being full.
1812 */
1813 Uint32
performReceive(TransporterReceiveHandle & recvdata,Uint32 recv_thread_idx)1814 TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata,
1815 Uint32 recv_thread_idx)
1816 {
1817 (void)recv_thread_idx;
1818 TransporterReceiveWatchdog guard(recvdata);
1819 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
1820 bool stopReceiving = false;
1821
1822 if (recvdata.m_recv_transporters.get(0))
1823 {
1824 assert(recvdata.m_transporters.get(0));
1825 assert(&recvdata == receiveHandle); // not used by ndbmtd
1826 recvdata.m_recv_transporters.clear(Uint32(0));
1827 consume_extra_sockets();
1828 }
1829
1830 #ifdef ERROR_INSERT
1831 if (!m_blocked_trp.isclear())
1832 {
1833 /* Exclude receive from blocked sockets. */
1834 recvdata.m_recv_transporters.bitANDC(m_blocked_trp);
1835
1836 if (recvdata.m_recv_transporters.isclear() &&
1837 recvdata.m_has_data_transporters.isclear())
1838 {
1839 /* poll sees data, but we want to ignore for now
1840 * sleep a little to avoid busy loop
1841 */
1842 NdbSleep_MilliSleep(1);
1843 }
1844 }
1845 #endif
1846
1847 /**
1848 * m_recv_transporters set indicates that there might be data
1849 * available on the socket used by the transporter. The
1850 * doReceive call will read the socket. For TCP transporters
1851 * the doReceive call will return an indication if there is
1852 * data to receive on socket. This will set m_has_data_transporters.
1853 * For SHM transporter the socket is only used to send wakeup
1854 * bytes. The m_has_data_transporters bitmap was set already in
1855 * pollReceive for SHM transporters.
1856 */
1857 for(Uint32 trp_id = recvdata.m_recv_transporters.find_first();
1858 trp_id != BitmaskImpl::NotFound;
1859 trp_id = recvdata.m_recv_transporters.find_next(trp_id + 1))
1860 {
1861 Transporter *transp = allTransporters[trp_id];
1862 NodeId node_id = transp->getRemoteNodeId();
1863 if (transp->getTransporterType() == tt_TCP_TRANSPORTER)
1864 {
1865 TCP_Transporter * t = (TCP_Transporter*)transp;
1866 assert(recvdata.m_transporters.get(trp_id));
1867 assert(recv_thread_idx == transp->get_recv_thread_idx());
1868
1869 /**
1870 * First check connection 'is CONNECTED.
1871 * A connection can only be set into, or taken out of, is_connected'
1872 * state by ::update_connections(). See comment there about
1873 * synchronication between ::update_connections() and
1874 * performReceive()
1875 *
1876 * Transporter::isConnected() state my change asynch.
1877 * A mismatch between the TransporterRegistry::is_connected(),
1878 * and Transporter::isConnected() state is possible, and indicate
1879 * that a change is underway. (Completed by update_connections())
1880 */
1881 if (is_connected(node_id))
1882 {
1883 if (t->isConnected())
1884 {
1885 int nBytes = t->doReceive(recvdata);
1886 if (nBytes > 0)
1887 {
1888 recvdata.transporter_recv_from(node_id);
1889 recvdata.m_has_data_transporters.set(trp_id);
1890 }
1891 }
1892 }
1893 }
1894 else
1895 {
1896 #ifndef WIN32
1897 require(transp->getTransporterType() == tt_SHM_TRANSPORTER);
1898 SHM_Transporter * t = (SHM_Transporter*)transp;
1899 assert(recvdata.m_transporters.get(trp_id));
1900 if (is_connected(node_id) && t->isConnected())
1901 {
1902 t->doReceive();
1903 /**
1904 * Ignore any data we read, the data wasn't collected by the
1905 * shared memory transporter, it was simply read and thrown
1906 * away, it is only a wakeup call to send data over the socket
1907 * for shared memory transporters.
1908 */
1909 }
1910 #else
1911 require(false);
1912 #endif
1913 }
1914 }
1915 recvdata.m_recv_transporters.clear();
1916
1917 /**
1918 * Unpack data either received above or pending from prev rounds.
1919 * For the Shared memory transporter m_has_data_transporters can
1920 * be set in pollReceive as well.
1921 *
1922 * TCP Transporter
1923 * ---------------
1924 * Data to be processed at this stage is in the Transporter
1925 * receivebuffer. The data *is received*, and will stay in
1926 * the receiveBuffer even if a disconnect is started during
1927 * unpack.
1928 * When ::update_connection() finaly completes the disconnect,
1929 * (synced with ::performReceive()), 'm_has_data_transporters'
1930 * will be cleared, which will terminate further unpacking.
1931 *
1932 * NOTE:
1933 * Without reading inconsistent date, we could have removed
1934 * the 'connected' checks below, However, there is a requirement
1935 * in the CLOSE_COMREQ/CONF protocol between TRPMAN and QMGR
1936 * that no signals arrives from disconnecting nodes after a
1937 * CLOSE_COMCONF was sent. For the moment the risk of taking
1938 * advantage of this small optimization is not worth the risk.
1939 */
1940 Uint32 trp_id = recvdata.m_last_trp_id;
1941 while ((trp_id = recvdata.m_has_data_transporters.find_next(trp_id + 1)) !=
1942 BitmaskImpl::NotFound)
1943 {
1944 bool hasdata = false;
1945 Transporter * t = (Transporter*)allTransporters[trp_id];
1946 NodeId node_id = t->getRemoteNodeId();
1947
1948 assert(recvdata.m_transporters.get(trp_id));
1949
1950 if (is_connected(node_id))
1951 {
1952 if (t->isConnected())
1953 {
1954 if (unlikely(recvdata.checkJobBuffer()))
1955 {
1956 return 1; // Full, can't unpack more
1957 }
1958 if (unlikely(recvdata.m_handled_transporters.get(trp_id)))
1959 continue; // Skip now to avoid starvation
1960 if (t->getTransporterType() == tt_TCP_TRANSPORTER)
1961 {
1962 TCP_Transporter *t_tcp = (TCP_Transporter*)t;
1963 Uint32 * ptr;
1964 Uint32 sz = t_tcp->getReceiveData(&ptr);
1965 Uint32 szUsed = unpack(recvdata,
1966 ptr,
1967 sz,
1968 node_id,
1969 ioStates[node_id],
1970 stopReceiving);
1971 if (likely(szUsed))
1972 {
1973 assert(recv_thread_idx == t_tcp->get_recv_thread_idx());
1974 t_tcp->updateReceiveDataPtr(szUsed);
1975 hasdata = t_tcp->hasReceiveData();
1976 }
1977 }
1978 else
1979 {
1980 #ifndef WIN32
1981 require(t->getTransporterType() == tt_SHM_TRANSPORTER);
1982 SHM_Transporter *t_shm = (SHM_Transporter*)t;
1983 Uint32 * readPtr, * eodPtr, * endPtr;
1984 t_shm->getReceivePtr(&readPtr, &eodPtr, &endPtr);
1985 recvdata.transporter_recv_from(node_id);
1986 Uint32 *newPtr = unpack(recvdata,
1987 readPtr,
1988 eodPtr,
1989 endPtr,
1990 node_id,
1991 ioStates[node_id],
1992 stopReceiving);
1993 t_shm->updateReceivePtr(recvdata, newPtr);
1994 /**
1995 * Set hasdata dependent on if data is still available in
1996 * transporter to ensure we follow rules about setting
1997 * m_has_data_transporters and m_handled_transporters
1998 * when returning from performReceive.
1999 */
2000 hasdata = t_shm->hasDataToRead();
2001 #else
2002 require(false);
2003 #endif
2004 }
2005 // else, we didn't unpack anything:
2006 // Avail ReceiveData to short to be useful, need to
2007 // receive more before we can resume this transporter.
2008 }
2009 }
2010 // If transporter still have data, make sure that it's remember to next time
2011 recvdata.m_has_data_transporters.set(trp_id, hasdata);
2012 recvdata.m_handled_transporters.set(trp_id, hasdata);
2013
2014 if (unlikely(stopReceiving))
2015 {
2016 recvdata.m_last_trp_id = trp_id; //Resume from node after 'last_node'
2017 return 1;
2018 }
2019 }
2020 recvdata.m_handled_transporters.clear();
2021 recvdata.m_last_trp_id = 0;
2022 return 0;
2023 }
2024
2025 void
consume_extra_sockets()2026 TransporterRegistry::consume_extra_sockets()
2027 {
2028 char buf[4096];
2029 ssize_t ret;
2030 int err;
2031 NDB_SOCKET_TYPE sock = m_extra_wakeup_sockets[0];
2032 do
2033 {
2034 ret = ndb_recv(sock, buf, sizeof(buf), 0);
2035 err = ndb_socket_errno();
2036 } while (ret == sizeof(buf) || (ret == -1 && err == EINTR));
2037
2038 /* Notify upper layer of explicit wakeup */
2039 callbackObj->reportWakeup();
2040 }
2041
2042 /**
2043 * performSend() - Call physical transporters to 'doSend'
2044 * of previously prepareSend() signals.
2045 *
2046 * The doSend() implementations will call
2047 * TransporterCallback::get_bytes_to_send_iovec() to fetch
2048 * any available data from the send buffer.
2049 *
2050 * *This* ^^ is the synch point where we under mutex protection
2051 * may check for specific nodes being disconnected/disabled.
2052 * For disabled nodes we may drain the send buffers instead of
2053 * returning anything from get_bytes_to_send_iovec().
2054 * Also see comments for prepareSend() above.
2055 *
2056 * Note that since disconnection may happen asynch from other
2057 * threads, we can not reliably check the 'connected' state
2058 * before doSend(). Instead we must require that the
2059 * TransporterCallback implementation provide necessary locking
2060 * of get_bytes_to_send() vs enable/disable of send buffers.
2061 *
2062 * Returns:
2063 * true if anything still remains to be sent.
2064 * Will require another ::performSend()
2065 *
2066 * false: if nothing more remains, either due to
2067 * the send buffers being empty, we succeeded
2068 * sending everything, or we found the node to be
2069 * disconnected and thus discarded the contents.
2070 */
2071 bool
performSendNode(NodeId nodeId,bool need_wakeup)2072 TransporterRegistry::performSendNode(NodeId nodeId, bool need_wakeup)
2073 {
2074 Transporter *t = get_node_transporter(nodeId);
2075 if (t != NULL)
2076 {
2077 #ifdef ERROR_INSERT
2078 if (m_sendBlocked.get(nodeId))
2079 {
2080 return true;
2081 }
2082 #endif
2083 return t->doSend(need_wakeup);
2084 }
2085 return false;
2086 }
2087
2088 bool
performSend(TrpId trp_id,bool need_wakeup)2089 TransporterRegistry::performSend(TrpId trp_id, bool need_wakeup)
2090 {
2091 Transporter *t = get_transporter(trp_id);
2092 if (t != NULL)
2093 {
2094 #ifdef ERROR_INSERT
2095 NodeId nodeId = t->getRemoteNodeId();
2096 if (m_sendBlocked.get(nodeId))
2097 {
2098 return true;
2099 }
2100 #endif
2101 return t->doSend(need_wakeup);
2102 }
2103 return false;
2104 }
2105
2106 void
performSend()2107 TransporterRegistry::performSend()
2108 {
2109 Uint32 i;
2110 sendCounter = 1;
2111
2112 lockMultiTransporters();
2113 for (i = m_transp_count; i < (nTransporters + 1); i++)
2114 {
2115 Transporter *t = allTransporters[i];
2116 if (t != NULL
2117 #ifdef ERROR_INSERT
2118 && !m_sendBlocked.get(t->getRemoteNodeId())
2119 #endif
2120 )
2121 {
2122 t->doSend();
2123 }
2124 }
2125 for (i = 1; i < m_transp_count && i < (nTransporters + 1); i++)
2126 {
2127 Transporter *t = allTransporters[i];
2128 if (t != NULL
2129 #ifdef ERROR_INSERT
2130 && !m_sendBlocked.get(t->getRemoteNodeId())
2131 #endif
2132 )
2133 {
2134 t->doSend();
2135 }
2136 }
2137 m_transp_count++;
2138 if (m_transp_count == (nTransporters + 1))
2139 m_transp_count = 1;
2140 unlockMultiTransporters();
2141 }
2142
2143 #ifdef DEBUG_TRANSPORTER
2144 void
printState()2145 TransporterRegistry::printState()
2146 {
2147 ndbout << "-- TransporterRegistry -- " << endl << endl
2148 << "Transporters = " << nTransporters << endl;
2149 for(Uint32 i = 0; i < maxTransporters; i++)
2150 {
2151 if (allTransporters[i] != NULL)
2152 {
2153 const NodeId remoteNodeId = allTransporters[i]->getRemoteNodeId();
2154 ndbout << "Transporter: " << remoteNodeId
2155 << " PerformState: " << performStates[remoteNodeId]
2156 << " IOState: " << ioStates[remoteNodeId] << endl;
2157 }
2158 }
2159 }
2160 #else
2161 void
printState()2162 TransporterRegistry::printState()
2163 {
2164 }
2165 #endif
2166
2167 #ifdef ERROR_INSERT
2168 bool
isBlocked(NodeId nodeId)2169 TransporterRegistry::isBlocked(NodeId nodeId)
2170 {
2171 return m_blocked.get(nodeId);
2172 }
2173
2174 void
blockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)2175 TransporterRegistry::blockReceive(TransporterReceiveHandle& recvdata,
2176 NodeId nodeId)
2177 {
2178 TrpId ids[MAX_NODE_GROUP_TRANSPORTERS];
2179 Uint32 num_ids;
2180 lockMultiTransporters();
2181 get_trps_for_node(nodeId, ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2182
2183 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2184 for (Uint32 i = 0; i < num_ids; i++)
2185 {
2186 Uint32 trp_id = ids[i];
2187 if (recvdata.m_transporters.get(trp_id))
2188 m_blocked_trp.set(trp_id);
2189 }
2190 /* Check that node is not already blocked?
2191 * Stop pulling from its socket (but track received data etc)
2192 */
2193 /* Shouldn't already be blocked with data */
2194 bool last_call = true;
2195 for (Uint32 i = 0; i < num_ids; i++)
2196 {
2197 Uint32 trp_id = ids[i];
2198 if (!m_blocked_trp.get(trp_id))
2199 last_call = false;
2200 }
2201 if (last_call)
2202 m_blocked.set(nodeId);
2203 unlockMultiTransporters();
2204 }
2205
2206 void
unblockReceive(TransporterReceiveHandle & recvdata,NodeId nodeId)2207 TransporterRegistry::unblockReceive(TransporterReceiveHandle& recvdata,
2208 NodeId nodeId)
2209 {
2210 TrpId ids[MAX_NODE_GROUP_TRANSPORTERS];
2211 Uint32 num_ids;
2212 lockMultiTransporters();
2213 get_trps_for_node(nodeId, ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2214
2215 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2216 for (Uint32 i = 0; i < num_ids; i++)
2217 {
2218 Uint32 trp_id = ids[i];
2219 if (recvdata.m_transporters.get(trp_id))
2220 {
2221 assert(m_blocked_trp.get(trp_id));
2222 assert(!recvdata.m_has_data_transporters.get(trp_id));
2223 m_blocked_trp.clear(trp_id);
2224 }
2225 }
2226 bool last_call = true;
2227 for (Uint32 i = 0; i < num_ids; i++)
2228 {
2229 Uint32 trp_id = ids[i];
2230 if (m_blocked_trp.get(trp_id))
2231 {
2232 last_call = false;
2233 }
2234 }
2235
2236 /* Check that node is blocked?
2237 * Resume pulling from its socket
2238 * Ensure in-flight data is processed if there was some
2239 */
2240 if (last_call)
2241 m_blocked.clear(nodeId);
2242 unlockMultiTransporters();
2243 if (m_blocked_disconnected.get(nodeId))
2244 {
2245 /* Process disconnect notification/handling now */
2246 report_disconnect(recvdata, nodeId, m_disconnect_errors[nodeId]);
2247 }
2248 if (last_call)
2249 {
2250 lockMultiTransporters();
2251 m_blocked_disconnected.clear(nodeId);
2252 unlockMultiTransporters();
2253 }
2254 }
2255
2256 bool
isSendBlocked(NodeId nodeId) const2257 TransporterRegistry::isSendBlocked(NodeId nodeId) const
2258 {
2259 return m_sendBlocked.get(nodeId);
2260 }
2261
2262 void
blockSend(TransporterReceiveHandle & recvdata,NodeId nodeId)2263 TransporterRegistry::blockSend(TransporterReceiveHandle& recvdata,
2264 NodeId nodeId)
2265 {
2266 #ifdef VM_TRACE
2267 TrpId trp_ids[MAX_NODE_GROUP_TRANSPORTERS];
2268 Uint32 num_ids;
2269 lockMultiTransporters();
2270 get_trps_for_node(nodeId, trp_ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2271 unlockMultiTransporters();
2272 #endif
2273 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2274 m_sendBlocked.set(nodeId);
2275 }
2276
2277 void
unblockSend(TransporterReceiveHandle & recvdata,NodeId nodeId)2278 TransporterRegistry::unblockSend(TransporterReceiveHandle& recvdata,
2279 NodeId nodeId)
2280 {
2281 #ifdef VM_TRACE
2282 TrpId trp_ids[MAX_NODE_GROUP_TRANSPORTERS];
2283 Uint32 num_ids;
2284 lockMultiTransporters();
2285 get_trps_for_node(nodeId, trp_ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2286 unlockMultiTransporters();
2287 #endif
2288 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2289 m_sendBlocked.clear(nodeId);
2290 }
2291
2292 #endif
2293
2294 #ifdef ERROR_INSERT
2295 Uint32
getMixologyLevel() const2296 TransporterRegistry::getMixologyLevel() const
2297 {
2298 return m_mixology_level;
2299 }
2300
2301 extern Uint32 MAX_RECEIVED_SIGNALS; /* Packer.cpp */
2302
2303 #define MIXOLOGY_MIX_INCOMING_SIGNALS 4
2304
2305 void
setMixologyLevel(Uint32 l)2306 TransporterRegistry::setMixologyLevel(Uint32 l)
2307 {
2308 m_mixology_level = l;
2309
2310 if (m_mixology_level & MIXOLOGY_MIX_INCOMING_SIGNALS)
2311 {
2312 ndbout_c("MIXOLOGY_MIX_INCOMING_SIGNALS on");
2313 /* Max one signal per transporter */
2314 MAX_RECEIVED_SIGNALS = 1;
2315 }
2316
2317 /* TODO : Add mixing of Send from NdbApi / MGMD */
2318 }
2319 #endif
2320
2321 IOState
ioState(NodeId nodeId) const2322 TransporterRegistry::ioState(NodeId nodeId) const {
2323 return ioStates[nodeId];
2324 }
2325
2326 void
setIOState(NodeId nodeId,IOState state)2327 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
2328 if (ioStates[nodeId] == state)
2329 return;
2330
2331 DEBUG("TransporterRegistry::setIOState("
2332 << nodeId << ", " << state << ")");
2333
2334 ioStates[nodeId] = state;
2335 }
2336
2337 extern "C" void *
run_start_clients_C(void * me)2338 run_start_clients_C(void * me)
2339 {
2340 ((TransporterRegistry*) me)->start_clients_thread();
2341 return 0;
2342 }
2343
2344 /**
2345 * This method is used to initiate connection, called from the TRPMAN block.
2346 *
2347 * This works asynchronously, no actions are taken directly in the calling
2348 * thread.
2349 */
2350 void
do_connect(NodeId node_id)2351 TransporterRegistry::do_connect(NodeId node_id)
2352 {
2353 PerformState &curr_state = performStates[node_id];
2354 switch(curr_state){
2355 case DISCONNECTED:
2356 break;
2357 case CONNECTED:
2358 return;
2359 case CONNECTING:
2360 return;
2361 case DISCONNECTING:
2362 /**
2363 * NOTE (Need future work)
2364 * Going directly from DISCONNECTING to CONNECTING creates
2365 * a possible race with ::update_connections(): It will
2366 * see either of the *ING states, and bring the connection
2367 * into CONNECTED or *DISCONNECTED* state. Furthermore, the
2368 * state may be overwritten to CONNECTING by this method.
2369 * We should probably have waited for DISCONNECTED state,
2370 * before allowing reCONNECTING ....
2371 */
2372 assert(false);
2373 break;
2374 }
2375 DEBUG_FPRINTF((stderr, "(%u)REG:do_connect(%u)\n", localNodeId, node_id));
2376 DBUG_ENTER("TransporterRegistry::do_connect");
2377 DBUG_PRINT("info",("performStates[%d]=CONNECTING",node_id));
2378
2379 Transporter * t = theNodeIdTransporters[node_id];
2380 if (t != NULL)
2381 {
2382 if (t->isMultiTransporter())
2383 {
2384 Multi_Transporter *multi_trp = (Multi_Transporter*)t;
2385 require(multi_trp->get_num_active_transporters() == 1);
2386 t = multi_trp->get_active_transporter(0);
2387 }
2388 require(!t->isPartOfMultiTransporter());
2389 require(!t->isMultiTransporter());
2390 DEBUG_FPRINTF((stderr, "(%u)REG:resetBuffers(%u)\n",
2391 localNodeId, node_id));
2392 t->resetBuffers();
2393 }
2394
2395 DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = CONNECTING\n",
2396 localNodeId, node_id));
2397 curr_state= CONNECTING;
2398 DBUG_VOID_RETURN;
2399 }
2400
2401 /**
2402 * This method is used to initiate disconnect from TRPMAN. It is also called
2403 * from the TCP/SHM transporter in case of an I/O error on the socket.
2404 *
2405 * This works asynchronously, similar to do_connect().
2406 */
2407 bool
do_disconnect(NodeId node_id,int errnum,bool send_source)2408 TransporterRegistry::do_disconnect(NodeId node_id,
2409 int errnum,
2410 bool send_source)
2411 {
2412 DEBUG_FPRINTF((stderr, "(%u)REG:do_disconnect(%u, %d)\n",
2413 localNodeId, node_id, errnum));
2414 PerformState &curr_state = performStates[node_id];
2415 switch(curr_state){
2416 case DISCONNECTED:
2417 {
2418 return true;
2419 }
2420 case CONNECTED:
2421 {
2422 break;
2423 }
2424 case CONNECTING:
2425 /**
2426 * This is a correct transition. But it should only occur for nodes
2427 * that lack resources, e.g. lack of shared memory resources to
2428 * setup the transporter. Therefore we assert here to get a simple
2429 * handling of test failures such that we can fix the test config.
2430 */
2431 //DBUG_ASSERT(false);
2432 break;
2433 case DISCONNECTING:
2434 {
2435 return true;
2436 }
2437 }
2438 if (errnum == ENOENT)
2439 {
2440 m_disconnect_enomem_error[node_id]++;
2441 if (m_disconnect_enomem_error[node_id] < 10)
2442 {
2443 NdbSleep_MilliSleep(40);
2444 g_eventLogger->info("Socket error %d on nodeId: %u in state: %u",
2445 errnum, node_id, (Uint32)curr_state);
2446 return false;
2447 }
2448 }
2449 if (errnum == 0)
2450 {
2451 g_eventLogger->info("Node %u disconnected in state: %d",
2452 node_id, (int)curr_state);
2453 }
2454 else
2455 {
2456 g_eventLogger->info("Node %u disconnected in %s with errnum: %d"
2457 " in state: %d",
2458 node_id,
2459 send_source ? "send" : "recv",
2460 errnum,
2461 (int)curr_state);
2462 }
2463 DBUG_ENTER("TransporterRegistry::do_disconnect");
2464 DBUG_PRINT("info",("performStates[%d]=DISCONNECTING",node_id));
2465 curr_state= DISCONNECTING;
2466 DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTING\n",
2467 localNodeId, node_id));
2468 m_disconnect_errnum[node_id] = errnum;
2469 DBUG_RETURN(false);
2470 }
2471
2472 /**
2473 * report_connect() / report_disconnect()
2474 *
2475 * Connect or disconnect the 'TransporterReceiveHandle' and
2476 * enable/disable the send buffers.
2477 *
2478 * To prevent races wrt poll/receive of data, these methods must
2479 * either be called from the same (receive-)thread as performReceive(),
2480 * or by the (API) client holding the poll-right.
2481 *
 * The send buffers need similar protection against concurrent
 * enable/disable of the same send buffers. Thus the sender
 * side is also handled here.
2485 */
2486 void
report_connect(TransporterReceiveHandle & recvdata,NodeId node_id)2487 TransporterRegistry::report_connect(TransporterReceiveHandle& recvdata,
2488 NodeId node_id)
2489 {
2490 Transporter *t = theNodeIdTransporters[node_id];
2491 if (t->isMultiTransporter())
2492 {
2493 Multi_Transporter *multi_trp = (Multi_Transporter*)t;
2494 require(multi_trp->get_num_active_transporters() == 1);
2495 t = multi_trp->get_active_transporter(0);
2496 }
2497 require(!t->isMultiTransporter());
2498 require(!t->isPartOfMultiTransporter());
2499 Uint32 id = t->getTransporterIndex();
2500 DEBUG_FPRINTF((stderr, "(%u)REG:report_connect(%u)\n",
2501 localNodeId, node_id));
2502 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2503 assert(recvdata.m_transporters.get(id));
2504
2505 DBUG_ENTER("TransporterRegistry::report_connect");
2506 DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
2507
2508 if (recvdata.epoll_add(t))
2509 {
2510 callbackObj->enable_send_buffer(node_id, id);
2511 DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = CONNECTED\n",
2512 localNodeId, node_id));
2513 performStates[node_id] = CONNECTED;
2514 recvdata.reportConnect(node_id);
2515 DBUG_VOID_RETURN;
2516 }
2517
2518 /**
2519 * Failed to add to epoll_set...
2520 * disconnect it (this is really really bad)
2521 */
2522 DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTING\n",
2523 localNodeId, node_id));
2524 performStates[node_id] = DISCONNECTING;
2525 DBUG_VOID_RETURN;
2526 }
2527
2528 Multi_Transporter*
get_node_multi_transporter(NodeId node_id)2529 TransporterRegistry::get_node_multi_transporter(NodeId node_id)
2530 {
2531 Uint32 i;
2532 Uint32 num_multi_transporters = get_num_multi_transporters();
2533 for (i = 0; i < num_multi_transporters; i++)
2534 {
2535 Multi_Transporter *multi_trp = get_multi_transporter(i);
2536 if (multi_trp->getRemoteNodeId() == node_id)
2537 {
2538 return multi_trp;
2539 }
2540 }
2541 return (Multi_Transporter*)0;
2542
2543 }
2544
2545 void
report_disconnect(TransporterReceiveHandle & recvdata,NodeId node_id,int errnum)2546 TransporterRegistry::report_disconnect(TransporterReceiveHandle& recvdata,
2547 NodeId node_id, int errnum)
2548 {
2549 DEBUG_FPRINTF((stderr, "(%u)REG:report_disconnect(%u, %d)\n",
2550 localNodeId, node_id, errnum));
2551 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
2552
2553 DBUG_ENTER("TransporterRegistry::report_disconnect");
2554 DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
2555
2556 #ifdef ERROR_INSERT
2557 if (m_blocked.get(node_id))
2558 {
2559 /* We are simulating real latency, so control events experience
2560 * it too
2561 */
2562 m_blocked_disconnected.set(node_id);
2563 m_disconnect_errors[node_id] = errnum;
2564 DBUG_VOID_RETURN;
2565 }
2566 #endif
2567
2568 /**
2569 * No one else should be using the transporter now,
2570 * reset its send buffer and recvdata.
2571 *
2572 * Note that we may 'do_disconnect' due to transporter failure,
2573 * while trying to 'CONNECTING'. This cause a transition
2574 * from CONNECTING to DISCONNECTING without first being CONNECTED.
2575 * Thus there can be multiple reset & disable of the buffers (below)
2576 * without being 'enabled' inbetween.
2577 */
2578 TrpId trp_ids[MAX_NODE_GROUP_TRANSPORTERS];
2579 Uint32 num_ids;
2580 lockMultiTransporters();
2581 get_trps_for_node(node_id, trp_ids, num_ids, MAX_NODE_GROUP_TRANSPORTERS);
2582
2583 bool ready_to_disconnect = true;
2584 Transporter *node_trp = theNodeIdTransporters[node_id];
2585 for (Uint32 i = 0; i < num_ids; i++)
2586 {
2587 Uint32 trp_id = trp_ids[i];
2588 DEBUG_FPRINTF((stderr, "trp_id = %u, node_id = %u\n", trp_id, node_id));
2589 if (recvdata.m_transporters.get(trp_id))
2590 {
2591 /**
2592 * disable_send_buffer ensures that no more signals will be sent
2593 * to the disconnected node. Every time we collect data for sending
2594 * we will check the send buffer enabled flag holding the m_send_lock
2595 * thereby ensuring that after the disable_send_buffer method is
2596 * called no more signals are sent.
2597 */
2598 callbackObj->disable_send_buffer(node_id, trp_id);
2599 recvdata.m_recv_transporters.clear(trp_id);
2600 recvdata.m_has_data_transporters.clear(trp_id);
2601 recvdata.m_handled_transporters.clear(trp_id);
2602 }
2603 else
2604 {
2605 /**
2606 * Transporter is handled by another receive thread. If the
2607 * transporter is still in the allTransporters array it means
2608 * that we haven't called report_disconnect in this receive
2609 * thread yet. Thus we are not yet ready to disconnect.
2610 */
2611 require(node_trp->isMultiTransporter());
2612 if (allTransporters[trp_id] != 0)
2613 {
2614 ready_to_disconnect = false;
2615 DEBUG_FPRINTF((stderr, "trp_id = %u, node_id = %u,"
2616 " ready_to_disconnect = false\n",
2617 trp_id, node_id));
2618 }
2619 }
2620 }
2621 if (node_trp->isMultiTransporter())
2622 {
2623 /**
2624 * Switch back to having only one active transporter which is the
2625 * original TCP or SHM transporter. We only handle the single
2626 * transporter case when setting up a new connection. We can
2627 * only move from single transporter to multi transporter when
2628 * the transporter is connected.
2629 */
2630 Multi_Transporter *multi_trp = (Multi_Transporter*)node_trp;
2631 for (Uint32 i = 0; i < num_ids; i++)
2632 {
2633 Uint32 trp_id = trp_ids[i];
2634 if (recvdata.m_transporters.get(trp_id))
2635 {
2636 /**
2637 * Remove the transporter from the set of transporters handled in
2638 * this receive thread.
2639 *
2640 * Removing it from this bitmask means that the receive thread
2641 * will ignore this transporter when receiving and polling.
2642 */
2643 DEBUG_FPRINTF((stderr, "trp_id = %u, node_id = %u,"
2644 " multi remove_all\n",
2645 trp_id, node_id));
2646 Transporter *t = multi_trp->get_active_transporter(i);
2647 t->doDisconnect();
2648 if (t->isPartOfMultiTransporter())
2649 {
2650 require(num_ids > 1);
2651 remove_allTransporters(t);
2652 }
2653 else
2654 {
2655 require(num_ids == 1);
2656 Uint32 num_multi_trps = multi_trp->get_num_inactive_transporters();
2657 for (Uint32 i = 0; i < num_multi_trps; i++)
2658 {
2659 Transporter *remove_trp = multi_trp->get_inactive_transporter(i);
2660 TrpId remove_trp_id = remove_trp->getTransporterIndex();
2661 if (remove_trp_id != 0)
2662 {
2663 NodeId remove_node_id = remove_trp->getRemoteNodeId();
2664 require(node_id == remove_node_id);
2665 callbackObj->disable_send_buffer(node_id, remove_trp_id);
2666 remove_trp->doDisconnect();
2667 remove_allTransporters(remove_trp);
2668 }
2669 }
2670 }
2671 }
2672 }
2673 if (ready_to_disconnect)
2674 {
2675 /**
2676 * All multi transporter objects for this node have been removed
2677 * from being handled by any receive thread. We set the single
2678 * transporter used for setup of connections to be handled by
2679 * this receive thread. This assignment means that we can get
2680 * an unbalanced load on receive threads. However normally the
2681 * transporters should return to using multiple transporters as
2682 * soon as the connection is setup again. So this imbalance should
2683 * be temporary.
2684 */
2685 DEBUG_FPRINTF((stderr, "node_id = %u, new_node_trp\n",
2686 node_id));
2687 if (multi_trp->get_num_active_transporters() > 1)
2688 {
2689 /**
2690 * Multi socket setup is in use, thus switch to single transporter.
2691 * In rare situations we can still have data to send on base
2692 * transporter, so need to remove the send buffer before
2693 * disconnecting. Also ensure that connection is dropped here.
2694 */
2695 multi_trp->switch_active_trp();
2696 Transporter *base_trp = multi_trp->get_active_transporter(0);
2697 NodeId base_node_id = base_trp->getRemoteNodeId();
2698 TrpId base_trp_id = base_trp->getTransporterIndex();
2699 require(base_node_id == node_id);
2700 callbackObj->disable_send_buffer(node_id, base_trp_id);
2701 base_trp->doDisconnect();
2702 }
2703 }
2704 }
2705 else
2706 {
2707 Multi_Transporter *multi_trp = get_node_multi_transporter(node_id);
2708 (void)multi_trp;
2709 DEBUG_FPRINTF((stderr, "node_id = %u, multi_trp not %s\n",
2710 node_id, multi_trp == 0 ? "existing" : "active"));
2711 }
2712 recvdata.m_bad_data_transporters.clear(node_id);
2713 recvdata.m_last_trp_id = 0;
2714 if (ready_to_disconnect)
2715 {
2716 DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTED\n",
2717 localNodeId, node_id));
2718 performStates[node_id] = DISCONNECTED;
2719 recvdata.reportDisconnect(node_id, errnum);
2720 }
2721 unlockMultiTransporters();
2722 DBUG_VOID_RETURN;
2723 }
2724
2725 /**
2726 * We only call TransporterCallback::reportError() from
2727 * TransporterRegistry::update_connections().
2728 *
2729 * In other places we call this method to enqueue the error that will later be
2730 * picked up by update_connections().
2731 */
2732 void
report_error(NodeId nodeId,TransporterError errorCode,const char * errorInfo)2733 TransporterRegistry::report_error(NodeId nodeId, TransporterError errorCode,
2734 const char *errorInfo)
2735 {
2736 DEBUG_FPRINTF((stderr, "(%u)REG:report_error(%u, %d, %s\n",
2737 localNodeId, nodeId, (int)errorCode, errorInfo));
2738 if (m_error_states[nodeId].m_code == TE_NO_ERROR &&
2739 m_error_states[nodeId].m_info == (const char *)~(UintPtr)0)
2740 {
2741 m_error_states[nodeId].m_code = errorCode;
2742 m_error_states[nodeId].m_info = errorInfo;
2743 }
2744 }
2745
2746 /**
2747 * Data node transporter model (ndbmtd)
2748 * ------------------------------------
2749 * The concurrency protection of transporters uses a number of mutexes
2750 * together with some things that can only happen in a single thread.
2751 * To explain this we will consider send and receive separately.
2752 * We also need to consider multi socket transporters distinctively since
2753 * they add another layer to the concurrency model.
2754 *
2755 * Each transporter is protected by two mutexes. One to protect send and
2756 * one to protect receives. Actually the send mutex is divided into two
2757 * mutexes as well.
2758 *
2759 * Starting with send when we send a signal we write it only in thread-local
2760 * buffers at first. Next at regular times we move them to buffers per
2761 * transporter. To move them from these thread-local buffers to the
2762 * transporter buffers requires acquiring the send buffer mutex. This
2763 * activity is normally called flush buffers.
2764 *
2765 * Next the transporter buffers are sent, this requires holding the
2766 * buffer mutex as well as the send mutex when moving the buffers to the
2767 * transporter. While sending only the send mutex is required. This mutex
2768 * is held also when performing the send call on the transporter.
2769 *
 * The selection of the transporter to send to next is handled by a
 * global send thread mutex. This mutex ensures that we send 50% of the
2772 * time to our neighbour nodes (data nodes within the same nodegroup) and
2773 * 50% to the rest of the nodes in the cluster. Thus we give much higher
2774 * priority to messages performing write transactions in the cluster as
2775 * well as local reads within the same nodegroup.
2776 *
2777 * For receive we use a model where a transporter is only handled by one
2778 * receive thread. This thread has its own mutex to protect against
2779 * interaction with other threads for transporter connect and disconnect.
2780 * This mutex is held while calling performReceive. performReceive
2781 * receives data from those transporters that signalled that data was
2782 * available in pollReceive. The mutex isn't held while calling pollReceive.
2783 *
 * Connect of the transporter does not require much protection. Nothing
 * will be received until we add the transporter to the epoll set (even
 * shared memory transporters use epoll sets, implemented using poll on
 * non-Linux platforms). The epoll_add call happens in the receive thread
 * owning the transporter, thus no call to pollReceive will happen on the
 * transporter while we are calling update_connections, which is where
 * report_connect is called from when a connect has happened.
2791 *
2792 * After adding the transporter to the epoll set we enable the send
2793 * buffers, this uses both the send mutex and the send buffer mutex
2794 * on the transporter.
2795 *
 * Finally a signal is sent to TRPMAN in the receive thread (actually
 * the same thread where the update_connections is sent from). This signal
 * is simply sent onwards to QMGR plus some logging of the connection
 * event.
2800 *
2801 * CONNECT_REP in QMGR updates the node information and initiates the
2802 * CM_REGREQ protocol for data nodes and API_REGREQ for API nodes to
2803 * include the node in an organised manner. At this point only QMGR
2804 * is allowed to receive signals from the node. QMGR will allow for
2805 * any block to communicate with the node when finishing the
2806 * registration.
2807 *
2808 * The actual connect logic happens in start_clients_thread that
2809 * executes in a single thread that handles all client connect setup.
2810 * This thread regularly wakes up and calls connect on transporters
2811 * not yet connected. It has backoff logic to ensure these connects
2812 * don't happen too often. It will only perform this connect client
2813 * attempts when the node is in the state CONNECTING. The QMGR code
2814 * will ensure that we set this state when we are ready to include
2815 * the node into the cluster again. After a node failure we have to
2816 * handle the node failure to completion before we allow the node
2817 * to reenter the cluster.
2818 *
2819 * Connecting a client requires both a connection to be setup, in
2820 * addition the client and the server must execute an authentication
2821 * protocol and a protocol where we signal what type of transporter
2822 * that is connecting.
2823 *
 * The server port to connect the client is either provided by the
 * configuration (recommended in any scenario where firewalls exist
 * in the network) or, if not configured, the server uses a dynamic port
 * and informs the management server about this port number.
2828 *
2829 * When the client has successfully connected and the authentication
2830 * protocol is successfully completed and the information about the
2831 * transporter type is sent, then we will set the transporter state
2832 * to connected. This will be picked up by update_connections that
2833 * will perform the logic above.
2834 *
2835 * When the connection has been successfully completed we set the
2836 * variable theSocket in the transporter. To ensure that we don't
2837 * call any socket actions on a socket we already closed we protect
2838 * this assignment with both the send mutex and the receive mutex.
2839 *
2840 * The actual network logic used by the start_clients_thread is
2841 * found in the SocketClient class.
2842 *
2843 * Similar logic exists also for transporters where the node is the
2844 * server part. As part of starting the data node we setup a socket
2845 * which we bind to the hostname and server port to use for this
2846 * data node. Next we will start to listen for connect attempts by
2847 * clients.
2848 *
2849 * When a client attempts to connect we will see this through the
2850 * accept call on the socket. The verification of this client
2851 * attempt is handled by the newSession call. For data nodes
2852 * this is the newSession here in TransporterRegistry.cpp.
2853 * This method will first handle the authentication protocol where
2854 * the client follows the NDB connect setup protocol to verify that
2855 * it is an NDB connection being setup. Next we call connect_server
2856 * in the TransporterRegistry.
2857 *
2858 * connect_server will receive information about node id, transporter type
2859 * that will enable it to verify that the connect is ok to perform at the
2860 * moment. It is very normal that there are many attempts to connect that
2861 * are unsuccessful since the API nodes are not allowed to connect until
2862 * the data node is started and they will regularly attempt to connect
2863 * while the data node is starting.
2864 *
2865 * When the connect has been successfully setup we will use the same logic
2866 * as for clients where we set theSocket and set the transporter state to
2867 * connected to ensure that update_connections will see the new
2868 * connection. So there is a lot of logic to setup the connections, but
2869 * really only the setting of theSocket requires mutex protection.
2870 *
2871 * Disconnects can be discovered by both the send logic as well as the
2872 * receive logic when calling socket send and socket recv. In both cases
2873 * the transporter will call do_disconnect. The only action here is to
2874 * set the state to DISCONNECTING (no action if already set or the state
2875 * is already set to DISCONNECTED). While calling this function we can
2876 * either hold the send mutex or the receive mutex. But none of these
 * are needed since we only set the DISCONNECTING state. This state
 * change will be picked up by start_clients_thread (note that it will
 * be picked up by this method independent of whether it connected as a
 * client or as a server). This method will call doDisconnect on the transporter.
2881 * This method will update the state and next it will call
2882 * disconnectImpl on the transporter that will close the socket and
2883 * ensure that the variable theSocket no longer points to a valid
2884 * socket. This is again protected by the send mutex and the receive
2885 * mutex to ensure that it doesn't happen while we are calling any
2886 * send or recv calls. Here we will also close the socket.
2887 *
2888 * Setting the state to not connected in doDisconnect will flag to
2889 * update_connections that it should call report_disconnect. This
2890 * call will disable the send buffer (holding the send mutex and
2891 * send buffer mutex), it will also clear bitmasks for the transporter
2892 * used by the receive thread. It requires no mutex to protect these
2893 * changes since they are performed in the receive thread that is
2894 * responsible for receiving on this transporter. The call can be
2895 * performed in all receive threads, but will be ignored by any
2896 * receive thread not responsible for it.
2897 *
2898 * Next it will call reportDisconnect that will send a signal
2899 * DISCONNECT_REP to our TRPMAN instance. This signal will be
2900 * sent to QMGR, the disconnect event will be logged and we will
2901 * send a signal to CMVMI to cancel subscriptions for the node.
2902 *
2903 * QMGR will update node state information and will initiate
2904 * node failure handling if not already started.
2905 *
2906 * QMGR will control when it is ok to communicate with the node
2907 * using CLOSE_COMREQ and OPEN_COMREQ calls to TRPMAN.
2908 *
2909 * Closing the communication to a node can also be initiated from
2910 * receiving a signal from another node that the node is dead.
2911 * In this case QMGR will send CLOSE_COMREQ to TRPMAN instances and
2912 * TRPMAN will set the IO state to HaltIO to ensure no more signals
2913 * are handled and it will call do_disconnect in TransporterRegistry
2914 * to ensure that the above close sequence is called although the
2915 * transporters are still functional.
2916 *
2917 * Introducing multi transporters
2918 * ------------------------------
2919 * To enable higher communication throughput between data nodes it is
2920 * possible to setup multiple transporters for communicating with another
2921 * data node. Currently this feature only allows for multiple transporters
2922 * when communicating within the same node group. There is no principal
2923 * problem with multiple transporters for other communication between
2924 * data nodes as well. For API nodes we already have the ability to
2925 * use multiple transporters from an application program by using
2926 * multiple node ids. Thus it is not necessary to add multiple
2927 * transporters to communicate with API nodes.
2928 *
2929 * The connect process cannot use multiple transporters. The reason is
2930 * that we cannot be certain that the connecting node supports multiple
2931 * transporters between two data nodes. We can only check this once the
2932 * connection is setup.
2933 *
2934 * This means that the change from a single socket to multiple sockets
2935 * is handled when the node is in a connected state and thus traffic
2936 * between the nodes is happening as we setup the multi socket setup.
2937 *
2938 * Given that multiple transporters can only be used between data nodes
2939 * we don't need to handle the Shared memory transporter as used for
2940 * multiple transporters since Shared memory transporter can only be
2941 * used to communicate between data nodes and API nodes.
2942 *
2943 * Thus multiple transporters is only handled by TCP transporters.
2944 *
2945 * Additionally to setup multiple transporters we need to know which
2946 * nodes are part of the same node group. This information is available
2947 * already in start phase 3 for initial starts and for cluster restarts.
2948 * For node restarts and initial node restarts we will wait until start
2949 * phase 4 to setup multiple transporters. In addition we have the case
2950 * where node groups are added while nodes are up and running, in this
2951 * case the nodes in the new node group will setup multiple transporters
2952 * when the new node group creation is committed.
2953 *
2954 * The setup logic for multiple transporters is described in detail in
2955 * QmgrMain.cpp. Here we will focus on how it changes the connect/disconnect
2956 * logic and what concurrency protection is required to handle the
2957 * multiple transporters.
2958 *
2959 * We introduce a new mutex that is locked by calling lockMultiTransporters
2960 * and unlocked through the call unlockMultiTransporters. This mutex should
2961 * always be taken before any other mutex is acquired to avoid deadlocks.
2962 *
2963 * During setup we communicate between the nodes in the same nodegroup to
2964 * decide on the number of multi sockets to use for one transporter. The
2965 * multiple transporters are handled also by a new Transporter class called
2966 * Multi_Transporter. When the change is performed to use multi transporters
2967 * this transporter is placed into the node transporter array. It is never
2968 * removed from this array even after a node failure.
2969 *
2970 * When we setup the multi transporters we assign them to different recv
2971 * threads. We also ensure that the port number is copied from the base
2972 * transporter to the new multi transporters to ensure that we use the
2973 * dynamic port number for a new node after a restart.
2974 *
2975 * The way to activate the connect of the multiple transporters happens by
2976 * inserting them into the allTransporters array. By inserting them into
2977 * this array they will be handled by the start_clients_thread.
2978 *
2979 * The protocol to setup a multi transporter is slightly different since we
2980 * need to know the node id, transporter type and additionally the instance
2981 * number of the transporter being setup in the connection. To accept a
2982 * connection we must be in the CONNECTED state for the node and the instance
2983 * number must exist in the multi transporter and must not be already
2984 * connected.
2985 *
2986 * The next step is to perform the switch. Once we have agreed with the
2987 * other node to perform this action we will send a crucial signal
2988 * ACTIVATE_TRP_REQ. This signal will be sent on the base transporter.
2989 * Immediately after sending this signal we will switch to using multi
2990 * transporters.
2991 *
2992 * Before sending this signal we ensure that the only thread active is the
2993 * main thread and we lock all send mutex for both the base transporter
2994 * and the new multi transporters. We perform this by a new set of
2995 * signals to freeze threads.
2996 *
2997 * When receiving this signal in the other node we need to use
2998 * SYNC_THREAD_VIA_REQ. This ensures that all signals sent using the
2999 * base transporter have been executed before we start executing the
3000 * signals arriving on the new multi transporters. This ensures that
3001 * we always keep signal order even in the context of changing
3002 * the number of transporters for a node.
3003 *
3004 * After this we will send the activation request for each new transporter
3005 * to each of the TRPMAN instances. When this request arrives the new
3006 * transporter will be added to the epoll set of the receive thread. So
3007 * from here on the signals sent after the ACTIVATE_TRP_REQ signal can
3008 * be processed in the receiving node. When all ACTIVATE_TRP_REQ have
3009 * been processed we can send ACTIVATE_TRP_CONF to the requesting node.
3010 * If we have received ACTIVATE_TRP_CONF from the other node and we
3011 * have received ACTIVATE_TRP_REQ from the other node, then we are ready
3012 * to close the socket of the base transporter. While doing this we call
3013 * lockMultiTransporters and we lock both the send mutex and the receive
3014 * thread mutex to ensure that we don't interact with any socket calls
3015 * when performing this action. At this point we also disable the send
3016 * buffer of the base transporter.
3017 *
3018 * At this point the switch to the multi transporter setup is completed.
3019 * If we are performing a restart during this setup and the other node
3020 * crashes we will also crash since we are in a restart. So only when
3021 * we are switching while we are up will this be an issue. When switching
3022 * and being up we must handle crashes in all phases of the setup.
3023 * We have added a test case that both tests crashes in all phases as
3024 * well as long sleeps in all phases.
3025 *
3026 * Closing the base transporter will cause do_disconnect to be called
3027 * on this transporter while the node is still up. Thus we have to
3028 * handle this call separately for non-active connections. This should
3029 * only happen in the receive thread, so we don't take the send
3030 * mutex before changing theSocket here. Since the transporter isn't
3031 * active it should not perform any send activity here.
3032 *
3033 *
3034 * Disconnects when using multi transporters still happens through the
3035 * do_disconnect call, either activated by an error on any of the multi
3036 * sockets or activated by blocks discovering a failed node. This means
3037 * that performStates for the node is set to DISCONNECTING. This will
3038 * trigger call to doDisconnect from start_clients_thread for each of
3039 * the multi transporters and thus every multi transporter will have
3040 * the disconnectImpl method called on the transporter.
3041 *
3042 * This in turn will cause update_connections to call
3043 * report_disconnect for each of the multi transporters.
3044 *
3045 * The report_disconnect will discover that a multi transporter is
3046 * involved. It will disable send buffers and receive thread bitmasks
3047 * will be cleared. It will remove the multi transporter from the
3048 * allTransporters array. This is synchronized by acquiring the
3049 * lockMultiTransporters before these actions.
3050 *
3051 * It is possible to arrive here after the switch to multi transporter
3052 * still having data in the send buffer (still waiting to send the
3053 * ACTIVATE_TRP_REQ signal). This case must be handled by calling
3054 * disable send buffer as well as calling doDisconnect on the
3055 * base transporter that we already switched away from, otherwise these
3056 * send buffers will remain when starting the node up again and cause
3057 * a crash.
3058 *
3059 * As part of report_disconnect we will check if all multi transporters
3060 * have been handled by report_disconnect, we check this by looking into
3061 * allTransporters array to see if we removed the transporter from this
3062 * array for all multi transporters. If all have been removed we switch
3063 * back to the base transporter as the active transporter.
3064 *
3065 * If the failure happens before we switch to the multi transporters we will
3066 * disable send buffer and call doDisconnect also on the multi transporters
3067 * just in case.
3068 *
3069 * Finally when all multi transporters have been handled and we have
3070 * switched back to the base transporter we will send the DISCONNECT_REP
3071 * signal through the reportDisconnect call in the receive thread.
3072 *
3073 * After this we are ready to set up the multi transporter again
3074 * once node failure handling has completed.
3075 *
3076 * No special handling of epoll sets (and poll sets in non-Linux platforms)
3077 * is required since the sockets are removed from those sets when the
3078 * socket is closed.
3079 *
3080 * update_connections()
3081 * --------------------
3082 * update_connections(), together with the thread running in
3083 * start_clients_thread(), handle the state changes for transporters as they
3084 * connect and disconnect.
3085 *
3086 * update_connections on a specific set of recvdata *must not* be run
3087 * concurrently with :performReceive() on the same recvdata. Thus,
3088 * it must either be called from the same (receive-)thread as
3089 * performReceive(), or protected by acquiring the (client) poll rights.
3090 */
3091 Uint32
update_connections(TransporterReceiveHandle & recvdata,Uint32 max_spintime)3092 TransporterRegistry::update_connections(TransporterReceiveHandle& recvdata,
3093 Uint32 max_spintime)
3094 {
3095 Uint32 spintime = 0;
3096 TransporterReceiveWatchdog guard(recvdata);
3097 assert((receiveHandle == &recvdata) || (receiveHandle == 0));
3098
3099 for (Uint32 i = 1; i < (nTransporters + 1); i++)
3100 {
3101 require(i < MAX_NTRANSPORTERS);
3102 Transporter * t = allTransporters[i];
3103 if (!t)
3104 continue;
3105
3106 const NodeId nodeId = t->getRemoteNodeId();
3107 if (!recvdata.m_transporters.get(i))
3108 continue;
3109
3110 TransporterError code = m_error_states[nodeId].m_code;
3111 const char *info = m_error_states[nodeId].m_info;
3112 if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
3113 {
3114 if (performStates[nodeId] == CONNECTING)
3115 {
3116 fprintf(stderr,
3117 "update_connections while CONNECTING, nodeId:%d, error:%d\n",
3118 nodeId,
3119 code);
3120 /* Failed during CONNECTING -> we are still DISCONNECTED */
3121 assert(!t->isConnected());
3122 assert(false);
3123 performStates[nodeId] = DISCONNECTED;
3124 }
3125
3126 recvdata.reportError(nodeId, code, info);
3127 m_error_states[nodeId].m_code = TE_NO_ERROR;
3128 m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
3129 }
3130
3131 switch(performStates[nodeId]){
3132 case CONNECTED:
3133 #ifndef WIN32
3134 if (t->getTransporterType() == tt_SHM_TRANSPORTER)
3135 {
3136 SHM_Transporter *shm_trp = (SHM_Transporter*)t;
3137 spintime = MAX(spintime, shm_trp->get_spintime());
3138 }
3139 #endif
3140 break;
3141 case DISCONNECTED:
3142 break;
3143 case CONNECTING:
3144 if(t->isConnected())
3145 report_connect(recvdata, nodeId);
3146 break;
3147 case DISCONNECTING:
3148 if(!t->isConnected())
3149 report_disconnect(recvdata, nodeId, m_disconnect_errnum[nodeId]);
3150 break;
3151 }
3152 }
3153 recvdata.nTCPTransporters = nTCPTransporters;
3154 recvdata.nSHMTransporters = nSHMTransporters;
3155 recvdata.m_spintime = MIN(spintime, max_spintime);
3156 return spintime; //Inform caller of spintime calculated on this level
3157 }
3158
3159 /**
3160 * Run as own thread
3161 * Possible blocking parts of transporter connect and diconnect
3162 * is supposed to be handled here.
3163 */
3164 void
start_clients_thread()3165 TransporterRegistry::start_clients_thread()
3166 {
3167 int persist_mgm_count= 0;
3168 DBUG_ENTER("TransporterRegistry::start_clients_thread");
3169 while (m_run_start_clients_thread) {
3170 NdbSleep_MilliSleep(100);
3171 persist_mgm_count++;
3172 if(persist_mgm_count==50)
3173 {
3174 ndb_mgm_check_connection(m_mgm_handle);
3175 persist_mgm_count= 0;
3176 }
3177 lockMultiTransporters();
3178 for (Uint32 i = 1;
3179 i < (nTransporters +1) && m_run_start_clients_thread;
3180 i++)
3181 {
3182 require(i < MAX_NTRANSPORTERS);
3183 Transporter * t = allTransporters[i];
3184 if (!t)
3185 continue;
3186
3187 const NodeId nodeId = t->getRemoteNodeId();
3188 switch(performStates[nodeId]){
3189 case CONNECTING:
3190 {
3191 if (t->isPartOfMultiTransporter())
3192 {
3193 break;
3194 }
3195 if (!t->isConnected() && !t->isServer)
3196 {
3197 if (get_and_clear_node_up_indicator(nodeId))
3198 {
3199 // Other node have indicated that node nodeId is up, try connect
3200 // now and restart backoff sequence
3201 backoff_reset_connecting_time(nodeId);
3202 }
3203 if (!backoff_update_and_check_time_for_connect(nodeId))
3204 {
3205 // Skip connect this time
3206 continue;
3207 }
3208
3209 bool connected= false;
3210 /**
3211 * First, we try to connect (if we have a port number).
3212 */
3213
3214 if (t->get_s_port())
3215 {
3216 DBUG_PRINT("info", ("connecting to node %d using port %d",
3217 nodeId, t->get_s_port()));
3218 unlockMultiTransporters();
3219 connected= t->connect_client();
3220 lockMultiTransporters();
3221 }
3222
3223 /**
3224 * If dynamic, get the port for connecting from the management server
3225 */
3226 if (!connected && t->get_s_port() <= 0) // Port is dynamic
3227 {
3228 int server_port= 0;
3229 struct ndb_mgm_reply mgm_reply;
3230 unlockMultiTransporters();
3231
3232 DBUG_PRINT("info", ("connection to node %d should use "
3233 "dynamic port",
3234 nodeId));
3235
3236 if(!ndb_mgm_is_connected(m_mgm_handle))
3237 ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
3238
3239 if(ndb_mgm_is_connected(m_mgm_handle))
3240 {
3241 DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
3242 nodeId));
3243
3244 const int res=
3245 ndb_mgm_get_connection_int_parameter(m_mgm_handle,
3246 t->getRemoteNodeId(),
3247 t->getLocalNodeId(),
3248 CFG_CONNECTION_SERVER_PORT,
3249 &server_port,
3250 &mgm_reply);
3251
3252 DBUG_PRINT("info",("Got dynamic port %d for %d -> %d (ret: %d)",
3253 server_port,t->getRemoteNodeId(),
3254 t->getLocalNodeId(),res));
3255 if( res >= 0 )
3256 {
3257 DBUG_PRINT("info", ("got port %d to use for connection to %d",
3258 server_port, nodeId));
3259
3260 if (server_port != 0)
3261 {
3262 if (t->get_s_port() != server_port)
3263 {
3264 // Got a different port number, reset backoff
3265 backoff_reset_connecting_time(nodeId);
3266 }
3267 // Save the new port number
3268 t->set_s_port(server_port);
3269 }
3270 else
3271 {
3272 // Got port number 0, port is not known. Keep the old.
3273 }
3274 }
3275 else if(ndb_mgm_is_connected(m_mgm_handle))
3276 {
3277 DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
3278 res));
3279 g_eventLogger->info("Failed to get dynamic port, res: %d",
3280 res);
3281 ndb_mgm_disconnect(m_mgm_handle);
3282 }
3283 else
3284 {
3285 DBUG_PRINT("info", ("mgmd close connection early"));
3286 g_eventLogger->info
3287 ("Management server closed connection early. "
3288 "It is probably being shut down (or has problems). "
3289 "We will retry the connection. %d %s %s line: %d",
3290 ndb_mgm_get_latest_error(m_mgm_handle),
3291 ndb_mgm_get_latest_error_desc(m_mgm_handle),
3292 ndb_mgm_get_latest_error_msg(m_mgm_handle),
3293 ndb_mgm_get_latest_error_line(m_mgm_handle)
3294 );
3295 }
3296 }
3297 /** else
3298 * We will not be able to get a new port unless
3299 * the m_mgm_handle is connected. Note that not
3300 * being connected is an ok state, just continue
3301 * until it is able to connect. Continue using the
3302 * old port until we can connect again and get a
3303 * new port.
3304 */
3305 lockMultiTransporters();
3306 }
3307 }
3308 break;
3309 }
3310 case DISCONNECTING:
3311 if(t->isConnected())
3312 {
3313 DEBUG_FPRINTF((stderr, "(%u)doDisconnect(%u), line: %u\n",
3314 localNodeId, t->getRemoteNodeId(), __LINE__));
3315 t->doDisconnect();
3316 }
3317 break;
3318 case DISCONNECTED:
3319 {
3320 if (t->isConnected())
3321 {
3322 g_eventLogger->warning("Found connection to %u in state DISCONNECTED "
3323 " while being connected, disconnecting!",
3324 t->getRemoteNodeId());
3325 DEBUG_FPRINTF((stderr, "(%u)doDisconnect(%u), line: %u\n",
3326 localNodeId, t->getRemoteNodeId(), __LINE__));
3327 t->doDisconnect();
3328 }
3329 break;
3330 }
3331 case CONNECTED:
3332 {
3333 if (t->isPartOfMultiTransporter() &&
3334 !t->isConnected() &&
3335 !t->isServer)
3336 {
3337 require(t->get_s_port());
3338 DBUG_PRINT("info", ("connecting multi-transporter to node %d"
3339 " using port %d",
3340 nodeId,
3341 t->get_s_port()));
3342 unlockMultiTransporters();
3343 t->connect_client();
3344 DEBUG_FPRINTF((stderr, "Connect client of trp id %u, res: %u\n",
3345 t->getTransporterIndex(),
3346 t->isConnected()));
3347 lockMultiTransporters();
3348 }
3349 }
3350 default:
3351 break;
3352 }
3353 }
3354 unlockMultiTransporters();
3355 }
3356 DBUG_VOID_RETURN;
3357 }
3358
3359 struct NdbThread*
start_clients()3360 TransporterRegistry::start_clients()
3361 {
3362 m_run_start_clients_thread= true;
3363 m_start_clients_thread= NdbThread_Create(run_start_clients_C,
3364 (void**)this,
3365 0, // default stack size
3366 "ndb_start_clients",
3367 NDB_THREAD_PRIO_LOW);
3368 if (m_start_clients_thread == 0)
3369 {
3370 m_run_start_clients_thread= false;
3371 }
3372 return m_start_clients_thread;
3373 }
3374
3375 bool
stop_clients()3376 TransporterRegistry::stop_clients()
3377 {
3378 if (m_start_clients_thread) {
3379 m_run_start_clients_thread= false;
3380 void* status;
3381 NdbThread_WaitFor(m_start_clients_thread, &status);
3382 NdbThread_Destroy(&m_start_clients_thread);
3383 }
3384 return true;
3385 }
3386
3387 void
add_transporter_interface(NodeId remoteNodeId,const char * interf,int s_port)3388 TransporterRegistry::add_transporter_interface(NodeId remoteNodeId,
3389 const char *interf,
3390 int s_port)
3391 {
3392 DBUG_ENTER("TransporterRegistry::add_transporter_interface");
3393 DBUG_PRINT("enter",("interface=%s, s_port= %d", interf, s_port));
3394 if (interf && strlen(interf) == 0)
3395 interf= 0;
3396
3397 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
3398 {
3399 Transporter_interface &tmp= m_transporter_interface[i];
3400 if (s_port != tmp.m_s_service_port || tmp.m_s_service_port==0)
3401 continue;
3402 if (interf != 0 && tmp.m_interface != 0 &&
3403 strcmp(interf, tmp.m_interface) == 0)
3404 {
3405 DBUG_VOID_RETURN; // found match, no need to insert
3406 }
3407 if (interf == 0 && tmp.m_interface == 0)
3408 {
3409 DBUG_VOID_RETURN; // found match, no need to insert
3410 }
3411 }
3412 Transporter_interface t;
3413 t.m_remote_nodeId= remoteNodeId;
3414 t.m_s_service_port= s_port;
3415 t.m_interface= interf;
3416 m_transporter_interface.push_back(t);
3417 DBUG_PRINT("exit",("interface and port added"));
3418 DBUG_VOID_RETURN;
3419 }
3420
3421 bool
start_service(SocketServer & socket_server)3422 TransporterRegistry::start_service(SocketServer& socket_server)
3423 {
3424 DBUG_ENTER("TransporterRegistry::start_service");
3425 if (m_transporter_interface.size() > 0 &&
3426 localNodeId == 0)
3427 {
3428 g_eventLogger->error("INTERNAL ERROR: not initialized");
3429 DBUG_RETURN(false);
3430 }
3431
3432 for (unsigned i= 0; i < m_transporter_interface.size(); i++)
3433 {
3434 Transporter_interface &t= m_transporter_interface[i];
3435
3436 unsigned short port= (unsigned short)t.m_s_service_port;
3437 if(t.m_s_service_port<0)
3438 port= -t.m_s_service_port; // is a dynamic port
3439 TransporterService *transporter_service =
3440 new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
3441 if(!socket_server.setup(transporter_service,
3442 &port, t.m_interface))
3443 {
3444 DBUG_PRINT("info", ("Trying new port"));
3445 port= 0;
3446 if(t.m_s_service_port>0
3447 || !socket_server.setup(transporter_service,
3448 &port, t.m_interface))
3449 {
3450 /*
3451 * If it wasn't a dynamically allocated port, or
3452 * our attempts at getting a new dynamic port failed
3453 */
3454 g_eventLogger->error("Unable to setup transporter service port: %s:%d!\n"
3455 "Please check if the port is already used,\n"
3456 "(perhaps the node is already running)",
3457 t.m_interface ? t.m_interface : "*", t.m_s_service_port);
3458 delete transporter_service;
3459 DBUG_RETURN(false);
3460 }
3461 }
3462 t.m_s_service_port= (t.m_s_service_port<=0)?-port:port; // -`ve if dynamic
3463 DBUG_PRINT("info", ("t.m_s_service_port = %d",t.m_s_service_port));
3464 transporter_service->setTransporterRegistry(this);
3465 }
3466 DBUG_RETURN(true);
3467 }
3468
3469 void
startReceiving()3470 TransporterRegistry::startReceiving()
3471 {
3472 DBUG_ENTER("TransporterRegistry::startReceiving");
3473
3474 #ifndef WIN32
3475 m_shm_own_pid = getpid();
3476 #endif
3477 DBUG_VOID_RETURN;
3478 }
3479
3480 void
stopReceiving()3481 TransporterRegistry::stopReceiving(){
3482 }
3483
3484 void
startSending()3485 TransporterRegistry::startSending(){
3486 }
3487
3488 void
stopSending()3489 TransporterRegistry::stopSending(){
3490 }
3491
operator <<(NdbOut & out,SignalHeader & sh)3492 NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
3493 out << "-- Signal Header --" << endl;
3494 out << "theLength: " << sh.theLength << endl;
3495 out << "gsn: " << sh.theVerId_signalNumber << endl;
3496 out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
3497 out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
3498 out << "sendersSig: " << sh.theSendersSignalId << endl;
3499 out << "theSignalId: " << sh.theSignalId << endl;
3500 out << "trace: " << (int)sh.theTrace << endl;
3501 return out;
3502 }
3503
3504 int
get_transporter_count() const3505 TransporterRegistry::get_transporter_count() const
3506 {
3507 assert(nTransporters > 0);
3508 return nTransporters;
3509 }
3510
3511 bool
is_shm_transporter(TrpId trp_id)3512 TransporterRegistry::is_shm_transporter(TrpId trp_id)
3513 {
3514 assert(trp_id < maxTransporters);
3515 Transporter *trp = allTransporters[trp_id];
3516 if (trp->getTransporterType() == tt_SHM_TRANSPORTER)
3517 return true;
3518 else
3519 return false;
3520 }
3521
3522 Transporter*
get_transporter(TrpId trp_id) const3523 TransporterRegistry::get_transporter(TrpId trp_id) const
3524 {
3525 assert(trp_id < MAX_NTRANSPORTERS);
3526 return allTransporters[trp_id];
3527 }
3528
3529 Transporter*
get_node_transporter(NodeId nodeId) const3530 TransporterRegistry::get_node_transporter(NodeId nodeId) const
3531 {
3532 assert(nodeId <= MAX_NODES);
3533 return theNodeIdTransporters[nodeId];
3534 }
3535
connect_client(NdbMgmHandle * h)3536 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
3537 {
3538 DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
3539
3540 Uint32 mgm_nodeid= ndb_mgm_get_mgmd_nodeid(*h);
3541
3542 if(!mgm_nodeid)
3543 {
3544 g_eventLogger->error("%s: %d", __FILE__, __LINE__);
3545 return false;
3546 }
3547 Transporter * t = theNodeIdTransporters[mgm_nodeid];
3548 if (!t)
3549 {
3550 g_eventLogger->error("%s: %d", __FILE__, __LINE__);
3551 return false;
3552 }
3553
3554 if (t->isMultiTransporter())
3555 {
3556 Multi_Transporter *multi_trp = (Multi_Transporter*)t;
3557 require(multi_trp->get_num_active_transporters() == 1);
3558 t = multi_trp->get_active_transporter(0);
3559 }
3560 require(!t->isMultiTransporter());
3561 require(!t->isPartOfMultiTransporter());
3562 bool res = t->connect_client(connect_ndb_mgmd(h));
3563 if (res == true)
3564 {
3565 DEBUG_FPRINTF((stderr, "(%u)performStates[%u] = DISCONNECTING,"
3566 " connect_client\n", localNodeId, mgm_nodeid));
3567 performStates[mgm_nodeid] = TransporterRegistry::CONNECTING;
3568 }
3569 DBUG_RETURN(res);
3570 }
3571
3572
report_dynamic_ports(NdbMgmHandle h) const3573 bool TransporterRegistry::report_dynamic_ports(NdbMgmHandle h) const
3574 {
3575 // Fill array of nodeid/port pairs for those ports which are dynamic
3576 unsigned num_ports = 0;
3577 ndb_mgm_dynamic_port ports[MAX_NODES];
3578 for(unsigned i = 0; i < m_transporter_interface.size(); i++)
3579 {
3580 const Transporter_interface& ti = m_transporter_interface[i];
3581 if (ti.m_s_service_port >= 0)
3582 continue; // Not a dynamic port
3583
3584 assert(num_ports < NDB_ARRAY_SIZE(ports));
3585 ports[num_ports].nodeid = ti.m_remote_nodeId;
3586 ports[num_ports].port = ti.m_s_service_port;
3587 num_ports++;
3588 }
3589
3590 if (num_ports == 0)
3591 {
3592 // No dynamic ports in use, nothing to report
3593 return true;
3594 }
3595
3596 // Send array of nodeid/port pairs to mgmd
3597 if (ndb_mgm_set_dynamic_ports(h, localNodeId,
3598 ports, num_ports) < 0)
3599 {
3600 g_eventLogger->error("Failed to register dynamic ports, error: %d - '%s'",
3601 ndb_mgm_get_latest_error(h),
3602 ndb_mgm_get_latest_error_desc(h));
3603 return false;
3604 }
3605
3606 return true;
3607 }
3608
3609
3610 /**
3611 * Given a connected NdbMgmHandle, turns it into a transporter
3612 * and returns the socket.
3613 */
connect_ndb_mgmd(NdbMgmHandle * h)3614 NDB_SOCKET_TYPE TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle *h)
3615 {
3616 NDB_SOCKET_TYPE sockfd;
3617 ndb_socket_invalidate(&sockfd);
3618
3619 DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
3620
3621 if ( h==NULL || *h == NULL )
3622 {
3623 g_eventLogger->error("Mgm handle is NULL (%s:%d)", __FILE__, __LINE__);
3624 DBUG_RETURN(sockfd);
3625 }
3626
3627 if (!report_dynamic_ports(*h))
3628 {
3629 ndb_mgm_destroy_handle(h);
3630 DBUG_RETURN(sockfd);
3631 }
3632
3633 /**
3634 * convert_to_transporter also disposes of the handle (i.e. we don't leak
3635 * memory here.
3636 */
3637 DBUG_PRINT("info", ("Converting handle to transporter"));
3638 sockfd= ndb_mgm_convert_to_transporter(h);
3639 if (!ndb_socket_valid(sockfd))
3640 {
3641 g_eventLogger->error("Failed to convert to transporter (%s: %d)",
3642 __FILE__, __LINE__);
3643 ndb_mgm_destroy_handle(h);
3644 }
3645 DBUG_RETURN(sockfd);
3646 }
3647
3648 /**
3649 * Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
3650 * and returns the socket.
3651 */
3652 NDB_SOCKET_TYPE
connect_ndb_mgmd(const char * server_name,unsigned short server_port)3653 TransporterRegistry::connect_ndb_mgmd(const char* server_name,
3654 unsigned short server_port)
3655 {
3656 NdbMgmHandle h= ndb_mgm_create_handle();
3657 NDB_SOCKET_TYPE s;
3658 ndb_socket_invalidate(&s);
3659
3660 DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
3661
3662 if ( h == NULL )
3663 {
3664 DBUG_RETURN(s);
3665 }
3666
3667 /**
3668 * Set connectstring
3669 */
3670 {
3671 BaseString cs;
3672 cs.assfmt("%s:%u", server_name, server_port);
3673 ndb_mgm_set_connectstring(h, cs.c_str());
3674 }
3675
3676 if(ndb_mgm_connect(h, 0, 0, 0)<0)
3677 {
3678 DBUG_PRINT("info", ("connection to mgmd failed"));
3679 ndb_mgm_destroy_handle(&h);
3680 DBUG_RETURN(s);
3681 }
3682
3683 DBUG_RETURN(connect_ndb_mgmd(&h));
3684 }
3685
3686 /**
3687 * The calls below are used by all implementations: NDB API, ndbd and
3688 * ndbmtd. The calls to handle->getWritePtr, handle->updateWritePtr
3689 * are handled by special implementations for NDB API, ndbd and
3690 * ndbmtd.
3691 */
3692
3693 Uint32 *
getWritePtr(TransporterSendBufferHandle * handle,Transporter * t,Uint32 trp_id,Uint32 lenBytes,Uint32 prio,SendStatus * error)3694 TransporterRegistry::getWritePtr(TransporterSendBufferHandle *handle,
3695 Transporter* t,
3696 Uint32 trp_id,
3697 Uint32 lenBytes,
3698 Uint32 prio,
3699 SendStatus *error)
3700 {
3701 NodeId nodeId = t->getRemoteNodeId();
3702 Uint32 *insertPtr = handle->getWritePtr(nodeId,
3703 trp_id,
3704 lenBytes,
3705 prio,
3706 t->get_max_send_buffer(),
3707 error);
3708
3709 if (unlikely(insertPtr == nullptr && *error != SEND_MESSAGE_TOO_BIG))
3710 {
3711 //-------------------------------------------------
3712 // Buffer was completely full. We have severe problems.
3713 // We will attempt to wait for a small time
3714 //-------------------------------------------------
3715 if (t->send_is_possible(10))
3716 {
3717 //-------------------------------------------------
3718 // Send is possible after the small timeout.
3719 //-------------------------------------------------
3720 if (!handle->forceSend(nodeId, trp_id))
3721 {
3722 return 0;
3723 }
3724 else
3725 {
3726 //-------------------------------------------------
3727 // Since send was successful we will make a renewed
3728 // attempt at inserting the signal into the buffer.
3729 //-------------------------------------------------
3730 insertPtr = handle->getWritePtr(nodeId,
3731 trp_id,
3732 lenBytes,
3733 prio,
3734 t->get_max_send_buffer(),
3735 error);
3736 }//if
3737 }
3738 else
3739 {
3740 return 0;
3741 }//if
3742 }
3743 return insertPtr;
3744 }
3745
3746 void
updateWritePtr(TransporterSendBufferHandle * handle,Transporter * t,Uint32 trp_id,Uint32 lenBytes,Uint32 prio)3747 TransporterRegistry::updateWritePtr(TransporterSendBufferHandle *handle,
3748 Transporter* t,
3749 Uint32 trp_id,
3750 Uint32 lenBytes,
3751 Uint32 prio)
3752 {
3753 NodeId nodeId = t->getRemoteNodeId();
3754 Uint32 used = handle->updateWritePtr(nodeId, trp_id, lenBytes, prio);
3755 t->update_status_overloaded(used);
3756
3757 if (t->send_limit_reached(used))
3758 {
3759 //-------------------------------------------------
3760 // Buffer is full and we are ready to send. We will
3761 // not wait since the signal is already in the buffer.
3762 // Force flag set has the same indication that we
3763 // should always send. If it is not possible to send
3764 // we will not worry since we will soon be back for
3765 // a renewed trial.
3766 //-------------------------------------------------
3767 if (t->send_is_possible(0))
3768 {
3769 //-------------------------------------------------
3770 // Send was possible, attempt at a send.
3771 //-------------------------------------------------
3772 handle->forceSend(nodeId, trp_id);
3773 }//if
3774 }
3775 }
3776
3777 void
inc_overload_count(Uint32 nodeId)3778 TransporterRegistry::inc_overload_count(Uint32 nodeId)
3779 {
3780 assert(nodeId < MAX_NODES);
3781 assert(theNodeIdTransporters[nodeId] != NULL);
3782 theNodeIdTransporters[nodeId]->inc_overload_count();
3783 }
3784
3785 void
inc_slowdown_count(Uint32 nodeId)3786 TransporterRegistry::inc_slowdown_count(Uint32 nodeId)
3787 {
3788 assert(nodeId < MAX_NODES);
3789 assert(theNodeIdTransporters[nodeId] != NULL);
3790 theNodeIdTransporters[nodeId]->inc_slowdown_count();
3791 }
3792
3793 Uint32
get_overload_count(Uint32 nodeId)3794 TransporterRegistry::get_overload_count(Uint32 nodeId)
3795 {
3796 assert(nodeId < MAX_NODES);
3797 assert(theNodeIdTransporters[nodeId] != NULL);
3798 return theNodeIdTransporters[nodeId]->get_overload_count();
3799 }
3800
3801 Uint32
get_slowdown_count(Uint32 nodeId)3802 TransporterRegistry::get_slowdown_count(Uint32 nodeId)
3803 {
3804 assert(nodeId < MAX_NODES);
3805 assert(theNodeIdTransporters[nodeId] != NULL);
3806 return theNodeIdTransporters[nodeId]->get_slowdown_count();
3807 }
3808
3809 Uint32
get_connect_count(Uint32 nodeId)3810 TransporterRegistry::get_connect_count(Uint32 nodeId)
3811 {
3812 assert(nodeId < MAX_NODES);
3813 assert(theNodeIdTransporters[nodeId] != NULL);
3814 return theNodeIdTransporters[nodeId]->get_connect_count();
3815 }
3816
3817 void
get_trps_for_node(Uint32 nodeId,TrpId * trp_ids,Uint32 & num_ids,Uint32 max_size)3818 TransporterRegistry::get_trps_for_node(Uint32 nodeId,
3819 TrpId *trp_ids,
3820 Uint32 &num_ids,
3821 Uint32 max_size)
3822 {
3823 Transporter *t = theNodeIdTransporters[nodeId];
3824 if (!t)
3825 {
3826 num_ids = 0;
3827 }
3828 else if (t->isMultiTransporter())
3829 {
3830 Multi_Transporter *multi_trp = (Multi_Transporter*)t;
3831 num_ids = multi_trp->get_num_active_transporters();
3832 num_ids = MIN(num_ids, max_size);
3833 for (Uint32 i = 0; i < num_ids; i++)
3834 {
3835 Transporter* tmp_trp = multi_trp->get_active_transporter(i);
3836 trp_ids[i] = tmp_trp->getTransporterIndex();
3837 require(trp_ids[i] != 0);
3838 }
3839 }
3840 else
3841 {
3842 num_ids = 1;
3843 trp_ids[0] = t->getTransporterIndex();
3844 require(trp_ids[0] != 0);
3845 }
3846 require(max_size >= 1);
3847 }
3848
3849 TrpId
getTransporterIndex(Transporter * t)3850 TransporterRegistry::getTransporterIndex(Transporter* t)
3851 {
3852 return t->getTransporterIndex();
3853 }
3854
3855 bool
isMultiTransporter(Transporter * t)3856 TransporterRegistry::isMultiTransporter(Transporter* t)
3857 {
3858 return t->isMultiTransporter();
3859 }
3860
3861 void
switch_active_trp(Multi_Transporter * t)3862 TransporterRegistry::switch_active_trp(Multi_Transporter *t)
3863 {
3864 require(t->isMultiTransporter());
3865 t->switch_active_trp();
3866 }
3867
3868 Uint32
get_num_active_transporters(Multi_Transporter * t)3869 TransporterRegistry::get_num_active_transporters(Multi_Transporter *t)
3870 {
3871 require(t->isMultiTransporter());
3872 {
3873 return t->get_num_active_transporters();
3874 }
3875 }
3876
3877 /**
3878 * We calculate the risk level for a send buffer.
3879 * The primary instrument is the current size of
3880 * the node send buffer. However if the total
3881 * buffer for all send buffers is also close to
3882 * empty, then we will adjust the node send
3883 * buffer size for this. In this manner a very
3884 * contested total buffer will also slow down
3885 * the entire node operation.
3886 */
3887 void
calculate_send_buffer_level(Uint64 node_send_buffer_size,Uint64 total_send_buffer_size,Uint64 total_used_send_buffer_size,Uint32 num_threads,SB_LevelType & level)3888 calculate_send_buffer_level(Uint64 node_send_buffer_size,
3889 Uint64 total_send_buffer_size,
3890 Uint64 total_used_send_buffer_size,
3891 Uint32 num_threads,
3892 SB_LevelType &level)
3893 {
3894 Uint64 percentage =
3895 (total_used_send_buffer_size * 100) / total_send_buffer_size;
3896
3897 if (percentage < 90)
3898 {
3899 ;
3900 }
3901 else if (percentage < 95)
3902 {
3903 node_send_buffer_size *= 2;
3904 }
3905 else if (percentage < 97)
3906 {
3907 node_send_buffer_size *= 4;
3908 }
3909 else if (percentage < 98)
3910 {
3911 node_send_buffer_size *= 8;
3912 }
3913 else if (percentage < 99)
3914 {
3915 node_send_buffer_size *= 16;
3916 }
3917 else
3918 {
3919 level = SB_CRITICAL_LEVEL;
3920 return;
3921 }
3922
3923 if (node_send_buffer_size < 128 * 1024)
3924 {
3925 level = SB_NO_RISK_LEVEL;
3926 return;
3927 }
3928 else if (node_send_buffer_size < 256 * 1024)
3929 {
3930 level = SB_LOW_LEVEL;
3931 return;
3932 }
3933 else if (node_send_buffer_size < 384 * 1024)
3934 {
3935 level = SB_MEDIUM_LEVEL;
3936 return;
3937 }
3938 else if (node_send_buffer_size < 1024 * 1024)
3939 {
3940 level = SB_HIGH_LEVEL;
3941 return;
3942 }
3943 else if (node_send_buffer_size < 2 * 1024 * 1024)
3944 {
3945 level = SB_RISK_LEVEL;
3946 return;
3947 }
3948 else
3949 {
3950 level = SB_CRITICAL_LEVEL;
3951 return;
3952 }
3953 }
3954
/**
 * Explicit instantiation of the Vector template for
 * TransporterRegistry::Transporter_interface in this source file.
 */
template class Vector<TransporterRegistry::Transporter_interface>;
3956