1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include <ndb_global.h>
26 
27 #include <TransporterRegistry.hpp>
28 #include "Configuration.hpp"
29 #include <ErrorHandlingMacros.hpp>
30 #include "GlobalData.hpp"
31 
32 #include <ConfigRetriever.hpp>
33 #include <IPCConfig.hpp>
34 #include <ndb_version.h>
35 #include <NdbOut.hpp>
36 #include <WatchDog.hpp>
37 #include <NdbConfig.h>
38 #include <NdbSpin.h>
39 
40 #include <mgmapi_configuration.hpp>
41 #include <kernel_config_parameters.h>
42 
43 #include <util/ConfigValues.hpp>
44 #include <NdbEnv.h>
45 
46 #include <ndbapi_limits.h>
47 #include "mt.hpp"
48 
49 #include "../../common/util/parse_mask.hpp"
50 
51 #include <EventLogger.hpp>
52 
53 #define JAM_FILE_ID 301
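/*
 * This file implements the data node's Configuration class:
 *  - fetch_configuration(): connect to ndb_mgmd and retrieve the config,
 *  - setupConfiguration(): validate node-local parameters and paths,
 *  - calcSizeAlt(): derive internal pool/record sizes from the config,
 *  - thread bookkeeping: real-time scheduling and CPU locking per thread.
 */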
54 
55 extern EventLogger * g_eventLogger;
56 
57 extern Uint32 g_start_type;
58 
59 bool
60 Configuration::init(int _no_start, int _initial,
61                     int _initialstart)
62 {
63   // Check the start flag
64   if (_no_start)
65     globalData.theRestartFlag = initial_state;
66   else
67     globalData.theRestartFlag = perform_start;
68 
69   // Check the initial flag
70   if (_initial)
71     _initialStart = true;
72 
73   globalData.ownId= 0;
74 
75   if (_initialstart)
76   {
77     _initialStart = true;
78     g_start_type |= (1 << NodeState::ST_INITIAL_START);
79   }
80 
81   threadIdMutex = NdbMutex_Create();
82   if (!threadIdMutex)
83   {
84     g_eventLogger->error("Failed to create threadIdMutex");
85     return false;
86   }
87   initThreadArray();
88   return true;
89 }
90 
91 Configuration::Configuration()
92 {
93   _fsPath = 0;
94   _backupPath = 0;
95   _initialStart = false;
96   m_config_retriever= 0;
97   m_clusterConfig= 0;
98   m_clusterConfigIter= 0;
99   m_logLevel= 0;
100 }
101 
102 Configuration::~Configuration(){
103 
104   if(_fsPath != NULL)
105     free(_fsPath);
106 
107   if(_backupPath != NULL)
108     free(_backupPath);
109 
110   if (m_config_retriever) {
111     delete m_config_retriever;
112   }
113 
114   if(m_logLevel) {
115     delete m_logLevel;
116   }
117 }
118 
119 void
120 Configuration::closeConfiguration(bool end_session){
121   if (m_config_retriever) {
122     m_config_retriever->end_session(end_session);
123     delete m_config_retriever;
124   }
125   m_config_retriever= 0;
126 }
127 
128 void
129 Configuration::fetch_configuration(const char* _connect_string,
130                                    int force_nodeid,
131                                    const char* _bind_address,
132                                    NodeId allocated_nodeid,
133                                    int connect_retries, int connect_delay)
134 {
135   /**
136    * Fetch configuration from management server
137    */
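  /*
   * Overview: (re)create the ConfigRetriever, connect to ndb_mgmd with the
   * given retry/delay settings, allocate a node id unless the angel already
   * passed one in, then fetch this node's configuration and remember the
   * DataDir.
   */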
138   if (m_config_retriever) {
139     delete m_config_retriever;
140   }
141 
142   m_config_retriever= new ConfigRetriever(_connect_string,
143                                           force_nodeid,
144                                           NDB_VERSION,
145                                           NDB_MGM_NODE_TYPE_NDB,
146 					  _bind_address);
147   if (!m_config_retriever)
148   {
149     ERROR_SET(fatal, NDBD_EXIT_MEMALLOC,
150               "Failed to create ConfigRetriever", "");
151   }
152 
153   if (m_config_retriever->hasError())
154   {
155     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
156 	      "Could not initialize handle to management server",
157 	      m_config_retriever->getErrorString());
158   }
159 
160   if(m_config_retriever->do_connect(connect_retries, connect_delay, 1) == -1){
161     const char * s = m_config_retriever->getErrorString();
162     if(s == 0)
163       s = "No error given!";
164     /* Set stop on error to true otherwise NDB will
165        go into a restart loop...
166     */
167     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Could not connect to ndb_mgmd", s);
168   }
169 
170   ConfigRetriever &cr= *m_config_retriever;
171 
172   if (allocated_nodeid)
173   {
174     // The angel has already allocated the nodeid, no need to
175     // allocate it
176     globalData.ownId = allocated_nodeid;
177   }
178   else
179   {
180 
181     const int alloc_retries = 10;
182     const int alloc_delay = 3;
183     globalData.ownId = cr.allocNodeId(alloc_retries, alloc_delay);
184     if(globalData.ownId == 0)
185     {
186       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
187                 "Unable to alloc node id",
188                 m_config_retriever->getErrorString());
189     }
190   }
191   assert(globalData.ownId);
192 
193   ndb_mgm_configuration * p = cr.getConfig(globalData.ownId);
194   if(p == 0){
195     const char * s = cr.getErrorString();
196     if(s == 0)
197       s = "No error given!";
198 
199     /* Set stop on error to true otherwise NDB will
200        go into a restart loop...
201     */
202 
203     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Could not fetch configuration"
204 	      "/invalid configuration", s);
205   }
206   if(m_clusterConfig)
207     free(m_clusterConfig);
208 
209   m_clusterConfig = p;
210 
211   const ConfigValues * cfg = (ConfigValues*)m_clusterConfig;
212   cfg->pack_v1(m_clusterConfigPacked_v1);
213   if (OUR_V2_VERSION)
214   {
215     cfg->pack_v2(m_clusterConfigPacked_v2);
216   }
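  /*
   * Keep both packed representations of the fetched configuration; the v2
   * packing is only produced when this build supports the v2 config format
   * (OUR_V2_VERSION).
   */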
217 
218   {
219     Uint32 generation;
220     ndb_mgm_configuration_iterator sys_iter(*p, CFG_SECTION_SYSTEM);
221     if (sys_iter.get(CFG_SYS_CONFIG_GENERATION, &generation))
222     {
223       g_eventLogger->info("Configuration fetched from '%s:%d', unknown generation!! (likely older ndb_mgmd)",
224                           m_config_retriever->get_mgmd_host(),
225                           m_config_retriever->get_mgmd_port());
226     }
227     else
228     {
229       g_eventLogger->info("Configuration fetched from '%s:%d', generation: %d",
230                           m_config_retriever->get_mgmd_host(),
231                           m_config_retriever->get_mgmd_port(),
232                           generation);
233     }
234   }
235 
236   ndb_mgm_configuration_iterator iter(* p, CFG_SECTION_NODE);
237   if (iter.find(CFG_NODE_ID, globalData.ownId)){
238     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", "DB missing");
239   }
240 
241   if(iter.get(CFG_DB_STOP_ON_ERROR, &_stopOnError)){
242     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
243 	      "StopOnError missing");
244   }
245 
246   const char * datadir;
247   if(iter.get(CFG_NODE_DATADIR, &datadir)){
248     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
249 	      "DataDir missing");
250   }
251   NdbConfig_SetPath(datadir);
252 
253 }
254 
255 static char * get_and_validate_path(ndb_mgm_configuration_iterator &iter,
256 				    Uint32 param, const char *param_string)
257 {
258   const char* path = NULL;
259   if(iter.get(param, &path)){
260     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched missing ",
261 	      param_string);
262   }
263 
264   if(path == 0 || strlen(path) == 0){
265     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
266 	      "Invalid configuration fetched. Configuration does not contain valid ",
267 	      param_string);
268   }
269 
270   // check that it is pointing on a valid directory
271   //
272   char buf2[PATH_MAX];
273   memset(buf2, 0,sizeof(buf2));
274 #ifdef _WIN32
275   char* szFilePart;
276   if(!GetFullPathName(path, sizeof(buf2), buf2, &szFilePart) ||
277      (GetFileAttributes(buf2) & FILE_ATTRIBUTE_READONLY))
278 #else
279   if((::realpath(path, buf2) == NULL)||
280        (::access(buf2, W_OK) != 0))
281 #endif
282   {
283     ERROR_SET(fatal, NDBD_EXIT_AFS_INVALIDPATH, path, param_string);
284   }
285 
286   if (strcmp(&buf2[strlen(buf2) - 1], DIR_SEPARATOR))
287     strcat(buf2, DIR_SEPARATOR);
288 
289   return strdup(buf2);
290 }
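/*
 * Note: get_and_validate_path() returns a strdup()'d path that always ends
 * with a directory separator; callers free any previous value before
 * reassigning (_fsPath/_backupPath) and the destructor frees them at exit.
 */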
291 
292 void
293 Configuration::setupConfiguration(){
294 
295   DBUG_ENTER("Configuration::setupConfiguration");
296 
297   ndb_mgm_configuration * p = m_clusterConfig;
298 
299   /**
300    * Configure transporters
301    */
302   if (!globalTransporterRegistry.init(globalData.ownId))
303   {
304     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
305               "Invalid configuration fetched",
306               "Could not init transporter registry");
307   }
308 
309   if (!IPCConfig::configureTransporters(globalData.ownId,
310                                         * p,
311                                         globalTransporterRegistry))
312   {
313     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
314               "Invalid configuration fetched",
315               "Could not configure transporters");
316   }
317 
318   /**
319    * Setup cluster configuration data
320    */
321   ndb_mgm_configuration_iterator iter(* p, CFG_SECTION_NODE);
322   if (iter.find(CFG_NODE_ID, globalData.ownId)){
323     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", "DB missing");
324   }
325 
326   unsigned type;
327   if(!(iter.get(CFG_TYPE_OF_SECTION, &type) == 0 && type == NODE_TYPE_DB)){
328     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
329 	      "I'm wrong type of node");
330   }
331 
332   /**
333    * Iff we use the 'default' (non-mt) send buffer implementation, the
334    * send buffers are allocated here.
335    */
336   if (getNonMTTransporterSendHandle() != NULL)
337   {
338     Uint32 total_send_buffer = 0;
339     iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
340     Uint64 extra_send_buffer = 0;
341     iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
342     getNonMTTransporterSendHandle()->
343       allocate_send_buffers(total_send_buffer,
344                             extra_send_buffer);
345   }
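  /*
   * When running multithreaded there is no non-MT send handle, so the send
   * buffer memory is presumably allocated by the multithreaded send
   * implementation (mt.hpp) rather than here.
   */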
346 
347   if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){
348     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
349 	      "MaxNoOfSavedMessages missing");
350   }
351 
352   if(iter.get(CFG_DB_MEMLOCK, &_lockPagesInMainMemory)){
353     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
354 	      "LockPagesInMainMemory missing");
355   }
356 
357   if(iter.get(CFG_DB_WATCHDOG_INTERVAL, &_timeBetweenWatchDogCheck)){
358     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
359 	      "TimeBetweenWatchDogCheck missing");
360   }
361 
362   _schedulerResponsiveness = 5;
363   iter.get(CFG_DB_SCHED_RESPONSIVENESS, &_schedulerResponsiveness);
364 
365   _schedulerExecutionTimer = 50;
366   iter.get(CFG_DB_SCHED_EXEC_TIME, &_schedulerExecutionTimer);
367 
368   _schedulerSpinTimer = DEFAULT_SPIN_TIME;
369   iter.get(CFG_DB_SCHED_SPIN_TIME, &_schedulerSpinTimer);
370   /* Always set SchedulerSpinTimer to 0 on platforms not supporting spin */
371   if (!NdbSpin_is_supported())
372   {
373     _schedulerSpinTimer = 0;
374   }
375   g_eventLogger->info("SchedulerSpinTimer = %u", _schedulerSpinTimer);
376 
377   _spinTimePerCall = 1000;
378   iter.get(CFG_DB_SPIN_TIME_PER_CALL, &_spinTimePerCall);
379 
380   _maxSendDelay = 0;
381   iter.get(CFG_DB_MAX_SEND_DELAY, &_maxSendDelay);
382 
383   _realtimeScheduler = 0;
384   iter.get(CFG_DB_REALTIME_SCHEDULER, &_realtimeScheduler);
385 
386   if(iter.get(CFG_DB_WATCHDOG_INTERVAL_INITIAL,
387               &_timeBetweenWatchDogCheckInitial)){
388     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
389 	      "TimeBetweenWatchDogCheckInitial missing");
390   }
391 
392 #ifdef ERROR_INSERT
393   _mixologyLevel = 0;
394   iter.get(CFG_MIXOLOGY_LEVEL, &_mixologyLevel);
395   if (_mixologyLevel)
396   {
397     ndbout_c("Mixology level set to 0x%x", _mixologyLevel);
398     globalTransporterRegistry.setMixologyLevel(_mixologyLevel);
399   }
400 #endif
401 
402   /**
403    * Get paths
404    */
405   if (_fsPath)
406     free(_fsPath);
407   _fsPath= get_and_validate_path(iter, CFG_DB_FILESYSTEM_PATH, "FileSystemPath");
408   if (_backupPath)
409     free(_backupPath);
410   _backupPath= get_and_validate_path(iter, CFG_DB_BACKUP_DATADIR, "BackupDataDir");
411 
412   if(iter.get(CFG_DB_STOP_ON_ERROR_INSERT, &m_restartOnErrorInsert)){
413     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
414 	      "RestartOnErrorInsert missing");
415   }
416 
417   /**
418    * Create the watch dog thread
419    */
420   {
421     if (_timeBetweenWatchDogCheckInitial < _timeBetweenWatchDogCheck)
422       _timeBetweenWatchDogCheckInitial = _timeBetweenWatchDogCheck;
423 
424     Uint32 t = _timeBetweenWatchDogCheckInitial;
425     t = globalEmulatorData.theWatchDog ->setCheckInterval(t);
426     _timeBetweenWatchDogCheckInitial = t;
427   }
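  /*
   * The initial (restart-phase) watchdog interval may not be shorter than
   * the normal interval; the value accepted by the watchdog is stored back
   * into _timeBetweenWatchDogCheckInitial.
   */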
428 
429   const char * lockmask = 0;
430   {
431     if (iter.get(CFG_DB_EXECUTE_LOCK_CPU, &lockmask) == 0)
432     {
433       int res = m_thr_config.setLockExecuteThreadToCPU(lockmask);
434       if (res < 0)
435       {
436         // Could not parse LockExecuteThreadToCPU mask
437         g_eventLogger->warning("Failed to parse 'LockExecuteThreadToCPU=%s' "
438                                "(error: %d), ignoring it!",
439                                lockmask, res);
440       }
441     }
442   }
443 
444   {
445     Uint32 maintCPU = NO_LOCK_CPU;
446     iter.get(CFG_DB_MAINT_LOCK_CPU, &maintCPU);
447     if (maintCPU == 65535)
448       maintCPU = NO_LOCK_CPU; // Ignore old default (may come from an old mgmd)
449     if (maintCPU != NO_LOCK_CPU)
450       m_thr_config.setLockIoThreadsToCPU(maintCPU);
451   }
452 
453 #ifdef NDB_USE_GET_ENV
454   const char * thrconfigstring = NdbEnv_GetEnv("NDB_MT_THREAD_CONFIG",
455                                                (char*)0, 0);
456 #else
457   const char * thrconfigstring = NULL;
458 #endif
459   if (thrconfigstring ||
460       iter.get(CFG_DB_MT_THREAD_CONFIG, &thrconfigstring) == 0)
461   {
462     int res = m_thr_config.do_parse(thrconfigstring,
463                                     _realtimeScheduler,
464                                     _schedulerSpinTimer);
465     if (res != 0)
466     {
467       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
468                 "Invalid configuration fetched, invalid ThreadConfig",
469                 m_thr_config.getErrorMessage());
470     }
471   }
472   else
473   {
474     Uint32 mtthreads = 0;
475     iter.get(CFG_DB_MT_THREADS, &mtthreads);
476 
477     Uint32 classic = 0;
478     iter.get(CFG_NDBMT_CLASSIC, &classic);
479 #ifdef NDB_USE_GET_ENV
480     const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
481     if (p != 0)
482     {
483       if (strstr(p, "NOPLEASE") != 0)
484         classic = 1;
485     }
486 #endif
487     Uint32 lqhthreads = 0;
488     iter.get(CFG_NDBMT_LQH_THREADS, &lqhthreads);
489 
490     int res = m_thr_config.do_parse(mtthreads,
491                                     lqhthreads,
492                                     classic,
493                                     _realtimeScheduler,
494                                     _schedulerSpinTimer);
495     if (res != 0)
496     {
497       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
498                 "Invalid configuration fetched, invalid thread configuration",
499                 m_thr_config.getErrorMessage());
500     }
501   }
502   if (NdbIsMultiThreaded())
503   {
504     if (thrconfigstring)
505     {
506       ndbout_c("ThreadConfig: input: %s LockExecuteThreadToCPU: %s => parsed: %s",
507                thrconfigstring,
508                lockmask ? lockmask : "",
509                m_thr_config.getConfigString());
510     }
511     else
512     {
513       ndbout_c("ThreadConfig (old ndb_mgmd) LockExecuteThreadToCPU: %s => parsed: %s",
514                lockmask ? lockmask : "",
515                m_thr_config.getConfigString());
516     }
517   }
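  /*
   * Summary: the thread layout comes either from an explicit ThreadConfig
   * string (or the NDB_MT_THREAD_CONFIG environment override in builds with
   * NDB_USE_GET_ENV), or from the older numeric parameters read into
   * mtthreads/classic/lqhthreads above. The parsed result is printed when
   * running multithreaded.
   */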
518 
519   ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config);
520 
521   if(m_clusterConfigIter)
522     ndb_mgm_destroy_iterator(m_clusterConfigIter);
523   m_clusterConfigIter = ndb_mgm_create_configuration_iterator
524     (p, CFG_SECTION_NODE);
525 
526   /**
527    * This is parts of get_multithreaded_config
528    */
529   do
530   {
531     globalData.isNdbMt = NdbIsMultiThreaded();
532     if (!globalData.isNdbMt)
533       break;
534 
535     globalData.ndbMtTcThreads = m_thr_config.getThreadCount(THRConfig::T_TC);
536     globalData.ndbMtSendThreads =
537       m_thr_config.getThreadCount(THRConfig::T_SEND);
538     globalData.ndbMtReceiveThreads =
539       m_thr_config.getThreadCount(THRConfig::T_RECV);
540 
541     globalData.isNdbMtLqh = true;
542     {
543       if (m_thr_config.getMtClassic())
544       {
545         globalData.isNdbMtLqh = false;
546       }
547     }
548 
549     if (!globalData.isNdbMtLqh)
550       break;
551 
552     Uint32 threads = m_thr_config.getThreadCount(THRConfig::T_LDM);
553     Uint32 workers = threads;
554     iter.get(CFG_NDBMT_LQH_WORKERS, &workers);
555 
556 #ifdef VM_TRACE
557 #ifdef NDB_USE_GET_ENV
558     // testing
559     {
560       const char* p;
561       p = NdbEnv_GetEnv("NDBMT_LQH_WORKERS", (char*)0, 0);
562       if (p != 0)
563         workers = atoi(p);
564     }
565 #endif
566 #endif
567 
568 
569     assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS);
570     assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS);
571     assert(workers % threads == 0);
572 
573     globalData.ndbMtLqhWorkers = workers;
574     globalData.ndbMtLqhThreads = threads;
575   } while (0);
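  /*
   * At this point globalData describes the thread topology: isNdbMt,
   * TC/send/receive thread counts, and (unless running mt-classic) the LDM
   * thread count and LQH worker count, where workers is a multiple of the
   * LDM thread count.
   */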
576 
577   calcSizeAlt(cf);
578 
579   DBUG_VOID_RETURN;
580 }
581 
582 Uint32
583 Configuration::lockPagesInMainMemory() const {
584   return _lockPagesInMainMemory;
585 }
586 
587 int
588 Configuration::schedulerExecutionTimer() const {
589   return _schedulerExecutionTimer;
590 }
591 
592 void
593 Configuration::schedulerExecutionTimer(int value) {
594   if (value < 11000)
595     _schedulerExecutionTimer = value;
596 }
597 
598 Uint32
599 Configuration::spinTimePerCall() const {
600   return _spinTimePerCall;
601 }
602 
603 int
604 Configuration::schedulerSpinTimer() const {
605   return _schedulerSpinTimer;
606 }
607 
608 void
609 Configuration::schedulerSpinTimer(int value) {
610   if (value < 500)
611     value = 500;
612   _schedulerSpinTimer = value;
613 }
614 
615 bool
616 Configuration::realtimeScheduler() const
617 {
618   return (bool)_realtimeScheduler;
619 }
620 
621 Uint32
622 Configuration::maxSendDelay() const
623 {
624   return _maxSendDelay;
625 }
626 
627 void
628 Configuration::realtimeScheduler(bool realtime_on)
629 {
630   bool old_value = (bool)_realtimeScheduler;
631   _realtimeScheduler = (Uint32)realtime_on;
632   if (old_value != realtime_on)
633     setAllRealtimeScheduler();
634 }
635 
636 int
637 Configuration::timeBetweenWatchDogCheck() const {
638   return _timeBetweenWatchDogCheck;
639 }
640 
641 void
642 Configuration::timeBetweenWatchDogCheck(int value) {
643   _timeBetweenWatchDogCheck = value;
644 }
645 
646 int
647 Configuration::maxNoOfErrorLogs() const {
648   return _maxErrorLogs;
649 }
650 
651 void
652 Configuration::maxNoOfErrorLogs(int val){
653   _maxErrorLogs = val;
654 }
655 
656 bool
657 Configuration::stopOnError() const {
658   return _stopOnError;
659 }
660 
661 void
662 Configuration::stopOnError(bool val){
663   _stopOnError = val;
664 }
665 
666 int
667 Configuration::getRestartOnErrorInsert() const {
668   return m_restartOnErrorInsert;
669 }
670 
671 void
672 Configuration::setRestartOnErrorInsert(int i){
673   m_restartOnErrorInsert = i;
674 }
675 
676 #ifdef ERROR_INSERT
677 Uint32
678 Configuration::getMixologyLevel() const {
679   return _mixologyLevel;
680 }
681 
682 void
683 Configuration::setMixologyLevel(Uint32 l){
684   _mixologyLevel = l;
685 }
686 #endif
687 
688 const ndb_mgm_configuration_iterator *
689 Configuration::getOwnConfigIterator() const {
690   return m_ownConfigIterator;
691 }
692 
693 const class ConfigValues*
694 Configuration::get_own_config_values()
695 {
696   return &m_ownConfig->m_config;
697 }
698 
699 
700 ndb_mgm_configuration_iterator *
701 Configuration::getClusterConfigIterator() const {
702   return m_clusterConfigIter;
703 }
704 
705 Uint32
706 Configuration::get_config_generation() const {
707   Uint32 generation = ~0;
708   ndb_mgm_configuration_iterator sys_iter(*m_clusterConfig,
709                                           CFG_SECTION_SYSTEM);
710   sys_iter.get(CFG_SYS_CONFIG_GENERATION, &generation);
711   return generation;
712 }
713 
714 
715 void
716 Configuration::calcSizeAlt(ConfigValues * ownConfig)
717 {
718   const char * msg = "Invalid configuration fetched";
719   char buf[255];
720 
721   unsigned int noOfTables = 0;
722   unsigned int noOfUniqueHashIndexes = 0;
723   unsigned int noOfOrderedIndexes = 0;
724   unsigned int noOfTriggers = 0;
725   unsigned int noOfReplicas = 0;
726   unsigned int noOfDBNodes = 0;
727   unsigned int noOfAPINodes = 0;
728   unsigned int noOfMGMNodes = 0;
729   unsigned int noOfNodes = 0;
730   unsigned int noOfAttributes = 0;
731   unsigned int noOfOperations = 32768;
732   unsigned int noOfLocalOperations = 32;
733   unsigned int noOfTransactions = 4096;
734   unsigned int noOfIndexPages = 0;
735   unsigned int noOfDataPages = 0;
736   unsigned int noOfScanRecords = 256;
737   unsigned int noOfLocalScanRecords = 32;
738   unsigned int noBatchSize = 0;
739   unsigned int noOfIndexOperations = 8192;
740   unsigned int noOfTriggerOperations = 4000;
741   unsigned int reservedScanRecords = 256 / 4;
742   unsigned int reservedLocalScanRecords = 32 / 4;
743   unsigned int reservedOperations = 32768 / 4;
744   unsigned int reservedTransactions = 4096 / 4;
745   unsigned int reservedIndexOperations = 8192 / 4;
746   unsigned int reservedTriggerOperations = 4000 / 4;
747   unsigned int transactionBufferBytes = 1048576;
748   unsigned int reservedTransactionBufferBytes = 1048576 / 4;
749   unsigned int maxOpsPerTrans = ~(Uint32)0;
750 
751   m_logLevel = new LogLevel();
752   if (!m_logLevel)
753   {
754     ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, "Failed to create LogLevel", "");
755   }
756 
757   struct AttribStorage { int paramId; Uint32 * storage; bool computable; };
758   AttribStorage tmp[] = {
759     { CFG_DB_NO_SCANS, &noOfScanRecords, false },
760     { CFG_DB_RESERVED_SCANS, &reservedScanRecords, true },
761     { CFG_DB_NO_LOCAL_SCANS, &noOfLocalScanRecords, true },
762     { CFG_DB_RESERVED_LOCAL_SCANS, &reservedLocalScanRecords, true },
763     { CFG_DB_BATCH_SIZE, &noBatchSize, false },
764     { CFG_DB_NO_TABLES, &noOfTables, false },
765     { CFG_DB_NO_ORDERED_INDEXES, &noOfOrderedIndexes, false },
766     { CFG_DB_NO_UNIQUE_HASH_INDEXES, &noOfUniqueHashIndexes, false },
767     { CFG_DB_NO_TRIGGERS, &noOfTriggers, true },
768     { CFG_DB_NO_REPLICAS, &noOfReplicas, false },
769     { CFG_DB_NO_ATTRIBUTES, &noOfAttributes, false },
770     { CFG_DB_NO_OPS, &noOfOperations, false },
771     { CFG_DB_RESERVED_OPS, &reservedOperations, true },
772     { CFG_DB_NO_LOCAL_OPS, &noOfLocalOperations, true },
773     { CFG_DB_NO_TRANSACTIONS, &noOfTransactions, false },
774     { CFG_DB_RESERVED_TRANSACTIONS, &reservedTransactions, true },
775     { CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION, &maxOpsPerTrans, false },
776     { CFG_DB_NO_INDEX_OPS, &noOfIndexOperations, true },
777     { CFG_DB_RESERVED_INDEX_OPS, &reservedIndexOperations, true },
778     { CFG_DB_NO_TRIGGER_OPS, &noOfTriggerOperations, true },
779     { CFG_DB_RESERVED_TRIGGER_OPS, &reservedTriggerOperations, true },
780     { CFG_DB_TRANS_BUFFER_MEM, &transactionBufferBytes, false },
781     { CFG_DB_RESERVED_TRANS_BUFFER_MEM, &reservedTransactionBufferBytes, true },
782   };
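  /*
   * Parameters flagged 'computable' may be absent from the configuration;
   * they are zeroed here and derived further down. A missing non-computable
   * parameter is a fatal configuration error.
   */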
783 
784   ndb_mgm_configuration_iterator db(*(ndb_mgm_configuration*)ownConfig, 0);
785 
786   const int sz = sizeof(tmp)/sizeof(AttribStorage);
787   for(int i = 0; i<sz; i++){
788     if(ndb_mgm_get_int_parameter(&db, tmp[i].paramId, tmp[i].storage)){
789       if (tmp[i].computable) {
790         *tmp[i].storage = 0;
791       } else {
792         BaseString::snprintf(buf, sizeof(buf),"ConfigParam: %d not found", tmp[i].paramId);
793         ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
794       }
795     }
796   }
797 
798   Uint32 ldmInstances = 1;
799   if (globalData.isNdbMtLqh)
800   {
801     ldmInstances = globalData.ndbMtLqhWorkers;
802   }
803 
804   Uint32 tcInstances = 1;
805   if (globalData.ndbMtTcThreads > 1)
806   {
807     tcInstances = globalData.ndbMtTcThreads;
808   }
809 
810   Uint64 indexMem = 0, dataMem = 0;
811   ndb_mgm_get_int64_parameter(&db, CFG_DB_DATA_MEM, &dataMem);
812   ndb_mgm_get_int64_parameter(&db, CFG_DB_INDEX_MEM, &indexMem);
813   if(dataMem == 0){
814     BaseString::snprintf(buf, sizeof(buf), "ConfigParam: %d not found", CFG_DB_DATA_MEM);
815     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
816   }
817 
818 #define DO_DIV(x,y) (((x) + (y - 1)) / (y))
819 
820   noOfDataPages = (Uint32)(dataMem / 32768);
821   noOfIndexPages = (Uint32)(indexMem / 8192);
822   noOfIndexPages = DO_DIV(noOfIndexPages, ldmInstances);
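  /*
   * DataMemory is accounted in 32768-byte pages and IndexMemory in
   * 8192-byte pages; the index pages are then divided per LDM instance.
   */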
823 
824   for(unsigned j = 0; j<LogLevel::LOGLEVEL_CATEGORIES; j++)
825   {
826     Uint32 tmp;
827     if (!ndb_mgm_get_int_parameter(&db, CFG_MIN_LOGLEVEL+j, &tmp))
828     {
829       m_logLevel->setLogLevel((LogLevel::EventCategory)j, tmp);
830     }
831   }
832 
833   // tmp
834   ndb_mgm_configuration_iterator * p = m_clusterConfigIter;
835 
836   Uint32 nodeNo = noOfNodes = 0;
837   NodeBitmask nodes;
838   for(ndb_mgm_first(p); ndb_mgm_valid(p); ndb_mgm_next(p), nodeNo++){
839 
840     Uint32 nodeId;
841     Uint32 nodeType;
842 
843     if(ndb_mgm_get_int_parameter(p, CFG_NODE_ID, &nodeId)){
844       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, "Node data (Id) missing");
845     }
846 
847     if(ndb_mgm_get_int_parameter(p, CFG_TYPE_OF_SECTION, &nodeType)){
848       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, "Node data (Type) missing");
849     }
850 
851     if(nodeId > MAX_NODES || nodeId == 0){
852       BaseString::snprintf(buf, sizeof(buf),
853 	       "Invalid node id: %d", nodeId);
854       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
855     }
856 
857     if(nodes.get(nodeId)){
858     BaseString::snprintf(buf, sizeof(buf), "Two nodes cannot have the same node id: %d",
859 	       nodeId);
860       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
861     }
862     nodes.set(nodeId);
863 
864     switch(nodeType){
865     case NODE_TYPE_DB:
866       noOfDBNodes++; // No of NDB processes
867 
868       if(nodeId > MAX_NDB_NODES){
869         BaseString::snprintf(buf, sizeof(buf), "Maximum node id for an ndb node is: %d",
870                              MAX_NDB_NODES);
871 	ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
872       }
873       break;
874     case NODE_TYPE_API:
875       noOfAPINodes++; // No of API processes
876       break;
877     case NODE_TYPE_MGM:
878       noOfMGMNodes++; // No of MGM processes
879       break;
880     default:
881       BaseString::snprintf(buf, sizeof(buf), "Unknown node type: %d", nodeType);
882       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
883     }
884   }
885   noOfNodes = nodeNo;
886 
887   noOfTables+= 2; // Add System tables
888   noOfAttributes += 9;  // Add System table attributes
889 
890   ConfigValues::Iterator it2(*ownConfig, db.m_config);
891   it2.set(CFG_DB_NO_TABLES, noOfTables);
892   it2.set(CFG_DB_NO_ATTRIBUTES, noOfAttributes);
893   {
894     Uint32 neededNoOfTriggers =   /* types: Insert/Update/Delete/Custom */
895       3 * noOfUniqueHashIndexes + /* for unique hash indexes, I/U/D */
896       3 * NDB_MAX_ACTIVE_EVENTS + /* for events in suma, I/U/D */
897       3 * noOfTables +            /* for backup, I/U/D */
898       3 * noOfTables +            /* for Fully replicated tables, I/U/D */
899       noOfOrderedIndexes;         /* for ordered indexes, C */
900     if (noOfTriggers < neededNoOfTriggers)
901     {
902       noOfTriggers= neededNoOfTriggers;
903       it2.set(CFG_DB_NO_TRIGGERS, noOfTriggers);
904     }
905     g_eventLogger->info("MaxNoOfTriggers set to %u", noOfTriggers);
906   }
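  /*
   * Worked example with hypothetical numbers: with noOfTables = 20 (after
   * adding the system tables), 5 unique hash indexes and 3 ordered indexes,
   * the floor becomes
   *   3*5 + 3*NDB_MAX_ACTIVE_EVENTS + 3*20 + 3*20 + 3 triggers,
   * and MaxNoOfTriggers is raised to that value if configured lower.
   */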
907 
908   /**
909    * Do size calculations
910    */
911   ConfigValuesFactory cfg(ownConfig);
912 
913   cfg.begin();
914   /**
915    * Ensure that Backup doesn't fail due to lack of trigger resources
916    */
917   cfg.put(CFG_TUP_NO_TRIGGERS, noOfTriggers + 3 * noOfTables);
918 
919   Uint32 noOfMetaTables= noOfTables + noOfOrderedIndexes +
920                            noOfUniqueHashIndexes;
921   Uint32 noOfMetaTablesDict= noOfMetaTables;
922   if (noOfMetaTablesDict > NDB_MAX_TABLES)
923     noOfMetaTablesDict= NDB_MAX_TABLES;
924 
925   {
926     /**
927      * Dict Size Alt values
928      */
929     cfg.put(CFG_DICT_ATTRIBUTE,
930 	    noOfAttributes);
931 
932     cfg.put(CFG_DICT_TABLE,
933 	    noOfMetaTablesDict);
934   }
935 
936 
937   if (noOfLocalScanRecords == 0)
938   {
939     noOfLocalScanRecords = tcInstances * ldmInstances *
940       (noOfDBNodes * noOfScanRecords) +
941       1 /* NR */ +
942       1 /* LCP */;
943     if (noOfLocalScanRecords > 100000)
944     {
945       /**
946        * Number of local scan records is clearly very large, this should
947        * only happen in very large clusters with lots of data nodes, lots
948        * of TC instances, lots of LDM instances. In this case it is highly
949        * unlikely that all these resources are allocated simultaneously.
950        * It is still possible to set MaxNoOfLocalScanRecords to a higher
951        * number if desirable.
952        */
953       g_eventLogger->info("Capped calculation of local scan records to "
954                           "100000 from %u, still possible to set"
955                           " MaxNoOfLocalScans"
956                           " explicitly to go higher",
957                           noOfLocalScanRecords);
958       noOfLocalScanRecords = 100000;
959     }
960     if (noOfLocalScanRecords * noBatchSize > 1000000)
961     {
962       /**
963        * Ensure that we don't use up more than 100 MByte of lock operation
964        * records per LDM instance to avoid ridiculous amount of memory
965        * allocated for operation records. We keep old numbers in smaller
966        * configs for easier upgrades.
967        */
968       Uint32 oldBatchSize = noBatchSize;
969       noBatchSize = 1000000 / noOfLocalScanRecords;
970       g_eventLogger->info("Capped BatchSizePerLocalScan to %u from %u to avoid"
971                           " very large memory allocations"
972                           ", still possible to set MaxNoOfLocalScans"
973                           " explicitly to go higher",
974                           noBatchSize,
975                           oldBatchSize);
976     }
977   }
978   cfg.put(CFG_LDM_BATCH_SIZE, noBatchSize);
979 
980   if (noOfLocalOperations == 0) {
981     if (noOfOperations == 0)
982       noOfLocalOperations = 11 * 32768 / 10;
983     else
984       noOfLocalOperations= (11 * noOfOperations) / 10;
985   }
986 
987   const Uint32 noOfTCLocalScanRecords = DO_DIV(noOfLocalScanRecords,
988                                                tcInstances);
989   const Uint32 noOfTCScanRecords = noOfScanRecords;
990 
991   // ReservedXXX defaults to 25% of MaxNoOfXXX
992   if (reservedScanRecords == 0)
993   {
994     reservedScanRecords = noOfScanRecords / 4;
995   }
996   if (reservedLocalScanRecords == 0)
997   {
998     reservedLocalScanRecords = noOfLocalScanRecords / 4;
999   }
1000   if (reservedOperations == 0)
1001   {
1002     reservedOperations = noOfOperations / 4;
1003   }
1004   if (reservedTransactions == 0)
1005   {
1006     reservedTransactions = noOfTransactions / 4;
1007   }
1008   if (reservedIndexOperations == 0)
1009   {
1010     reservedIndexOperations = noOfIndexOperations / 4;
1011   }
1012   if (reservedTriggerOperations == 0)
1013   {
1014     reservedTriggerOperations = noOfTriggerOperations / 4;
1015   }
1016   if (reservedTransactionBufferBytes == 0)
1017   {
1018     reservedTransactionBufferBytes = transactionBufferBytes / 4;
1019   }
1020 
1021   noOfLocalOperations = DO_DIV(noOfLocalOperations, ldmInstances);
1022   noOfLocalScanRecords = DO_DIV(noOfLocalScanRecords, ldmInstances);
1023 
1024   {
1025     Uint32 noOfAccTables= noOfMetaTables/*noOfTables+noOfUniqueHashIndexes*/;
1026     /**
1027      * Acc Size Alt values
1028      */
1029     // Can keep 65536 pages (= 0.5 GByte)
1030     cfg.put(CFG_ACC_FRAGMENT,
1031 	    NO_OF_FRAG_PER_NODE * noOfAccTables* noOfReplicas);
1032 
1033     /*-----------------------------------------------------------------------*/
1034     // The extra operation records added are used by the scan and node
1035     // recovery process.
1036     // Node recovery process will have its operations dedicated to ensure
1037     // that they never have a problem with allocation of the operation record.
1038     // The remainder are allowed for use by the scan processes.
1039     /*-----------------------------------------------------------------------*/
1040     /**
1041      * We add an extra 150 operations, 100 of those are dedicated to DBUTIL
1042      * interactions and LCP and Backup scans. The remaining 50 are
1043      * non-dedicated things for local usage.
1044      */
1045 #define EXTRA_LOCAL_OPERATIONS 150
1046     Uint32 local_operations =
1047 	    (noOfLocalOperations + EXTRA_LOCAL_OPERATIONS) +
1048 	    (noOfLocalScanRecords * noBatchSize) +
1049 	    NODE_RECOVERY_SCAN_OP_RECORDS;
1050     local_operations = MIN(local_operations, UINT28_MAX);
1051     cfg.put(CFG_ACC_OP_RECS, local_operations);
1052 
1053 #ifdef VM_TRACE
1054     ndbout_c("reservedOperations: %u, reservedLocalScanRecords: %u,"
1055              " NODE_RECOVERY_SCAN_OP_RECORDS: %u, "
1056              "noOfLocalScanRecords: %u, "
1057              "noOfLocalOperations: %u",
1058              reservedOperations,
1059              reservedLocalScanRecords,
1060              NODE_RECOVERY_SCAN_OP_RECORDS,
1061              noOfLocalScanRecords,
1062              noOfLocalOperations);
1063 #endif
1064     Uint32 ldm_reserved_operations =
1065             (reservedOperations / ldmInstances) + EXTRA_LOCAL_OPERATIONS +
1066             (reservedLocalScanRecords / ldmInstances) +
1067             NODE_RECOVERY_SCAN_OP_RECORDS;
1068     ldm_reserved_operations = MIN(ldm_reserved_operations, UINT28_MAX);
1069     cfg.put(CFG_LDM_RESERVED_OPERATIONS, ldm_reserved_operations);
1070 
1071     cfg.put(CFG_ACC_TABLE, noOfAccTables);
1072 
1073     cfg.put(CFG_ACC_SCAN, noOfLocalScanRecords);
1074     cfg.put(CFG_ACC_RESERVED_SCAN_RECORDS,
1075             reservedLocalScanRecords / ldmInstances);
1076     cfg.put(CFG_TUP_RESERVED_SCAN_RECORDS,
1077             reservedLocalScanRecords / ldmInstances);
1078     cfg.put(CFG_TUX_RESERVED_SCAN_RECORDS,
1079             reservedLocalScanRecords / ldmInstances);
1080     cfg.put(CFG_LQH_RESERVED_SCAN_RECORDS,
1081             reservedLocalScanRecords / ldmInstances);
1082   }
1083 
1084   {
1085     /**
1086      * Dih Size Alt values
1087      */
1088     Uint32 noFragPerTable= (((noOfDBNodes * ldmInstances) +
1089                              NO_OF_FRAGS_PER_CHUNK - 1) >>
1090                             LOG_NO_OF_FRAGS_PER_CHUNK) <<
1091       LOG_NO_OF_FRAGS_PER_CHUNK;
1092 
1093     cfg.put(CFG_DIH_FRAG_CONNECT,
1094 	    noFragPerTable *  noOfMetaTables);
1095 
1096     cfg.put(CFG_DIH_REPLICAS,
1097 	    NO_OF_FRAG_PER_NODE * noOfMetaTables *
1098 	    noOfDBNodes * noOfReplicas * ldmInstances);
1099 
1100     cfg.put(CFG_DIH_TABLE,
1101 	    noOfMetaTables);
1102   }
1103 
1104   {
1105     /**
1106      * Lqh Size Alt values
1107      */
1108     cfg.put(CFG_LQH_FRAG,
1109 	    NO_OF_FRAG_PER_NODE * noOfMetaTables * noOfReplicas);
1110 
1111     cfg.put(CFG_LQH_TABLE,
1112 	    noOfMetaTables);
1113 
1114     Uint32 local_operations =
1115 	    noOfLocalOperations + EXTRA_LOCAL_OPERATIONS;
1116     local_operations = MIN(local_operations, UINT28_MAX);
1117     cfg.put(CFG_LQH_TC_CONNECT, local_operations);
1118 
1119     cfg.put(CFG_LQH_SCAN,
1120 	    noOfLocalScanRecords);
1121   }
1122 
1123   {
1124     /**
1125      * Spj Size Alt values
1126      */
1127     cfg.put(CFG_SPJ_TABLE,
1128 	    noOfMetaTables);
1129   }
1130 
1131   {
1132     /**
1133      * Tc Size Alt values
1134      */
1135     const Uint32 takeOverOperations = noOfOperations;
1136     if (maxOpsPerTrans == ~(Uint32)0)
1137     {
1138       maxOpsPerTrans = noOfOperations;
1139     }
1140     if (maxOpsPerTrans > noOfOperations)
1141     {
1142       BaseString::snprintf(
1143           buf,
1144           sizeof(buf),
1145           "Config param MaxDMLOperationsPerTransaction(%u) must not be bigger"
1146           " than available failover records given by "
1147           "MaxNoOfConcurrentOperations(%u)\n",
1148           maxOpsPerTrans,
1149           noOfOperations);
1150       ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, msg, buf);
1151     }
1152 
1153     cfg.put(CFG_TC_TARGET_FRAG_LOCATION, Uint32(0));
1154     cfg.put(CFG_TC_MAX_FRAG_LOCATION, UINT32_MAX);
1155     cfg.put(CFG_TC_RESERVED_FRAG_LOCATION, Uint32(0));
1156 
1157     cfg.put(CFG_TC_TARGET_SCAN_FRAGMENT, noOfTCLocalScanRecords);
1158     cfg.put(CFG_TC_MAX_SCAN_FRAGMENT, UINT32_MAX);
1159     cfg.put(CFG_TC_RESERVED_SCAN_FRAGMENT, reservedLocalScanRecords / tcInstances);
1160 
1161     cfg.put(CFG_TC_TARGET_SCAN_RECORD, noOfTCScanRecords);
1162     cfg.put(CFG_TC_MAX_SCAN_RECORD, noOfTCScanRecords);
1163     cfg.put(CFG_TC_RESERVED_SCAN_RECORD, reservedScanRecords / tcInstances);
1164 
1165     cfg.put(CFG_TC_TARGET_CONNECT_RECORD, noOfOperations + 16 + noOfTransactions);
1166     cfg.put(CFG_TC_MAX_CONNECT_RECORD, UINT32_MAX);
1167     cfg.put(CFG_TC_RESERVED_CONNECT_RECORD, reservedOperations / tcInstances);
1168 
1169     cfg.put(CFG_TC_TARGET_TO_CONNECT_RECORD, takeOverOperations);
1170     cfg.put(CFG_TC_MAX_TO_CONNECT_RECORD, takeOverOperations);
1171     cfg.put(CFG_TC_RESERVED_TO_CONNECT_RECORD, takeOverOperations);
1172 
1173     cfg.put(CFG_TC_TARGET_COMMIT_ACK_MARKER, noOfTransactions);
1174     cfg.put(CFG_TC_MAX_COMMIT_ACK_MARKER, UINT32_MAX);
1175     cfg.put(CFG_TC_RESERVED_COMMIT_ACK_MARKER, reservedTransactions / tcInstances);
1176 
1177     cfg.put(CFG_TC_TARGET_TO_COMMIT_ACK_MARKER, Uint32(0));
1178     cfg.put(CFG_TC_MAX_TO_COMMIT_ACK_MARKER, Uint32(0));
1179     cfg.put(CFG_TC_RESERVED_TO_COMMIT_ACK_MARKER, Uint32(0));
1180 
1181     cfg.put(CFG_TC_TARGET_INDEX_OPERATION, noOfIndexOperations);
1182     cfg.put(CFG_TC_MAX_INDEX_OPERATION, UINT32_MAX);
1183     cfg.put(CFG_TC_RESERVED_INDEX_OPERATION, reservedIndexOperations / tcInstances);
1184 
1185     cfg.put(CFG_TC_TARGET_API_CONNECT_RECORD, noOfTransactions);
1186     cfg.put(CFG_TC_MAX_API_CONNECT_RECORD, UINT32_MAX);
1187     cfg.put(CFG_TC_RESERVED_API_CONNECT_RECORD, reservedTransactions / tcInstances);
1188 
1189     cfg.put(CFG_TC_TARGET_TO_API_CONNECT_RECORD, reservedTransactions);
1190     cfg.put(CFG_TC_MAX_TO_API_CONNECT_RECORD, noOfTransactions);
1191     cfg.put(CFG_TC_RESERVED_TO_API_CONNECT_RECORD, reservedTransactions / tcInstances);
1192 
1193     cfg.put(CFG_TC_TARGET_CACHE_RECORD, noOfTransactions);
1194     cfg.put(CFG_TC_MAX_CACHE_RECORD, noOfTransactions);
1195     cfg.put(CFG_TC_RESERVED_CACHE_RECORD, reservedTransactions / tcInstances);
1196 
1197     cfg.put(CFG_TC_TARGET_FIRED_TRIGGER_DATA, noOfTriggerOperations);
1198     cfg.put(CFG_TC_MAX_FIRED_TRIGGER_DATA, UINT32_MAX);
1199     cfg.put(CFG_TC_RESERVED_FIRED_TRIGGER_DATA, reservedTriggerOperations / tcInstances);
1200 
1201     cfg.put(CFG_TC_TARGET_ATTRIBUTE_BUFFER, transactionBufferBytes);
1202     cfg.put(CFG_TC_MAX_ATTRIBUTE_BUFFER, UINT32_MAX);
1203     cfg.put(CFG_TC_RESERVED_ATTRIBUTE_BUFFER, reservedTransactionBufferBytes / tcInstances);
1204 
1205     cfg.put(CFG_TC_TARGET_COMMIT_ACK_MARKER_BUFFER, 2 * noOfTransactions);
1206     cfg.put(CFG_TC_MAX_COMMIT_ACK_MARKER_BUFFER, UINT32_MAX);
1207     cfg.put(CFG_TC_RESERVED_COMMIT_ACK_MARKER_BUFFER, 2 * reservedTransactions / tcInstances);
1208 
1209     cfg.put(CFG_TC_TARGET_TO_COMMIT_ACK_MARKER_BUFFER, Uint32(0));
1210     cfg.put(CFG_TC_MAX_TO_COMMIT_ACK_MARKER_BUFFER, Uint32(0));
1211     cfg.put(CFG_TC_RESERVED_TO_COMMIT_ACK_MARKER_BUFFER, Uint32(0));
1212 
1213     cfg.put(CFG_TC_TABLE,
1214 	    noOfMetaTables);
1215   }
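  /*
   * The TC pools above follow a reserved/target/max pattern: 'reserved'
   * records are pre-allocated (the reserved counts are split across the TC
   * instances), 'target' is the size derived from the configuration, and
   * 'max' (often UINT32_MAX) caps how far the pool may grow.
   */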
1216 
1217   {
1218     /**
1219      * Tup Size Alt values
1220      */
1221     cfg.put(CFG_TUP_FRAG,
1222 	    NO_OF_FRAG_PER_NODE * noOfMetaTables* noOfReplicas);
1223 
1224     Uint32 local_operations =
1225 	    noOfLocalOperations + EXTRA_LOCAL_OPERATIONS;
1226     local_operations = MIN(local_operations, UINT28_MAX);
1227     cfg.put(CFG_TUP_OP_RECS, local_operations);
1228 
1229     cfg.put(CFG_TUP_PAGE,
1230 	    noOfDataPages);
1231 
1232     cfg.put(CFG_TUP_TABLE,
1233 	    noOfMetaTables);
1234 
1235     cfg.put(CFG_TUP_STORED_PROC,
1236 	    noOfLocalScanRecords);
1237   }
1238 
1239   {
1240     /**
1241      * Tux Size Alt values
1242      */
1243     cfg.put(CFG_TUX_INDEX,
1244 	    noOfMetaTables /*noOfOrderedIndexes*/);
1245 
1246     cfg.put(CFG_TUX_FRAGMENT,
1247 	    NO_OF_FRAG_PER_NODE * noOfOrderedIndexes * noOfReplicas);
1248 
1249     cfg.put(CFG_TUX_ATTRIBUTE,
1250 	    noOfOrderedIndexes * 4);
1251 
1252     cfg.put(CFG_TUX_SCAN_OP, noOfLocalScanRecords);
1253   }
1254 
1255   require(cfg.commit(true));
1256   m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
1257   m_ownConfigIterator = ndb_mgm_create_configuration_iterator
1258     (m_ownConfig, 0);
1259 }
1260 
1261 void
1262 Configuration::setAllRealtimeScheduler()
1263 {
1264   Uint32 i;
1265   for (i = 0; i < threadInfo.size(); i++)
1266   {
1267     if (threadInfo[i].type != NotInUse)
1268     {
1269       if (setRealtimeScheduler(threadInfo[i].pThread,
1270                                threadInfo[i].type,
1271                                _realtimeScheduler,
1272                                FALSE))
1273         return;
1274     }
1275   }
1276 }
1277 
1278 void
1279 Configuration::setAllLockCPU(bool exec_thread)
1280 {
1281   Uint32 i;
1282   for (i = 0; i < threadInfo.size(); i++)
1283   {
1284     if (threadInfo[i].type == NotInUse)
1285       continue;
1286 
1287     bool run =
1288       (exec_thread && threadInfo[i].type == BlockThread) ||
1289       (!exec_thread && threadInfo[i].type != BlockThread);
1290 
1291     if (run)
1292     {
1293       setLockCPU(threadInfo[i].pThread, threadInfo[i].type);
1294     }
1295   }
1296 }
1297 
1298 int
1299 Configuration::setRealtimeScheduler(NdbThread* pThread,
1300                                     enum ThreadTypes type,
1301                                     bool real_time,
1302                                     bool init)
1303 {
1304   /*
1305     We ignore thread characteristics on platforms where we cannot
1306     determine the thread id.
1307   */
1308   if (!init || real_time)
1309   {
1310     int error_no;
1311     bool high_prio = !((type == BlockThread) ||
1312                        (type == ReceiveThread) ||
1313                        (type == SendThread));
1314     if ((error_no = NdbThread_SetScheduler(pThread, real_time, high_prio)))
1315     {
1316       //Warning, no permission to set scheduler
1317       if (init)
1318       {
1319         g_eventLogger->info("Failed to set real-time prio on tid = %d,"
1320                             " error_no = %d",
1321                             NdbThread_GetTid(pThread), error_no);
1322         abort(); /* Fail on failures at init */
1323       }
1324       return 1;
1325     }
1326     else if (init)
1327     {
1328       g_eventLogger->info("Successfully set real-time prio on tid = %d",
1329                           NdbThread_GetTid(pThread));
1330     }
1331   }
1332   return 0;
1333 }
1334 
1335 int
1336 Configuration::setLockCPU(NdbThread * pThread,
1337                           enum ThreadTypes type)
1338 {
1339   int res = 0;
1340   if (type != BlockThread &&
1341       type != SendThread &&
1342       type != ReceiveThread)
1343   {
1344     if (type == NdbfsThread)
1345     {
1346       /*
1347        * NdbfsThread (IO threads).
1348        */
1349       res = m_thr_config.do_bind_io(pThread);
1350     }
1351     else
1352     {
1353       /*
1354        * WatchDogThread, SocketClientThread, SocketServerThread
1355        */
1356       res = m_thr_config.do_bind_watchdog(pThread);
1357     }
1358   }
1359   else if (!NdbIsMultiThreaded())
1360   {
1361     BlockNumber list[] = { DBDIH };
1362     res = m_thr_config.do_bind(pThread, list, 1);
1363   }
1364 
1365   if (res != 0)
1366   {
1367     if (res > 0)
1368     {
1369       g_eventLogger->info("Locked tid = %d to CPU ok",
1370                           NdbThread_GetTid(pThread));
1371       return 0;
1372     }
1373     else
1374     {
1375       g_eventLogger->info("Failed to lock tid = %d to CPU, error_no = %d",
1376                           NdbThread_GetTid(pThread), (-res));
1377 #ifndef HAVE_MAC_OS_X_THREAD_INFO
1378       abort(); /* We fail when failing to lock to CPUs */
1379 #endif
1380       return 1;
1381     }
1382   }
1383 
1384   return 0;
1385 }
1386 
1387 int
1388 Configuration::setThreadPrio(NdbThread * pThread,
1389                              enum ThreadTypes type)
1390 {
1391   int res = 0;
1392   unsigned thread_prio = 0;
1393   if (type != BlockThread &&
1394       type != SendThread &&
1395       type != ReceiveThread)
1396   {
1397     if (type == NdbfsThread)
1398     {
1399       /*
1400        * NdbfsThread (IO threads).
1401        */
1402       res = m_thr_config.do_thread_prio_io(pThread, thread_prio);
1403     }
1404     else
1405     {
1406       /*
1407        * WatchDogThread, SocketClientThread, SocketServerThread
1408        */
1409       res = m_thr_config.do_thread_prio_watchdog(pThread, thread_prio);
1410     }
1411   }
1412   else if (!NdbIsMultiThreaded())
1413   {
1414     BlockNumber list[] = { DBDIH };
1415     res = m_thr_config.do_thread_prio(pThread, list, 1, thread_prio);
1416   }
1417 
1418   if (res != 0)
1419   {
1420     if (res > 0)
1421     {
1422       g_eventLogger->info("Set thread prio to %u for tid: %d ok",
1423                           thread_prio, NdbThread_GetTid(pThread));
1424       return 0;
1425     }
1426     else
1427     {
1428       g_eventLogger->info("Failed to set thread prio to %u for tid: %d,"
1429                           " error_no = %d",
1430                           thread_prio,
1431                           NdbThread_GetTid(pThread),
1432                           (-res));
1433       abort(); /* We fail when failing to set thread prio */
1434       return 1;
1435     }
1436   }
1437   return 0;
1438 }
1439 
1440 bool
1441 Configuration::get_io_real_time() const
1442 {
1443   return m_thr_config.do_get_realtime_io();
1444 }
1445 
1446 const char*
1447 Configuration::get_type_string(enum ThreadTypes type)
1448 {
1449   const char *type_str;
1450   switch (type)
1451   {
1452     case WatchDogThread:
1453       type_str = "WatchDogThread";
1454       break;
1455     case SocketServerThread:
1456       type_str = "SocketServerThread";
1457       break;
1458     case SocketClientThread:
1459       type_str = "SocketClientThread";
1460       break;
1461     case NdbfsThread:
1462       type_str = "NdbfsThread";
1463       break;
1464     case BlockThread:
1465       type_str = "BlockThread";
1466       break;
1467     case SendThread:
1468       type_str = "SendThread";
1469       break;
1470     case ReceiveThread:
1471       type_str = "ReceiveThread";
1472       break;
1473     default:
1474       type_str = NULL;
1475       abort();
1476   }
1477   return type_str;
1478 }
1479 
1480 Uint32
1481 Configuration::addThread(struct NdbThread* pThread,
1482                          enum ThreadTypes type,
1483                          bool single_threaded)
1484 {
1485   const char *type_str;
1486   Uint32 i;
1487   NdbMutex_Lock(threadIdMutex);
1488   for (i = 0; i < threadInfo.size(); i++)
1489   {
1490     if (threadInfo[i].type == NotInUse)
1491       break;
1492   }
1493   if (i == threadInfo.size())
1494   {
1495     struct ThreadInfo tmp;
1496     threadInfo.push_back(tmp);
1497   }
1498   threadInfo[i].pThread = pThread;
1499   threadInfo[i].type = type;
1500   NdbMutex_Unlock(threadIdMutex);
1501 
1502   type_str = get_type_string(type);
1503 
1504   bool real_time;
1505   if (single_threaded)
1506   {
1507     setRealtimeScheduler(pThread, type, _realtimeScheduler, TRUE);
1508   }
1509   else if (type == WatchDogThread ||
1510            type == SocketClientThread ||
1511            type == SocketServerThread ||
1512            type == NdbfsThread)
1513   {
1514     if (type != NdbfsThread)
1515     {
1516       /**
1517        * IO threads are handled internally in NDBFS with
1518        * regard to setting real time properties on the
1519        * IO thread.
1520        *
1521        * WatchDog, SocketServer and SocketClient have no
1522        * special handling of real-time breaks since we
1523        * don't expect these threads to run for long without
1524        * breaks.
1525        */
1526       real_time = m_thr_config.do_get_realtime_wd();
1527       setRealtimeScheduler(pThread, type, real_time, TRUE);
1528     }
1529     /**
1530      * main threads are set in ThreadConfig::ipControlLoop
1531      * as it's handled differently with mt
1532      */
1533     ndbout_c("Started thread, index = %u, id = %d, type = %s",
1534              i,
1535              NdbThread_GetTid(pThread),
1536              type_str);
1537     setLockCPU(pThread, type);
1538   }
1539   /**
1540    * All other thread types requires special handling of real-time
1541    * property which is handled in the thread itself for multithreaded
1542    * ndbmtd process.
1543    */
1544   return i;
1545 }
1546 
1547 void
1548 Configuration::removeThread(struct NdbThread *pThread)
1549 {
1550   NdbMutex_Lock(threadIdMutex);
1551   for (Uint32 i = 0; i < threadInfo.size(); i++)
1552   {
1553     if (threadInfo[i].pThread == pThread)
1554     {
1555       threadInfo[i].pThread = 0;
1556       threadInfo[i].type = NotInUse;
1557       break;
1558     }
1559   }
1560   NdbMutex_Unlock(threadIdMutex);
1561 }
1562 
1563 void
1564 Configuration::yield_main(Uint32 index, bool start)
1565 {
1566   if (_realtimeScheduler)
1567   {
1568     if (start)
1569       setRealtimeScheduler(threadInfo[index].pThread,
1570                            threadInfo[index].type,
1571                            FALSE,
1572                            FALSE);
1573     else
1574       setRealtimeScheduler(threadInfo[index].pThread,
1575                            threadInfo[index].type,
1576                            TRUE,
1577                            FALSE);
1578   }
1579 }
1580 
1581 void
1582 Configuration::initThreadArray()
1583 {
1584   NdbMutex_Lock(threadIdMutex);
1585   for (Uint32 i = 0; i < threadInfo.size(); i++)
1586   {
1587     threadInfo[i].pThread = 0;
1588     threadInfo[i].type = NotInUse;
1589   }
1590   NdbMutex_Unlock(threadIdMutex);
1591 }
1592 
1593 template class Vector<struct ThreadInfo>;
1594 
1595