1 /*
2    Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include "ndb_cluster_connection_impl.hpp"
28 #include <mgmapi_configuration.hpp>
29 #include <mgmapi_config_parameters.h>
30 #include "TransporterFacade.hpp"
31 #include <NdbOut.hpp>
32 #include <NdbSleep.h>
33 #include <NdbThread.h>
34 #include <ndb_limits.h>
35 #include <ConfigRetriever.hpp>
36 #include <ndb_version.h>
37 #include <mgmapi_debug.h>
38 #include <mgmapi_internal.h>
39 #include "NdbImpl.hpp"
40 #include "NdbDictionaryImpl.hpp"
41 
42 #include <NdbMutex.h>
// Only compiled when VM_TRACE is defined: a global mutex that is created in
// the Ndb_cluster_connection_impl constructor and destroyed in its destructor.
43 #ifdef VM_TRACE
44 NdbMutex *ndb_print_state_mutex= NULL;
45 #endif
46 
47 #include <EventLogger.hpp>
// Event logger defined elsewhere; the first Ndb_cluster_connection_impl that
// is constructed configures it (console handler, category, log levels).
48 extern EventLogger *g_eventLogger;
49 
// Number of live Ndb_cluster_connection_impl objects. It is incremented in
// the constructor and decremented in the destructor, both while holding
// g_ndb_connection_mutex.
50 static int g_ndb_connection_count = 0;
51 
52 /*
53  * Ndb_cluster_connection
54  */
Ndb_cluster_connection(const char * connect_string)55 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
56   : m_impl(* new Ndb_cluster_connection_impl(connect_string, 0, 0))
57 {
58 }
59 
Ndb_cluster_connection(const char * connect_string,int force_api_nodeid)60 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string,
61                                                int force_api_nodeid)
62   : m_impl(* new Ndb_cluster_connection_impl(connect_string, 0,
63                                              force_api_nodeid))
64 {
65 }
66 
Ndb_cluster_connection(const char * connect_string,Ndb_cluster_connection * main_connection)67 Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string,
68                                                Ndb_cluster_connection *
69                                                main_connection)
70   : m_impl(* new Ndb_cluster_connection_impl(connect_string,
71                                              main_connection, 0))
72 {
73 }
74 
Ndb_cluster_connection(Ndb_cluster_connection_impl & impl)75 Ndb_cluster_connection::Ndb_cluster_connection
76 (Ndb_cluster_connection_impl& impl) : m_impl(impl)
77 {
78 }
79 
~Ndb_cluster_connection()80 Ndb_cluster_connection::~Ndb_cluster_connection()
81 {
82   Ndb_cluster_connection_impl *tmp = &m_impl;
83   if (this != tmp)
84     delete tmp;
85 }
86 
get_connected_port() const87 int Ndb_cluster_connection::get_connected_port() const
88 {
89   if (m_impl.m_config_retriever)
90     return m_impl.m_config_retriever->get_mgmd_port();
91   return -1;
92 }
93 
get_connected_host() const94 const char *Ndb_cluster_connection::get_connected_host() const
95 {
96   if (m_impl.m_config_retriever)
97     return m_impl.m_config_retriever->get_mgmd_host();
98   return 0;
99 }
100 
get_connectstring(char * buf,int buf_sz) const101 const char *Ndb_cluster_connection::get_connectstring(char *buf,
102 						      int buf_sz) const
103 {
104   if (m_impl.m_config_retriever)
105     return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
106   return 0;
107 }
108 
109 extern "C"
110 void *
run_ndb_cluster_connection_connect_thread(void * me)111 run_ndb_cluster_connection_connect_thread(void *me)
112 {
113   Ndb_cluster_connection_impl* connection= (Ndb_cluster_connection_impl*) me;
114   connection->m_run_connect_thread= 1;
115   connection->connect_thread();
116   return me;
117 }
118 
start_connect_thread(int (* connect_callback)(void))119 int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
120 {
121   int r;
122   DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
123   m_impl.m_connect_callback= connect_callback;
124   if ((r = connect(0,0,0)) == 1)
125   {
126     DBUG_PRINT("info",("starting thread"));
127     m_impl.m_connect_thread=
128       NdbThread_Create(run_ndb_cluster_connection_connect_thread,
129 		       (void**)&m_impl,
130                        0, // default stack size
131                        "ndb_cluster_connection",
132 		       NDB_THREAD_PRIO_LOW);
133   }
134   else if (r < 0)
135   {
136     DBUG_RETURN(-1);
137   }
138   else if (m_impl.m_connect_callback)
139   {
140     (*m_impl.m_connect_callback)();
141   }
142   DBUG_RETURN(0);
143 }
144 
set_optimized_node_selection(int val)145 void Ndb_cluster_connection::set_optimized_node_selection(int val)
146 {
147   m_impl.m_optimized_node_selection= val;
148 }
149 
150 void
init_get_next_node(Ndb_cluster_connection_node_iter & iter)151 Ndb_cluster_connection_impl::init_get_next_node
152 (Ndb_cluster_connection_node_iter &iter)
153 {
154   if (iter.scan_state != (Uint8)~0)
155     iter.cur_pos= iter.scan_state;
156   if (iter.cur_pos >= no_db_nodes())
157     iter.cur_pos= 0;
158   iter.init_pos= iter.cur_pos;
159   iter.scan_state= 0;
160   //  fprintf(stderr,"[init %d]",iter.init_pos);
161   return;
162 }
163 
164 Uint32
get_next_node(Ndb_cluster_connection_node_iter & iter)165 Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
166 {
167   Uint32 cur_pos= iter.cur_pos;
168   if (cur_pos >= no_db_nodes())
169     return 0;
170 
171   Ndb_cluster_connection_impl::Node *nodes= m_all_nodes.getBase();
172   Ndb_cluster_connection_impl::Node &node=  nodes[cur_pos];
173 
174   if (iter.scan_state != (Uint8)~0)
175   {
176     assert(iter.scan_state < no_db_nodes());
177     if (nodes[iter.scan_state].group == node.group)
178       iter.scan_state= ~0;
179     else
180       return nodes[iter.scan_state++].id;
181   }
182 
183   //  fprintf(stderr,"[%d]",node.id);
184 
185   cur_pos++;
186   Uint32 init_pos= iter.init_pos;
187   if (cur_pos == node.next_group)
188   {
189     cur_pos= nodes[init_pos].this_group;
190   }
191 
192   //  fprintf(stderr,"[cur_pos %d]",cur_pos);
193   if (cur_pos != init_pos)
194     iter.cur_pos= cur_pos;
195   else
196   {
197     iter.cur_pos= node.next_group;
198     iter.init_pos= node.next_group;
199   }
200   return node.id;
201 }
202 
203 Uint32
get_next_alive_node(Ndb_cluster_connection_node_iter & iter)204 Ndb_cluster_connection_impl::get_next_alive_node(Ndb_cluster_connection_node_iter &iter)
205 {
206   Uint32 id;
207 
208   TransporterFacade *tp = m_impl.m_transporter_facade;
209   if (tp == 0 || tp->ownId() == 0)
210     return 0;
211 
212   while ((id = get_next_node(iter)))
213   {
214     tp->lock_mutex();
215     if (tp->get_node_alive(id) != 0)
216     {
217       tp->unlock_mutex();
218       return id;
219     }
220     tp->unlock_mutex();
221   }
222   return 0;
223 }
224 
225 unsigned
no_db_nodes()226 Ndb_cluster_connection::no_db_nodes()
227 {
228   return m_impl.m_all_nodes.size();
229 }
230 
231 unsigned
node_id()232 Ndb_cluster_connection::node_id()
233 {
234   return m_impl.m_transporter_facade->ownId();
235 }
236 
237 unsigned
max_nodegroup()238 Ndb_cluster_connection::max_nodegroup()
239 {
240   TransporterFacade *tp = m_impl.m_transporter_facade;
241   if (tp == 0 || tp->ownId() == 0)
242     return 0;
243 
244   Bitmask<MAX_NDB_NODES> ng;
245   tp->lock_mutex();
246   for(unsigned i= 0; i < no_db_nodes(); i++)
247   {
248     //************************************************
249     // If any node is answering, ndb is answering
250     //************************************************
251     trp_node n = tp->theClusterMgr->getNodeInfo(m_impl.m_all_nodes[i].id);
252     if (n.is_confirmed() && n.m_state.nodeGroup <= MAX_NDB_NODES)
253       ng.set(n.m_state.nodeGroup);
254   }
255   tp->unlock_mutex();
256 
257   if (ng.isclear())
258     return 0;
259 
260   Uint32 n = ng.find_first();
261   Uint32 m;
262   do
263   {
264     m = n;
265   } while ((n = ng.find(n+1)) != ng.NotFound);
266 
267   return m;
268 }
269 
get_no_ready()270 int Ndb_cluster_connection::get_no_ready()
271 {
272   TransporterFacade *tp = m_impl.m_transporter_facade;
273   if (tp == 0 || tp->ownId() == 0)
274     return -1;
275 
276   unsigned int foundAliveNode = 0;
277   tp->lock_mutex();
278   for(unsigned i= 0; i < no_db_nodes(); i++)
279   {
280     //************************************************
281     // If any node is answering, ndb is answering
282     //************************************************
283     if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
284       foundAliveNode++;
285     }
286   }
287   tp->unlock_mutex();
288 
289   return foundAliveNode;
290 }
291 
292 int
wait_until_ready(int timeout,int timeout_after_first_alive)293 Ndb_cluster_connection::wait_until_ready(int timeout,
294 					 int timeout_after_first_alive)
295 {
296   DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
297   TransporterFacade *tp = m_impl.m_transporter_facade;
298   if (tp == 0)
299   {
300     DBUG_RETURN(-1);
301   }
302   if (tp->ownId() == 0)
303   {
304     DBUG_RETURN(-1);
305   }
306   int secondsCounter = 0;
307   int milliCounter = 0;
308   int noChecksSinceFirstAliveFound = 0;
309   do {
310     unsigned int foundAliveNode = get_no_ready();
311 
312     if (foundAliveNode == no_db_nodes())
313     {
314       DBUG_RETURN(0);
315     }
316     else if (foundAliveNode > 0)
317     {
318       noChecksSinceFirstAliveFound++;
319       // 100 ms delay -> 10*
320       if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
321 	DBUG_RETURN(1);
322     }
323     else if (secondsCounter >= timeout)
324     { // no alive nodes and timed out
325       DBUG_RETURN(-1);
326     }
327     NdbSleep_MilliSleep(100);
328     milliCounter += 100;
329     if (milliCounter >= 1000) {
330       secondsCounter++;
331       milliCounter = 0;
332     }//if
333   } while (1);
334 }
335 
get_connect_count() const336 unsigned Ndb_cluster_connection::get_connect_count() const
337 {
338   return m_impl.get_connect_count();
339 }
340 
get_min_db_version() const341 unsigned Ndb_cluster_connection::get_min_db_version() const
342 {
343   return m_impl.get_min_db_version();
344 }
345 
get_latest_error() const346 int Ndb_cluster_connection::get_latest_error() const
347 {
348   return m_impl.m_latest_error;
349 }
350 
get_latest_error_msg() const351 const char *Ndb_cluster_connection::get_latest_error_msg() const
352 {
353   return m_impl.m_latest_error_msg.c_str();
354 }
355 
356 /*
357  * Ndb_cluster_connection_impl
358  */
359 
360 Ndb_cluster_connection_impl::
Ndb_cluster_connection_impl(const char * connect_string,Ndb_cluster_connection * main_connection,int force_api_nodeid)361 Ndb_cluster_connection_impl(const char * connect_string,
362                             Ndb_cluster_connection *main_connection,
363                             int force_api_nodeid)
364   : Ndb_cluster_connection(*this),
365     m_main_connection(main_connection),
366     m_optimized_node_selection(1),
367     m_run_connect_thread(0),
368     m_latest_trans_gci(0),
369     m_first_ndb_object(0),
370     m_latest_error_msg(),
371     m_latest_error(0),
372     m_max_trans_id(0)
373 {
374   DBUG_ENTER("Ndb_cluster_connection");
375   DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%lx", (long) this));
376 
377   NdbMutex_Lock(g_ndb_connection_mutex);
378   if(g_ndb_connection_count++ == 0)
379   {
380     NdbColumnImpl::create_pseudo_columns();
381     g_eventLogger->createConsoleHandler();
382     g_eventLogger->setCategory("NdbApi");
383     g_eventLogger->enable(Logger::LL_ON, Logger::LL_ERROR);
384     /*
385       Disable repeated message handling as it interfers
386       with mysqld logging, in which case messages come out
387       of order.  Same applies for regular ndbapi user.
388     */
389     g_eventLogger->setRepeatFrequency(0);
390   }
391   NdbMutex_Unlock(g_ndb_connection_mutex);
392 
393   m_event_add_drop_mutex= NdbMutex_Create();
394   m_new_delete_ndb_mutex = NdbMutex_Create();
395 
396   m_connect_thread= 0;
397   m_connect_callback= 0;
398 
399   /* Clear global stats baseline */
400   memset(globalApiStatsBaseline, 0, sizeof(globalApiStatsBaseline));
401 
402 #ifdef VM_TRACE
403   if (ndb_print_state_mutex == NULL)
404     ndb_print_state_mutex= NdbMutex_Create();
405 #endif
406   m_config_retriever=
407     new ConfigRetriever(connect_string, force_api_nodeid,
408                         NDB_VERSION, NDB_MGM_NODE_TYPE_API);
409   if (m_config_retriever->hasError())
410   {
411     m_latest_error= 1;
412     m_latest_error_msg.assfmt
413       ("Could not initialize handle to management server: %s",
414        m_config_retriever->getErrorString());
415     printf("%s\n", get_latest_error_msg());
416   }
417   if (!m_main_connection)
418   {
419     m_globalDictCache = new GlobalDictCache;
420     m_transporter_facade= new TransporterFacade(m_globalDictCache);
421   }
422   else
423   {
424     assert(m_main_connection->m_impl.m_globalDictCache != NULL);
425     m_globalDictCache = 0;
426     m_transporter_facade=
427       new TransporterFacade(m_main_connection->m_impl.m_globalDictCache);
428 
429     // The secondary connection can't use same nodeid, clear the nodeid
430     // in ConfigRetriver to avoid asking for the same nodeid again
431     m_config_retriever->setNodeId(0);
432 
433   }
434 
435   DBUG_VOID_RETURN;
436 }
437 
~Ndb_cluster_connection_impl()438 Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
439 {
440   DBUG_ENTER("~Ndb_cluster_connection");
441 
442   if (m_first_ndb_object != 0)
443   {
444     g_eventLogger->warning("Deleting Ndb_cluster_connection with Ndb-object"
445                            " not deleted");
446     Ndb * p = m_first_ndb_object;
447     printf("this: %p Ndb-object(s): ", (Ndb_cluster_connection*)this);
448     while (p)
449     {
450       printf("%p ", p);
451       p = p->theImpl->m_next_ndb_object;
452     }
453     printf("\n");
454     fflush(stdout);
455   }
456 
457   if (m_transporter_facade != 0)
458   {
459     m_transporter_facade->stop_instance();
460   }
461   if (m_globalDictCache)
462   {
463     delete m_globalDictCache;
464   }
465   if (m_connect_thread)
466   {
467     void *status;
468     m_run_connect_thread= 0;
469     NdbThread_WaitFor(m_connect_thread, &status);
470     NdbThread_Destroy(&m_connect_thread);
471     m_connect_thread= 0;
472   }
473   if (m_transporter_facade != 0)
474   {
475     delete m_transporter_facade;
476     m_transporter_facade = 0;
477   }
478   if (m_config_retriever)
479   {
480     delete m_config_retriever;
481     m_config_retriever= NULL;
482   }
483 #ifdef VM_TRACE
484   if (ndb_print_state_mutex != NULL)
485   {
486     NdbMutex_Destroy(ndb_print_state_mutex);
487     ndb_print_state_mutex= NULL;
488   }
489 #endif
490 
491   NdbMutex_Lock(g_ndb_connection_mutex);
492   if(--g_ndb_connection_count == 0)
493   {
494     NdbColumnImpl::destory_pseudo_columns();
495   }
496   NdbMutex_Unlock(g_ndb_connection_mutex);
497 
498   if (m_event_add_drop_mutex)
499     NdbMutex_Destroy(m_event_add_drop_mutex);
500   m_event_add_drop_mutex = 0;
501 
502   if (m_new_delete_ndb_mutex)
503     NdbMutex_Destroy(m_new_delete_ndb_mutex);
504   m_new_delete_ndb_mutex = 0;
505 
506   DBUG_VOID_RETURN;
507 }
508 
509 void
lock_ndb_objects()510 Ndb_cluster_connection::lock_ndb_objects()
511 {
512   NdbMutex_Lock(m_impl.m_new_delete_ndb_mutex);
513 }
514 
515 void
unlock_ndb_objects()516 Ndb_cluster_connection::unlock_ndb_objects()
517 {
518   NdbMutex_Unlock(m_impl.m_new_delete_ndb_mutex);
519 }
520 
521 const Ndb*
get_next_ndb_object(const Ndb * p)522 Ndb_cluster_connection::get_next_ndb_object(const Ndb* p)
523 {
524   if (p == 0)
525     return m_impl.m_first_ndb_object;
526 
527   return p->theImpl->m_next_ndb_object;
528 }
529 
530 void
link_ndb_object(Ndb * p)531 Ndb_cluster_connection_impl::link_ndb_object(Ndb* p)
532 {
533   lock_ndb_objects();
534   if (m_first_ndb_object != 0)
535   {
536     m_first_ndb_object->theImpl->m_prev_ndb_object = p;
537   }
538 
539   p->theImpl->m_next_ndb_object = m_first_ndb_object;
540   m_first_ndb_object = p;
541 
542   p->theFirstTransId += m_max_trans_id;
543   unlock_ndb_objects();
544 }
545 
546 void
unlink_ndb_object(Ndb * p)547 Ndb_cluster_connection_impl::unlink_ndb_object(Ndb* p)
548 {
549   lock_ndb_objects();
550   Ndb* prev = p->theImpl->m_prev_ndb_object;
551   Ndb* next = p->theImpl->m_next_ndb_object;
552 
553   if (prev == 0)
554   {
555     assert(m_first_ndb_object == p);
556     m_first_ndb_object = next;
557   }
558   else
559   {
560     prev->theImpl->m_next_ndb_object = next;
561   }
562 
563   if (next)
564   {
565     next->theImpl->m_prev_ndb_object = prev;
566   }
567 
568   p->theImpl->m_prev_ndb_object = 0;
569   p->theImpl->m_next_ndb_object = 0;
570 
571   Uint32 transId = (Uint32)p->theFirstTransId;
572   if (transId > m_max_trans_id)
573   {
574     m_max_trans_id = transId;
575   }
576 
577   /* This Ndb is leaving for a better place,
578    * record its contribution to global warming
579    * for posterity
580    */
581   for (Uint32 i=0; i<Ndb::NumClientStatistics; i++)
582   {
583     globalApiStatsBaseline[i] += p->theImpl->clientStats[i];
584   }
585 
586   unlock_ndb_objects();
587 }
588 
589 void
set_name(const char * name)590 Ndb_cluster_connection_impl::set_name(const char *name)
591 {
592   NdbMgmHandle h= m_config_retriever->get_mgmHandle();
593   ndb_mgm_set_name(h, name);
594 }
595 
596 int
init_nodes_vector(Uint32 nodeid,const ndb_mgm_configuration & config)597 Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
598 					       const ndb_mgm_configuration
599 					       &config)
600 {
601   DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
602   ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
603 
604   for(iter.first(); iter.valid(); iter.next())
605   {
606     Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
607     const char * remoteHostName= 0, * localHostName= 0;
608     if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
609     if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
610 
611     if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
612     remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
613 
614     iter.get(CFG_CONNECTION_GROUP, &group);
615 
616     {
617       const char * host1= 0, * host2= 0;
618       iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
619       iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
620       localHostName  = (nodeid == nodeid1 ? host1 : host2);
621       remoteHostName = (nodeid == nodeid1 ? host2 : host1);
622     }
623 
624     Uint32 type = ~0;
625     if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
626 
627     switch(type){
628     case CONNECTION_TYPE_SHM:{
629       break;
630     }
631     case CONNECTION_TYPE_SCI:{
632       break;
633     }
634     case CONNECTION_TYPE_TCP:{
635       // connecting through localhost
636       // check if config_hostname is local
637       if (SocketServer::tryBind(0,remoteHostName))
638 	group--; // upgrade group value
639       break;
640     }
641     }
642     if (m_all_nodes.push_back(Node(group,remoteNodeId)))
643     {
644       DBUG_RETURN(-1);
645     }
646     DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
647     for (int i= m_all_nodes.size()-2;
648 	 i >= 0 && m_all_nodes[i].group > m_all_nodes[i+1].group;
649 	 i--)
650     {
651       Node tmp= m_all_nodes[i];
652       m_all_nodes[i]= m_all_nodes[i+1];
653       m_all_nodes[i+1]= tmp;
654     }
655   }
656 
657   int i;
658   Uint32 cur_group, i_group= 0;
659   cur_group= ~0;
660   for (i= (int)m_all_nodes.size()-1; i >= 0; i--)
661   {
662     if (m_all_nodes[i].group != cur_group)
663     {
664       cur_group= m_all_nodes[i].group;
665       i_group= i+1;
666     }
667     m_all_nodes[i].next_group= i_group;
668   }
669   cur_group= ~0;
670   for (i= 0; i < (int)m_all_nodes.size(); i++)
671   {
672     if (m_all_nodes[i].group != cur_group)
673     {
674       cur_group= m_all_nodes[i].group;
675       i_group= i;
676     }
677     m_all_nodes[i].this_group= i_group;
678   }
679 #if 0
680   for (i= 0; i < (int)m_all_nodes.size(); i++)
681   {
682     fprintf(stderr, "[%d] %d %d %d %d\n",
683 	   i,
684 	   m_all_nodes[i].id,
685 	   m_all_nodes[i].group,
686 	   m_all_nodes[i].this_group,
687 	   m_all_nodes[i].next_group);
688   }
689 
690   do_test();
691 #endif
692   DBUG_RETURN(0);
693 }
694 
695 Uint32
get_db_nodes(Uint8 arr[MAX_NDB_NODES]) const696 Ndb_cluster_connection_impl::get_db_nodes(Uint8 arr[MAX_NDB_NODES]) const
697 {
698   Uint32 cnt = (Uint32)m_all_nodes.size();
699   assert(cnt < MAX_NDB_NODES);
700   const Node *nodes = m_all_nodes.getBase();
701   for (Uint32 i = 0; i<cnt; i++)
702     arr[i] = (Uint8)nodes[i].id;
703   return cnt;
704 }
705 
706 int
configure(Uint32 nodeId,const ndb_mgm_configuration & config)707 Ndb_cluster_connection_impl::configure(Uint32 nodeId,
708                                        const ndb_mgm_configuration &config)
709 {
710   DBUG_ENTER("Ndb_cluster_connection_impl::configure");
711   {
712     ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
713     if(iter.find(CFG_NODE_ID, nodeId))
714       DBUG_RETURN(-1);
715 
716     // Configure scan settings
717     Uint32 scan_batch_size= 0;
718     if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
719       m_config.m_scan_batch_size= scan_batch_size;
720     }
721     Uint32 batch_byte_size= 0;
722     if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
723       m_config.m_batch_byte_size= batch_byte_size;
724     }
725     Uint32 batch_size= 0;
726     if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
727       m_config.m_batch_size= batch_size;
728     }
729 
730     // Configure timeouts
731     Uint32 timeout = 120000;
732     for (iter.first(); iter.valid(); iter.next())
733     {
734       Uint32 tmp1 = 0, tmp2 = 0;
735       iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
736       iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
737       tmp1 += tmp2;
738       if (tmp1 > timeout)
739         timeout = tmp1;
740     }
741     m_config.m_waitfor_timeout = timeout;
742 
743     Uint32 queue = 0;
744     if (!iter.get(CFG_DEFAULT_OPERATION_REDO_PROBLEM_ACTION, &queue))
745     {
746       m_config.m_default_queue_option = queue;
747     }
748   }
749   DBUG_RETURN(init_nodes_vector(nodeId, config));
750 }
751 
752 void
do_test()753 Ndb_cluster_connection_impl::do_test()
754 {
755   Ndb_cluster_connection_node_iter iter;
756   int n= no_db_nodes()+5;
757   Uint32 *nodes= new Uint32[n+1];
758 
759   for (int g= 0; g < n; g++)
760   {
761     for (int h= 0; h < n; h++)
762     {
763       Uint32 id;
764       Ndb_cluster_connection_node_iter iter2;
765       {
766 	for (int j= 0; j < g; j++)
767 	{
768 	  nodes[j]= get_next_node(iter2);
769 	}
770       }
771 
772       for (int i= 0; i < n; i++)
773       {
774 	init_get_next_node(iter);
775 	fprintf(stderr, "%d dead:(", g);
776 	id= 0;
777 	while (id == 0)
778 	{
779 	  if ((id= get_next_node(iter)) == 0)
780 	    break;
781 	  for (int j= 0; j < g; j++)
782 	  {
783 	    if (nodes[j] == id)
784 	    {
785 	      fprintf(stderr, " %d", id);
786 	      id= 0;
787 	      break;
788 	    }
789 	  }
790 	}
791 	fprintf(stderr, ")");
792 	if (id == 0)
793 	{
794 	  break;
795 	}
796 	fprintf(stderr, " %d\n", id);
797       }
798       fprintf(stderr, "\n");
799     }
800   }
801   delete [] nodes;
802 }
803 
set_name(const char * name)804 void Ndb_cluster_connection::set_name(const char *name)
805 {
806   m_impl.set_name(name);
807 }
808 
connect(int no_retries,int retry_delay_in_seconds,int verbose)809 int Ndb_cluster_connection_impl::connect(int no_retries,
810                                          int retry_delay_in_seconds,
811                                          int verbose)
812 {
813   DBUG_ENTER("Ndb_cluster_connection::connect");
814   do {
815     if (m_config_retriever == 0)
816     {
817       if (!m_latest_error)
818       {
819         m_latest_error = 1;
820         m_latest_error_msg.assign("Ndb_cluster_connection init "
821                                   "error: m_config_retriever==0");
822       }
823       DBUG_PRINT("exit", ("no m_config_retriever, ret: -1"));
824       DBUG_RETURN(-1);
825     }
826     if (m_config_retriever->do_connect(no_retries,
827                                        retry_delay_in_seconds,
828                                        verbose))
829     {
830       char buf[1024];
831       m_latest_error = 1;
832       m_latest_error_msg.assfmt("Connect using '%s' timed out",
833                                 get_connectstring(buf, sizeof(buf)));
834       DBUG_PRINT("exit", ("mgmt server not up yet, ret: 1"));
835       DBUG_RETURN(1); // mgmt server not up yet
836     }
837 
838     Uint32 nodeId = m_config_retriever->allocNodeId(4/*retries*/,
839                                                     3/*delay*/);
840     if(nodeId == 0)
841       break;
842     ndb_mgm_configuration * props = m_config_retriever->getConfig(nodeId);
843     if(props == 0)
844       break;
845 
846     if (configure(nodeId, *props))
847     {
848       ndb_mgm_destroy_configuration(props);
849       DBUG_PRINT("exit", ("malloc failure, ret: -1"));
850       DBUG_RETURN(-1);
851     }
852 
853     if (m_transporter_facade->start_instance(nodeId, props) < 0)
854     {
855       ndb_mgm_destroy_configuration(props);
856       DBUG_RETURN(-1);
857     }
858 
859     ndb_mgm_destroy_configuration(props);
860     m_transporter_facade->connected();
861     m_latest_error = 0;
862     m_latest_error_msg.assign("");
863     DBUG_PRINT("exit", ("connect ok, ret: 0"));
864     DBUG_RETURN(0);
865   } while(0);
866 
867   const char* erString = m_config_retriever->getErrorString();
868   if (erString == 0) {
869     erString = "No error specified!";
870   }
871   m_latest_error = 1;
872   m_latest_error_msg.assfmt("Configuration error: %s", erString);
873   ndbout << get_latest_error_msg() << endl;
874   DBUG_PRINT("exit", ("connect failed, '%s' ret: -1", erString));
875   DBUG_RETURN(-1);
876 }
877 
878 
879 int
connect(int no_retries,int retry_delay_in_seconds,int verbose)880 Ndb_cluster_connection::connect(int no_retries,
881                                 int retry_delay_in_seconds,
882                                 int verbose)
883 {
884   return m_impl.connect(no_retries, retry_delay_in_seconds, verbose);
885 }
886 
887 
connect_thread()888 void Ndb_cluster_connection_impl::connect_thread()
889 {
890   DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
891   int r;
892   do {
893     NdbSleep_SecSleep(1);
894     if ((r = connect(0,0,0)) == 0)
895       break;
896     if (r == -1) {
897       printf("Ndb_cluster_connection::connect_thread error\n");
898       DBUG_ASSERT(false);
899       m_run_connect_thread= 0;
900     } else {
901       // Wait before making a new connect attempt
902       NdbSleep_SecSleep(1);
903     }
904   } while (m_run_connect_thread);
905   if (m_connect_callback)
906     (*m_connect_callback)();
907   DBUG_VOID_RETURN;
908 }
909 
910 Uint64 *
get_latest_trans_gci()911 Ndb_cluster_connection::get_latest_trans_gci()
912 {
913  return m_impl.get_latest_trans_gci();
914 }
915 
916 void
init_get_next_node(Ndb_cluster_connection_node_iter & iter)917 Ndb_cluster_connection::init_get_next_node(Ndb_cluster_connection_node_iter &iter)
918 {
919   m_impl.init_get_next_node(iter);
920 }
921 
922 Uint32
get_next_node(Ndb_cluster_connection_node_iter & iter)923 Ndb_cluster_connection::get_next_node(Ndb_cluster_connection_node_iter &iter)
924 {
925   return m_impl.get_next_node(iter);
926 }
927 
928 unsigned int
get_next_alive_node(Ndb_cluster_connection_node_iter & iter)929 Ndb_cluster_connection::get_next_alive_node(Ndb_cluster_connection_node_iter &iter)
930 {
931   return m_impl.get_next_alive_node(iter);
932 }
933 
934 unsigned
get_active_ndb_objects() const935 Ndb_cluster_connection::get_active_ndb_objects() const
936 {
937   return m_impl.m_transporter_facade->get_active_ndb_objects();
938 }
939 
set_timeout(int timeout_ms)940 int Ndb_cluster_connection::set_timeout(int timeout_ms)
941 {
942   return ndb_mgm_set_timeout(m_impl.m_config_retriever->get_mgmHandle(),
943                              timeout_ms);
944 }
945 
946 int
get_auto_reconnect() const947 Ndb_cluster_connection::get_auto_reconnect() const
948 {
949   return m_impl.m_transporter_facade->get_auto_reconnect();
950 }
951 
952 void
set_auto_reconnect(int value)953 Ndb_cluster_connection::set_auto_reconnect(int value)
954 {
955   m_impl.m_transporter_facade->set_auto_reconnect(value);
956 }
957 
958 Uint32
collect_client_stats(Uint64 * statsArr,Uint32 sz)959 Ndb_cluster_connection::collect_client_stats(Uint64* statsArr, Uint32 sz)
960 {
961   /* We have a global stats baseline which contains all
962    * the stats for Ndb objects which have been and gone.
963    * Start with that, then add in stats for Ndb objects
964    * currently in use.
965    * Note that despite the lock, this is not thread safe
966    * as we are reading data that other threads may be
967    * concurrently writing.  The lock just guards against
968    * concurrent changes to the set of active Ndbs while
969    * we are iterating it.
970    */
971   const Uint32 relevant = MIN((Uint32)Ndb::NumClientStatistics, sz);
972   const Ndb* ndb = NULL;
973   lock_ndb_objects();
974   {
975     memcpy(statsArr, &m_impl.globalApiStatsBaseline[0], sizeof(Uint64)*relevant);
976 
977     while((ndb = get_next_ndb_object(ndb)) != NULL)
978     {
979       for (Uint32 i=0; i<relevant; i++)
980       {
981         statsArr[i] += ndb->theImpl->clientStats[i];
982       }
983     }
984   }
985   unlock_ndb_objects();
986 
987   return relevant;
988 }
989 
// Explicit instantiation of the Vector template for the element type of
// m_all_nodes.
990 template class Vector<Ndb_cluster_connection_impl::Node>;
991 
992