1 /*
2 Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <ndb_global.h>
26
27 #include "ndb_cluster_connection_impl.hpp"
28 #include <mgmapi_configuration.hpp>
29 #include <mgmapi_config_parameters.h>
30 #include "TransporterFacade.hpp"
31 #include <NdbOut.hpp>
32 #include <NdbSleep.h>
33 #include <NdbThread.h>
34 #include <ndb_limits.h>
35 #include <ConfigRetriever.hpp>
36 #include <ndb_version.h>
37 #include <mgmapi_debug.h>
38 #include <mgmapi_internal.h>
39 #include "NdbImpl.hpp"
40 #include "NdbDictionaryImpl.hpp"
41
42 #include <NdbMutex.h>
43 #ifdef VM_TRACE
44 NdbMutex *ndb_print_state_mutex= NULL;
45 #endif
46
47 #include <EventLogger.hpp>
48 extern EventLogger *g_eventLogger;
49
50 static int g_ndb_connection_count = 0;
51
52 /*
53 * Ndb_cluster_connection
54 */
Ndb_cluster_connection(const char * connect_string)55 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
56 : m_impl(* new Ndb_cluster_connection_impl(connect_string, 0, 0))
57 {
58 }
59
Ndb_cluster_connection(const char * connect_string,int force_api_nodeid)60 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string,
61 int force_api_nodeid)
62 : m_impl(* new Ndb_cluster_connection_impl(connect_string, 0,
63 force_api_nodeid))
64 {
65 }
66
Ndb_cluster_connection(const char * connect_string,Ndb_cluster_connection * main_connection)67 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string,
68 Ndb_cluster_connection *
69 main_connection)
70 : m_impl(* new Ndb_cluster_connection_impl(connect_string,
71 main_connection, 0))
72 {
73 }
74
Ndb_cluster_connection(Ndb_cluster_connection_impl & impl)75 Ndb_cluster_connection::Ndb_cluster_connection
76 (Ndb_cluster_connection_impl& impl) : m_impl(impl)
77 {
78 }
79
~Ndb_cluster_connection()80 Ndb_cluster_connection::~Ndb_cluster_connection()
81 {
82 Ndb_cluster_connection_impl *tmp = &m_impl;
83 if (this != tmp)
84 delete tmp;
85 }
86
get_connected_port() const87 int Ndb_cluster_connection::get_connected_port() const
88 {
89 if (m_impl.m_config_retriever)
90 return m_impl.m_config_retriever->get_mgmd_port();
91 return -1;
92 }
93
get_connected_host() const94 const char *Ndb_cluster_connection::get_connected_host() const
95 {
96 if (m_impl.m_config_retriever)
97 return m_impl.m_config_retriever->get_mgmd_host();
98 return 0;
99 }
100
get_connectstring(char * buf,int buf_sz) const101 const char *Ndb_cluster_connection::get_connectstring(char *buf,
102 int buf_sz) const
103 {
104 if (m_impl.m_config_retriever)
105 return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
106 return 0;
107 }
108
109 extern "C"
110 void *
run_ndb_cluster_connection_connect_thread(void * me)111 run_ndb_cluster_connection_connect_thread(void *me)
112 {
113 Ndb_cluster_connection_impl* connection= (Ndb_cluster_connection_impl*) me;
114 connection->m_run_connect_thread= 1;
115 connection->connect_thread();
116 return me;
117 }
118
start_connect_thread(int (* connect_callback)(void))119 int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
120 {
121 int r;
122 DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
123 m_impl.m_connect_callback= connect_callback;
124 if ((r = connect(0,0,0)) == 1)
125 {
126 DBUG_PRINT("info",("starting thread"));
127 m_impl.m_connect_thread=
128 NdbThread_Create(run_ndb_cluster_connection_connect_thread,
129 (void**)&m_impl,
130 0, // default stack size
131 "ndb_cluster_connection",
132 NDB_THREAD_PRIO_LOW);
133 }
134 else if (r < 0)
135 {
136 DBUG_RETURN(-1);
137 }
138 else if (m_impl.m_connect_callback)
139 {
140 (*m_impl.m_connect_callback)();
141 }
142 DBUG_RETURN(0);
143 }
144
set_optimized_node_selection(int val)145 void Ndb_cluster_connection::set_optimized_node_selection(int val)
146 {
147 m_impl.m_optimized_node_selection= val;
148 }
149
150 void
init_get_next_node(Ndb_cluster_connection_node_iter & iter)151 Ndb_cluster_connection_impl::init_get_next_node
152 (Ndb_cluster_connection_node_iter &iter)
153 {
154 if (iter.scan_state != (Uint8)~0)
155 iter.cur_pos= iter.scan_state;
156 if (iter.cur_pos >= no_db_nodes())
157 iter.cur_pos= 0;
158 iter.init_pos= iter.cur_pos;
159 iter.scan_state= 0;
160 // fprintf(stderr,"[init %d]",iter.init_pos);
161 return;
162 }
163
164 Uint32
get_next_node(Ndb_cluster_connection_node_iter & iter)165 Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
166 {
167 Uint32 cur_pos= iter.cur_pos;
168 if (cur_pos >= no_db_nodes())
169 return 0;
170
171 Ndb_cluster_connection_impl::Node *nodes= m_all_nodes.getBase();
172 Ndb_cluster_connection_impl::Node &node= nodes[cur_pos];
173
174 if (iter.scan_state != (Uint8)~0)
175 {
176 assert(iter.scan_state < no_db_nodes());
177 if (nodes[iter.scan_state].group == node.group)
178 iter.scan_state= ~0;
179 else
180 return nodes[iter.scan_state++].id;
181 }
182
183 // fprintf(stderr,"[%d]",node.id);
184
185 cur_pos++;
186 Uint32 init_pos= iter.init_pos;
187 if (cur_pos == node.next_group)
188 {
189 cur_pos= nodes[init_pos].this_group;
190 }
191
192 // fprintf(stderr,"[cur_pos %d]",cur_pos);
193 if (cur_pos != init_pos)
194 iter.cur_pos= cur_pos;
195 else
196 {
197 iter.cur_pos= node.next_group;
198 iter.init_pos= node.next_group;
199 }
200 return node.id;
201 }
202
203 Uint32
get_next_alive_node(Ndb_cluster_connection_node_iter & iter)204 Ndb_cluster_connection_impl::get_next_alive_node(Ndb_cluster_connection_node_iter &iter)
205 {
206 Uint32 id;
207
208 TransporterFacade *tp = m_impl.m_transporter_facade;
209 if (tp == 0 || tp->ownId() == 0)
210 return 0;
211
212 while ((id = get_next_node(iter)))
213 {
214 tp->lock_mutex();
215 if (tp->get_node_alive(id) != 0)
216 {
217 tp->unlock_mutex();
218 return id;
219 }
220 tp->unlock_mutex();
221 }
222 return 0;
223 }
224
225 unsigned
no_db_nodes()226 Ndb_cluster_connection::no_db_nodes()
227 {
228 return m_impl.m_all_nodes.size();
229 }
230
231 unsigned
node_id()232 Ndb_cluster_connection::node_id()
233 {
234 return m_impl.m_transporter_facade->ownId();
235 }
236
237 unsigned
max_nodegroup()238 Ndb_cluster_connection::max_nodegroup()
239 {
240 TransporterFacade *tp = m_impl.m_transporter_facade;
241 if (tp == 0 || tp->ownId() == 0)
242 return 0;
243
244 Bitmask<MAX_NDB_NODES> ng;
245 tp->lock_mutex();
246 for(unsigned i= 0; i < no_db_nodes(); i++)
247 {
248 //************************************************
249 // If any node is answering, ndb is answering
250 //************************************************
251 trp_node n = tp->theClusterMgr->getNodeInfo(m_impl.m_all_nodes[i].id);
252 if (n.is_confirmed() && n.m_state.nodeGroup <= MAX_NDB_NODES)
253 ng.set(n.m_state.nodeGroup);
254 }
255 tp->unlock_mutex();
256
257 if (ng.isclear())
258 return 0;
259
260 Uint32 n = ng.find_first();
261 Uint32 m;
262 do
263 {
264 m = n;
265 } while ((n = ng.find(n+1)) != ng.NotFound);
266
267 return m;
268 }
269
get_no_ready()270 int Ndb_cluster_connection::get_no_ready()
271 {
272 TransporterFacade *tp = m_impl.m_transporter_facade;
273 if (tp == 0 || tp->ownId() == 0)
274 return -1;
275
276 unsigned int foundAliveNode = 0;
277 tp->lock_mutex();
278 for(unsigned i= 0; i < no_db_nodes(); i++)
279 {
280 //************************************************
281 // If any node is answering, ndb is answering
282 //************************************************
283 if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
284 foundAliveNode++;
285 }
286 }
287 tp->unlock_mutex();
288
289 return foundAliveNode;
290 }
291
292 int
wait_until_ready(int timeout,int timeout_after_first_alive)293 Ndb_cluster_connection::wait_until_ready(int timeout,
294 int timeout_after_first_alive)
295 {
296 DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
297 TransporterFacade *tp = m_impl.m_transporter_facade;
298 if (tp == 0)
299 {
300 DBUG_RETURN(-1);
301 }
302 if (tp->ownId() == 0)
303 {
304 DBUG_RETURN(-1);
305 }
306 int secondsCounter = 0;
307 int milliCounter = 0;
308 int noChecksSinceFirstAliveFound = 0;
309 do {
310 unsigned int foundAliveNode = get_no_ready();
311
312 if (foundAliveNode == no_db_nodes())
313 {
314 DBUG_RETURN(0);
315 }
316 else if (foundAliveNode > 0)
317 {
318 noChecksSinceFirstAliveFound++;
319 // 100 ms delay -> 10*
320 if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
321 DBUG_RETURN(1);
322 }
323 else if (secondsCounter >= timeout)
324 { // no alive nodes and timed out
325 DBUG_RETURN(-1);
326 }
327 NdbSleep_MilliSleep(100);
328 milliCounter += 100;
329 if (milliCounter >= 1000) {
330 secondsCounter++;
331 milliCounter = 0;
332 }//if
333 } while (1);
334 }
335
get_connect_count() const336 unsigned Ndb_cluster_connection::get_connect_count() const
337 {
338 return m_impl.get_connect_count();
339 }
340
get_min_db_version() const341 unsigned Ndb_cluster_connection::get_min_db_version() const
342 {
343 return m_impl.get_min_db_version();
344 }
345
get_latest_error() const346 int Ndb_cluster_connection::get_latest_error() const
347 {
348 return m_impl.m_latest_error;
349 }
350
get_latest_error_msg() const351 const char *Ndb_cluster_connection::get_latest_error_msg() const
352 {
353 return m_impl.m_latest_error_msg.c_str();
354 }
355
356 /*
357 * Ndb_cluster_connection_impl
358 */
359
360 Ndb_cluster_connection_impl::
Ndb_cluster_connection_impl(const char * connect_string,Ndb_cluster_connection * main_connection,int force_api_nodeid)361 Ndb_cluster_connection_impl(const char * connect_string,
362 Ndb_cluster_connection *main_connection,
363 int force_api_nodeid)
364 : Ndb_cluster_connection(*this),
365 m_main_connection(main_connection),
366 m_optimized_node_selection(1),
367 m_run_connect_thread(0),
368 m_latest_trans_gci(0),
369 m_first_ndb_object(0),
370 m_latest_error_msg(),
371 m_latest_error(0),
372 m_max_trans_id(0)
373 {
374 DBUG_ENTER("Ndb_cluster_connection");
375 DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%lx", (long) this));
376
377 NdbMutex_Lock(g_ndb_connection_mutex);
378 if(g_ndb_connection_count++ == 0)
379 {
380 NdbColumnImpl::create_pseudo_columns();
381 g_eventLogger->createConsoleHandler();
382 g_eventLogger->setCategory("NdbApi");
383 g_eventLogger->enable(Logger::LL_ON, Logger::LL_ERROR);
384 /*
385 Disable repeated message handling as it interfers
386 with mysqld logging, in which case messages come out
387 of order. Same applies for regular ndbapi user.
388 */
389 g_eventLogger->setRepeatFrequency(0);
390 }
391 NdbMutex_Unlock(g_ndb_connection_mutex);
392
393 m_event_add_drop_mutex= NdbMutex_Create();
394 m_new_delete_ndb_mutex = NdbMutex_Create();
395
396 m_connect_thread= 0;
397 m_connect_callback= 0;
398
399 /* Clear global stats baseline */
400 memset(globalApiStatsBaseline, 0, sizeof(globalApiStatsBaseline));
401
402 #ifdef VM_TRACE
403 if (ndb_print_state_mutex == NULL)
404 ndb_print_state_mutex= NdbMutex_Create();
405 #endif
406 m_config_retriever=
407 new ConfigRetriever(connect_string, force_api_nodeid,
408 NDB_VERSION, NDB_MGM_NODE_TYPE_API);
409 if (m_config_retriever->hasError())
410 {
411 m_latest_error= 1;
412 m_latest_error_msg.assfmt
413 ("Could not initialize handle to management server: %s",
414 m_config_retriever->getErrorString());
415 printf("%s\n", get_latest_error_msg());
416 }
417 if (!m_main_connection)
418 {
419 m_globalDictCache = new GlobalDictCache;
420 m_transporter_facade= new TransporterFacade(m_globalDictCache);
421 }
422 else
423 {
424 assert(m_main_connection->m_impl.m_globalDictCache != NULL);
425 m_globalDictCache = 0;
426 m_transporter_facade=
427 new TransporterFacade(m_main_connection->m_impl.m_globalDictCache);
428
429 // The secondary connection can't use same nodeid, clear the nodeid
430 // in ConfigRetriver to avoid asking for the same nodeid again
431 m_config_retriever->setNodeId(0);
432
433 }
434
435 DBUG_VOID_RETURN;
436 }
437
~Ndb_cluster_connection_impl()438 Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
439 {
440 DBUG_ENTER("~Ndb_cluster_connection");
441
442 if (m_first_ndb_object != 0)
443 {
444 g_eventLogger->warning("Deleting Ndb_cluster_connection with Ndb-object"
445 " not deleted");
446 Ndb * p = m_first_ndb_object;
447 printf("this: %p Ndb-object(s): ", (Ndb_cluster_connection*)this);
448 while (p)
449 {
450 printf("%p ", p);
451 p = p->theImpl->m_next_ndb_object;
452 }
453 printf("\n");
454 fflush(stdout);
455 }
456
457 if (m_transporter_facade != 0)
458 {
459 m_transporter_facade->stop_instance();
460 }
461 if (m_globalDictCache)
462 {
463 delete m_globalDictCache;
464 }
465 if (m_connect_thread)
466 {
467 void *status;
468 m_run_connect_thread= 0;
469 NdbThread_WaitFor(m_connect_thread, &status);
470 NdbThread_Destroy(&m_connect_thread);
471 m_connect_thread= 0;
472 }
473 if (m_transporter_facade != 0)
474 {
475 delete m_transporter_facade;
476 m_transporter_facade = 0;
477 }
478 if (m_config_retriever)
479 {
480 delete m_config_retriever;
481 m_config_retriever= NULL;
482 }
483 #ifdef VM_TRACE
484 if (ndb_print_state_mutex != NULL)
485 {
486 NdbMutex_Destroy(ndb_print_state_mutex);
487 ndb_print_state_mutex= NULL;
488 }
489 #endif
490
491 NdbMutex_Lock(g_ndb_connection_mutex);
492 if(--g_ndb_connection_count == 0)
493 {
494 NdbColumnImpl::destory_pseudo_columns();
495 }
496 NdbMutex_Unlock(g_ndb_connection_mutex);
497
498 if (m_event_add_drop_mutex)
499 NdbMutex_Destroy(m_event_add_drop_mutex);
500 m_event_add_drop_mutex = 0;
501
502 if (m_new_delete_ndb_mutex)
503 NdbMutex_Destroy(m_new_delete_ndb_mutex);
504 m_new_delete_ndb_mutex = 0;
505
506 DBUG_VOID_RETURN;
507 }
508
509 void
lock_ndb_objects()510 Ndb_cluster_connection::lock_ndb_objects()
511 {
512 NdbMutex_Lock(m_impl.m_new_delete_ndb_mutex);
513 }
514
515 void
unlock_ndb_objects()516 Ndb_cluster_connection::unlock_ndb_objects()
517 {
518 NdbMutex_Unlock(m_impl.m_new_delete_ndb_mutex);
519 }
520
521 const Ndb*
get_next_ndb_object(const Ndb * p)522 Ndb_cluster_connection::get_next_ndb_object(const Ndb* p)
523 {
524 if (p == 0)
525 return m_impl.m_first_ndb_object;
526
527 return p->theImpl->m_next_ndb_object;
528 }
529
530 void
link_ndb_object(Ndb * p)531 Ndb_cluster_connection_impl::link_ndb_object(Ndb* p)
532 {
533 lock_ndb_objects();
534 if (m_first_ndb_object != 0)
535 {
536 m_first_ndb_object->theImpl->m_prev_ndb_object = p;
537 }
538
539 p->theImpl->m_next_ndb_object = m_first_ndb_object;
540 m_first_ndb_object = p;
541
542 p->theFirstTransId += m_max_trans_id;
543 unlock_ndb_objects();
544 }
545
546 void
unlink_ndb_object(Ndb * p)547 Ndb_cluster_connection_impl::unlink_ndb_object(Ndb* p)
548 {
549 lock_ndb_objects();
550 Ndb* prev = p->theImpl->m_prev_ndb_object;
551 Ndb* next = p->theImpl->m_next_ndb_object;
552
553 if (prev == 0)
554 {
555 assert(m_first_ndb_object == p);
556 m_first_ndb_object = next;
557 }
558 else
559 {
560 prev->theImpl->m_next_ndb_object = next;
561 }
562
563 if (next)
564 {
565 next->theImpl->m_prev_ndb_object = prev;
566 }
567
568 p->theImpl->m_prev_ndb_object = 0;
569 p->theImpl->m_next_ndb_object = 0;
570
571 Uint32 transId = (Uint32)p->theFirstTransId;
572 if (transId > m_max_trans_id)
573 {
574 m_max_trans_id = transId;
575 }
576
577 /* This Ndb is leaving for a better place,
578 * record its contribution to global warming
579 * for posterity
580 */
581 for (Uint32 i=0; i<Ndb::NumClientStatistics; i++)
582 {
583 globalApiStatsBaseline[i] += p->theImpl->clientStats[i];
584 }
585
586 unlock_ndb_objects();
587 }
588
589 void
set_name(const char * name)590 Ndb_cluster_connection_impl::set_name(const char *name)
591 {
592 NdbMgmHandle h= m_config_retriever->get_mgmHandle();
593 ndb_mgm_set_name(h, name);
594 }
595
596 int
init_nodes_vector(Uint32 nodeid,const ndb_mgm_configuration & config)597 Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
598 const ndb_mgm_configuration
599 &config)
600 {
601 DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
602 ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
603
604 for(iter.first(); iter.valid(); iter.next())
605 {
606 Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
607 const char * remoteHostName= 0, * localHostName= 0;
608 if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
609 if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
610
611 if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
612 remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
613
614 iter.get(CFG_CONNECTION_GROUP, &group);
615
616 {
617 const char * host1= 0, * host2= 0;
618 iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
619 iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
620 localHostName = (nodeid == nodeid1 ? host1 : host2);
621 remoteHostName = (nodeid == nodeid1 ? host2 : host1);
622 }
623
624 Uint32 type = ~0;
625 if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
626
627 switch(type){
628 case CONNECTION_TYPE_SHM:{
629 break;
630 }
631 case CONNECTION_TYPE_SCI:{
632 break;
633 }
634 case CONNECTION_TYPE_TCP:{
635 // connecting through localhost
636 // check if config_hostname is local
637 if (SocketServer::tryBind(0,remoteHostName))
638 group--; // upgrade group value
639 break;
640 }
641 }
642 if (m_all_nodes.push_back(Node(group,remoteNodeId)))
643 {
644 DBUG_RETURN(-1);
645 }
646 DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
647 for (int i= m_all_nodes.size()-2;
648 i >= 0 && m_all_nodes[i].group > m_all_nodes[i+1].group;
649 i--)
650 {
651 Node tmp= m_all_nodes[i];
652 m_all_nodes[i]= m_all_nodes[i+1];
653 m_all_nodes[i+1]= tmp;
654 }
655 }
656
657 int i;
658 Uint32 cur_group, i_group= 0;
659 cur_group= ~0;
660 for (i= (int)m_all_nodes.size()-1; i >= 0; i--)
661 {
662 if (m_all_nodes[i].group != cur_group)
663 {
664 cur_group= m_all_nodes[i].group;
665 i_group= i+1;
666 }
667 m_all_nodes[i].next_group= i_group;
668 }
669 cur_group= ~0;
670 for (i= 0; i < (int)m_all_nodes.size(); i++)
671 {
672 if (m_all_nodes[i].group != cur_group)
673 {
674 cur_group= m_all_nodes[i].group;
675 i_group= i;
676 }
677 m_all_nodes[i].this_group= i_group;
678 }
679 #if 0
680 for (i= 0; i < (int)m_all_nodes.size(); i++)
681 {
682 fprintf(stderr, "[%d] %d %d %d %d\n",
683 i,
684 m_all_nodes[i].id,
685 m_all_nodes[i].group,
686 m_all_nodes[i].this_group,
687 m_all_nodes[i].next_group);
688 }
689
690 do_test();
691 #endif
692 DBUG_RETURN(0);
693 }
694
695 Uint32
get_db_nodes(Uint8 arr[MAX_NDB_NODES]) const696 Ndb_cluster_connection_impl::get_db_nodes(Uint8 arr[MAX_NDB_NODES]) const
697 {
698 Uint32 cnt = (Uint32)m_all_nodes.size();
699 assert(cnt < MAX_NDB_NODES);
700 const Node *nodes = m_all_nodes.getBase();
701 for (Uint32 i = 0; i<cnt; i++)
702 arr[i] = (Uint8)nodes[i].id;
703 return cnt;
704 }
705
706 int
configure(Uint32 nodeId,const ndb_mgm_configuration & config)707 Ndb_cluster_connection_impl::configure(Uint32 nodeId,
708 const ndb_mgm_configuration &config)
709 {
710 DBUG_ENTER("Ndb_cluster_connection_impl::configure");
711 {
712 ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
713 if(iter.find(CFG_NODE_ID, nodeId))
714 DBUG_RETURN(-1);
715
716 // Configure scan settings
717 Uint32 scan_batch_size= 0;
718 if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
719 m_config.m_scan_batch_size= scan_batch_size;
720 }
721 Uint32 batch_byte_size= 0;
722 if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
723 m_config.m_batch_byte_size= batch_byte_size;
724 }
725 Uint32 batch_size= 0;
726 if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
727 m_config.m_batch_size= batch_size;
728 }
729
730 // Configure timeouts
731 Uint32 timeout = 120000;
732 for (iter.first(); iter.valid(); iter.next())
733 {
734 Uint32 tmp1 = 0, tmp2 = 0;
735 iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
736 iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
737 tmp1 += tmp2;
738 if (tmp1 > timeout)
739 timeout = tmp1;
740 }
741 m_config.m_waitfor_timeout = timeout;
742
743 Uint32 queue = 0;
744 if (!iter.get(CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION, &queue))
745 {
746 m_config.m_default_queue_option = queue;
747 }
748 }
749 DBUG_RETURN(init_nodes_vector(nodeId, config));
750 }
751
752 void
do_test()753 Ndb_cluster_connection_impl::do_test()
754 {
755 Ndb_cluster_connection_node_iter iter;
756 int n= no_db_nodes()+5;
757 Uint32 *nodes= new Uint32[n+1];
758
759 for (int g= 0; g < n; g++)
760 {
761 for (int h= 0; h < n; h++)
762 {
763 Uint32 id;
764 Ndb_cluster_connection_node_iter iter2;
765 {
766 for (int j= 0; j < g; j++)
767 {
768 nodes[j]= get_next_node(iter2);
769 }
770 }
771
772 for (int i= 0; i < n; i++)
773 {
774 init_get_next_node(iter);
775 fprintf(stderr, "%d dead:(", g);
776 id= 0;
777 while (id == 0)
778 {
779 if ((id= get_next_node(iter)) == 0)
780 break;
781 for (int j= 0; j < g; j++)
782 {
783 if (nodes[j] == id)
784 {
785 fprintf(stderr, " %d", id);
786 id= 0;
787 break;
788 }
789 }
790 }
791 fprintf(stderr, ")");
792 if (id == 0)
793 {
794 break;
795 }
796 fprintf(stderr, " %d\n", id);
797 }
798 fprintf(stderr, "\n");
799 }
800 }
801 delete [] nodes;
802 }
803
set_name(const char * name)804 void Ndb_cluster_connection::set_name(const char *name)
805 {
806 m_impl.set_name(name);
807 }
808
connect(int no_retries,int retry_delay_in_seconds,int verbose)809 int Ndb_cluster_connection_impl::connect(int no_retries,
810 int retry_delay_in_seconds,
811 int verbose)
812 {
813 DBUG_ENTER("Ndb_cluster_connection::connect");
814 do {
815 if (m_config_retriever == 0)
816 {
817 if (!m_latest_error)
818 {
819 m_latest_error = 1;
820 m_latest_error_msg.assign("Ndb_cluster_connection init "
821 "error: m_config_retriever==0");
822 }
823 DBUG_PRINT("exit", ("no m_config_retriever, ret: -1"));
824 DBUG_RETURN(-1);
825 }
826 if (m_config_retriever->do_connect(no_retries,
827 retry_delay_in_seconds,
828 verbose))
829 {
830 char buf[1024];
831 m_latest_error = 1;
832 m_latest_error_msg.assfmt("Connect using '%s' timed out",
833 get_connectstring(buf, sizeof(buf)));
834 DBUG_PRINT("exit", ("mgmt server not up yet, ret: 1"));
835 DBUG_RETURN(1); // mgmt server not up yet
836 }
837
838 Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,
839 3/*delay*/);
840 if(nodeId == 0)
841 break;
842 ndb_mgm_configuration * props = m_config_retriever->getConfig(nodeId);
843 if(props == 0)
844 break;
845
846 if (configure(nodeId, *props))
847 {
848 ndb_mgm_destroy_configuration(props);
849 DBUG_PRINT("exit", ("malloc failure, ret: -1"));
850 DBUG_RETURN(-1);
851 }
852
853 if (m_transporter_facade->start_instance(nodeId, props) < 0)
854 {
855 ndb_mgm_destroy_configuration(props);
856 DBUG_RETURN(-1);
857 }
858
859 ndb_mgm_destroy_configuration(props);
860 m_transporter_facade->connected();
861 m_latest_error = 0;
862 m_latest_error_msg.assign("");
863 DBUG_PRINT("exit", ("connect ok, ret: 0"));
864 DBUG_RETURN(0);
865 } while(0);
866
867 const char* erString = m_config_retriever->getErrorString();
868 if (erString == 0) {
869 erString = "No error specified!";
870 }
871 m_latest_error = 1;
872 m_latest_error_msg.assfmt("Configuration error: %s", erString);
873 ndbout << get_latest_error_msg() << endl;
874 DBUG_PRINT("exit", ("connect failed, '%s' ret: -1", erString));
875 DBUG_RETURN(-1);
876 }
877
878
879 int
connect(int no_retries,int retry_delay_in_seconds,int verbose)880 Ndb_cluster_connection::connect(int no_retries,
881 int retry_delay_in_seconds,
882 int verbose)
883 {
884 return m_impl.connect(no_retries, retry_delay_in_seconds, verbose);
885 }
886
887
connect_thread()888 void Ndb_cluster_connection_impl::connect_thread()
889 {
890 DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
891 int r;
892 do {
893 NdbSleep_SecSleep(1);
894 if ((r = connect(0,0,0)) == 0)
895 break;
896 if (r == -1) {
897 printf("Ndb_cluster_connection::connect_thread error\n");
898 DBUG_ASSERT(false);
899 m_run_connect_thread= 0;
900 } else {
901 // Wait before making a new connect attempt
902 NdbSleep_SecSleep(1);
903 }
904 } while (m_run_connect_thread);
905 if (m_connect_callback)
906 (*m_connect_callback)();
907 DBUG_VOID_RETURN;
908 }
909
910 Uint64 *
get_latest_trans_gci()911 Ndb_cluster_connection::get_latest_trans_gci()
912 {
913 return m_impl.get_latest_trans_gci();
914 }
915
916 void
init_get_next_node(Ndb_cluster_connection_node_iter & iter)917 Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter)
918 {
919 m_impl.init_get_next_node(iter);
920 }
921
922 Uint32
get_next_node(Ndb_cluster_connection_node_iter & iter)923 Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
924 {
925 return m_impl.get_next_node(iter);
926 }
927
928 unsigned int
get_next_alive_node(Ndb_cluster_connection_node_iter & iter)929 Ndb_cluster_connection::get_next_alive_node(Ndb_cluster_connection_node_iter &iter)
930 {
931 return m_impl.get_next_alive_node(iter);
932 }
933
934 unsigned
get_active_ndb_objects() const935 Ndb_cluster_connection::get_active_ndb_objects() const
936 {
937 return m_impl.m_transporter_facade->get_active_ndb_objects();
938 }
939
set_timeout(int timeout_ms)940 int Ndb_cluster_connection::set_timeout(int timeout_ms)
941 {
942 return ndb_mgm_set_timeout(m_impl.m_config_retriever->get_mgmHandle(),
943 timeout_ms);
944 }
945
946 int
get_auto_reconnect() const947 Ndb_cluster_connection::get_auto_reconnect() const
948 {
949 return m_impl.m_transporter_facade->get_auto_reconnect();
950 }
951
952 void
set_auto_reconnect(int value)953 Ndb_cluster_connection::set_auto_reconnect(int value)
954 {
955 m_impl.m_transporter_facade->set_auto_reconnect(value);
956 }
957
958 Uint32
collect_client_stats(Uint64 * statsArr,Uint32 sz)959 Ndb_cluster_connection::collect_client_stats(Uint64* statsArr, Uint32 sz)
960 {
961 /* We have a global stats baseline which contains all
962 * the stats for Ndb objects which have been and gone.
963 * Start with that, then add in stats for Ndb objects
964 * currently in use.
965 * Note that despite the lock, this is not thread safe
966 * as we are reading data that other threads may be
967 * concurrently writing. The lock just guards against
968 * concurrent changes to the set of active Ndbs while
969 * we are iterating it.
970 */
971 const Uint32 relevant = MIN((Uint32)Ndb::NumClientStatistics, sz);
972 const Ndb* ndb = NULL;
973 lock_ndb_objects();
974 {
975 memcpy(statsArr, &m_impl.globalApiStatsBaseline[0], sizeof(Uint64)*relevant);
976
977 while((ndb = get_next_ndb_object(ndb)) != NULL)
978 {
979 for (Uint32 i=0; i<relevant; i++)
980 {
981 statsArr[i] += ndb->theImpl->clientStats[i];
982 }
983 }
984 }
985 unlock_ndb_objects();
986
987 return relevant;
988 }
989
990 template class Vector<Ndb_cluster_connection_impl::Node>;
991
992