/*
   Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/

#include "thrman.hpp"
#include <mt.hpp>
#include <signaldata/DbinfoScan.hpp>
#include <signaldata/Sync.hpp>
#include <signaldata/DumpStateOrd.hpp>

#include <EventLogger.hpp>
#include <NdbSpin.h>

#define JAM_FILE_ID 440

#define MAIN_THRMAN_INSTANCE 1
#define NUM_MEASUREMENTS 20
#define NUM_MEASUREMENT_RECORDS (3 * NUM_MEASUREMENTS)

static NdbMutex *g_freeze_mutex = 0;
static NdbCondition *g_freeze_condition = 0;
static Uint32 g_freeze_waiters = 0;
static bool g_freeze_wakeup = false;

//#define DEBUG_SPIN 1
#ifdef DEBUG_SPIN
#define DEB_SPIN(arglist) do { g_eventLogger->info arglist ; } while (0)
#else
#define DEB_SPIN(arglist) do { } while (0)
#endif

//#define HIGH_DEBUG_CPU_USAGE 1
//#define DEBUG_CPU_USAGE 1
extern EventLogger * g_eventLogger;

Thrman::Thrman(Block_context & ctx, Uint32 instanceno) :
  SimulatedBlock(THRMAN, ctx, instanceno),
  c_next_50ms_measure(c_measurementRecordPool),
  c_next_1sec_measure(c_measurementRecordPool),
  c_next_20sec_measure(c_measurementRecordPool)
{
  BLOCK_CONSTRUCTOR(Thrman);

  if (g_freeze_mutex == 0)
  {
    g_freeze_mutex = NdbMutex_Create();
    g_freeze_condition = NdbCondition_Create();
  }
  addRecSignal(GSN_DBINFO_SCANREQ, &Thrman::execDBINFO_SCANREQ);
  addRecSignal(GSN_CONTINUEB, &Thrman::execCONTINUEB);
  addRecSignal(GSN_GET_CPU_USAGE_REQ, &Thrman::execGET_CPU_USAGE_REQ);
  addRecSignal(GSN_OVERLOAD_STATUS_REP, &Thrman::execOVERLOAD_STATUS_REP);
  addRecSignal(GSN_NODE_OVERLOAD_STATUS_ORD, &Thrman::execNODE_OVERLOAD_STATUS_ORD);
  addRecSignal(GSN_READ_CONFIG_REQ, &Thrman::execREAD_CONFIG_REQ);
  addRecSignal(GSN_SEND_THREAD_STATUS_REP, &Thrman::execSEND_THREAD_STATUS_REP);
  addRecSignal(GSN_SET_WAKEUP_THREAD_ORD, &Thrman::execSET_WAKEUP_THREAD_ORD);
  addRecSignal(GSN_WAKEUP_THREAD_ORD, &Thrman::execWAKEUP_THREAD_ORD);
  addRecSignal(GSN_SEND_WAKEUP_THREAD_ORD, &Thrman::execSEND_WAKEUP_THREAD_ORD);
  addRecSignal(GSN_FREEZE_THREAD_REQ, &Thrman::execFREEZE_THREAD_REQ);
  addRecSignal(GSN_FREEZE_ACTION_CONF, &Thrman::execFREEZE_ACTION_CONF);
  addRecSignal(GSN_STTOR, &Thrman::execSTTOR);
  addRecSignal(GSN_MEASURE_WAKEUP_TIME_ORD, &Thrman::execMEASURE_WAKEUP_TIME_ORD);
  addRecSignal(GSN_DUMP_STATE_ORD, &Thrman::execDUMP_STATE_ORD);

  m_enable_adaptive_spinning = false;
  m_allowed_spin_overhead = 130;
  m_phase2_done = false;
  m_is_idle = true;
}

Thrman::~Thrman()
{
  if (g_freeze_mutex != 0)
  {
    NdbMutex_Destroy(g_freeze_mutex);
    NdbCondition_Destroy(g_freeze_condition);
    g_freeze_mutex = 0;
    g_freeze_condition = 0;
    g_freeze_waiters = 0;
    g_freeze_wakeup = false;
  }
}

BLOCK_FUNCTIONS(Thrman)

void Thrman::mark_measurements_not_done()
{
  MeasurementRecordPtr measurePtr;
  jam();
  c_next_50ms_measure.first(measurePtr);
  while (measurePtr.i != RNIL)
  {
    measurePtr.p->m_first_measure_done = false;
    c_next_50ms_measure.next(measurePtr);
  }
  c_next_1sec_measure.first(measurePtr);
  while (measurePtr.i != RNIL)
  {
    measurePtr.p->m_first_measure_done = false;
    c_next_1sec_measure.next(measurePtr);
  }
  c_next_20sec_measure.first(measurePtr);
  while (measurePtr.i != RNIL)
  {
    measurePtr.p->m_first_measure_done = false;
    c_next_20sec_measure.next(measurePtr);
  }
}

void
Thrman::set_configured_spintime(Uint32 val, bool specific)
{
  if (!NdbSpin_is_supported())
  {
    return;
  }
  if (val > MAX_SPIN_TIME)
  {
    if (specific ||
        instance() == MAIN_THRMAN_INSTANCE)
    {
      g_eventLogger->info("(%u)Attempt to set spintime > 500 not possible",
                          instance());
    }
    return;
  }
  g_eventLogger->info("(%u)Setting spintime to %u",
                       instance(),
                       val);

  m_configured_spintime = val;
  if (val == 0)
  {
    jam();
    setSpintime(val);
    return;
  }
  else if (!m_enable_adaptive_spinning)
  {
    jam();
    setSpintime(val);
  }
}

void
Thrman::set_allowed_spin_overhead(Uint32 val)
{
  if (val > MAX_SPIN_OVERHEAD)
  {
    if (instance() == MAIN_THRMAN_INSTANCE)
    {
      g_eventLogger->info("AllowedSpinOverhead is max 10000");
    }
    return;
  }
  Uint32 add_val = 0;
  if (val > 100)
  {
    add_val = val - 100;
    val = 100;
  }
  /**
   * At low allowed spin overhead it makes more sense to spend time
   * spinning in the recv thread, since we have many more wakeups that can
   * gain from spinning in this thread.
   *
   * As we increase the allowed spin overhead we will have more and more
   * benefit from spinning also in the TC threads.
   *
   * At very high allowed overhead it becomes essential to also grab the
   * wait states in the LDM threads and thus give back the allowed
   * overhead to them.
   */
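  /**
   * A rough worked example of the weighting below (numbers for
   * illustration only, derived from the formulas in this function):
   * a call with val = 200 leaves val = 100 and add_val = 100 after
   * capping. The recv thread then gets (100 * 3) / 2 + (100 * 8) / 10 +
   * 150 = 380, a TC thread gets 100 + (100 * 9) / 10 + 140 = 330, an
   * LDM thread gets (100 * 2) / 3 + (100 * 12) / 10 + 120 = 306 and
   * any other thread gets 100 + 130 = 230.
   */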
  if (m_recv_thread)
  {
    jam();
    val *= 3;
    val /= 2;
    add_val *= 8;
    add_val /= 10;
    m_allowed_spin_overhead = val + add_val + 150;
  }
  else if (m_tc_thread)
  {
    jam();
    add_val *= 9;
    add_val /= 10;
    m_allowed_spin_overhead = val + add_val + 140;
  }
  else if (m_ldm_thread)
  {
    jam();
    val *= 2;
    val /= 3;
    add_val *= 12;
    add_val /= 10;
    m_allowed_spin_overhead = val + add_val + 120;
  }
  else
  {
    jam();
    m_allowed_spin_overhead = val + 130;
  }
  g_eventLogger->debug("(%u) Setting AllowedSpinOverhead to %u",
                       instance(),
                       m_allowed_spin_overhead);
}

void
Thrman::set_enable_adaptive_spinning(bool val)
{
  m_enable_adaptive_spinning = val;
  setSpintime(m_configured_spintime);
  if (instance() == MAIN_THRMAN_INSTANCE)
  {
    g_eventLogger->info("(%u) %s adaptive spinning",
                        instance(),
                        val ? "Enable" : "Disable");
  }
}

void
Thrman::set_spintime_per_call(Uint32 val)
{
  if (instance() == MAIN_THRMAN_INSTANCE)
  {
    if (val < MIN_SPINTIME_PER_CALL || val > MAX_SPINTIME_PER_CALL)
    {
      g_eventLogger->info("SpintimePerCall can only be set between"
                          " 300 and 8000");
      return;
    }
    NdbSpin_Change(val);
    g_eventLogger->info("SpintimePerCall set to %u", val);
  }
}

void Thrman::execREAD_CONFIG_REQ(Signal *signal)
{
  jamEntry();

  /* Receive signal */
  const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
  Uint32 ref = req->senderRef;
  Uint32 senderData = req->senderData;

  m_thread_name = getThreadName();
  m_recv_thread = false;
  m_ldm_thread = false;
  m_tc_thread = false;
  m_spin_time_change_count = 0;
  if (strcmp(m_thread_name, "recv") == 0)
  {
    m_recv_thread = true;
  }
  if (strcmp(m_thread_name, "tc") == 0)
  {
    m_tc_thread = true;
  }
  if (strcmp(m_thread_name, "ldm") == 0)
  {
    m_ldm_thread = true;
  }
  m_thread_description = getThreadDescription();
  m_send_thread_name = "send";
  m_send_thread_description = "Send thread";
  m_enable_adaptive_spinning = false;

  if (NdbSpin_is_supported())
  {
    const char *conf = 0;
    Uint32 val = 0;
    const ndb_mgm_configuration_iterator * p =
      m_ctx.m_config.getOwnConfigIterator();
    ndbrequire(p != 0);
    if (!ndb_mgm_get_string_parameter(p, CFG_DB_SPIN_METHOD, &conf))
    {
      jam();
      if (native_strcasecmp(conf, "staticspinning") == 0)
      {
        if (instance() == MAIN_THRMAN_INSTANCE)
        {
          g_eventLogger->info("Using StaticSpinning according to spintime"
                              " configuration");
        }
      }
      else if (native_strcasecmp(conf, "costbasedspinning") == 0)
      {
        if (instance() == MAIN_THRMAN_INSTANCE)
        {
          g_eventLogger->info("Using CostBasedSpinning with max spintime = 100"
                              " and allowed spin overhead 70 percent");
        }
        val = 200;
        m_enable_adaptive_spinning = true;
        m_configured_spintime = 100;
      }
      else if (native_strcasecmp(conf, "latencyoptimisedspinning") == 0)
      {
        if (instance() == MAIN_THRMAN_INSTANCE)
        {
          g_eventLogger->info("Using LatencyOptimisedSpinning with max"
                              " spintime = 200 and allowed spin"
                              " overhead 1000 percent");
        }
        val = 1000;
        m_enable_adaptive_spinning = true;
        m_configured_spintime = 200;
      }
      else if (native_strcasecmp(conf, "databasemachinespinning") == 0)
      {
        if (instance() == MAIN_THRMAN_INSTANCE)
        {
          g_eventLogger->info("Using DatabaseMachineSpinning with max"
                              " spintime = 500 and"
                              " allowed spin overhead 10000 percent");
        }
        val = 10000;
        m_enable_adaptive_spinning = true;
        m_configured_spintime = MAX_SPIN_TIME;
      }
      else
      {
        g_eventLogger->info("SpinMethod set to %s is not supported, use either "
                            "StaticSpinning, CostBasedSpinning, "
                            "LatencyOptimisedSpinning or DatabaseMachineSpinning"
                            ", falling back to default StaticSpinning",
                            conf);
      }
    }
    else
    {
      m_enable_adaptive_spinning = false;
    }
    /**
     * A spin overhead of 0% means that we will spin if it costs 30% more CPU
     * to gain the earned latency. For example, if by spinning 1300 us we can
     * gain 1000 us in latency, we will always treat this as something we
     * consider as no overhead at all. The reason is that while spinning we
     * don't use the CPU at full speed, thus other hyperthreads or other CPU
     * cores will have more access to CPU core parts and to the memory
     * subsystem in the CPU.
     * By default we will even spend an extra 70% of CPU overhead to gain
     * the desired latency gains.
     *
     * Most of the long work is done by the LDM threads. These threads work
     * for a longer time. The receive thread and the TC threads usually
     * handle small execution times, but very many of them. This means
     * that spinning is more useful in the recv threads and in the
     * tc threads.
     *
     * What this means is that most of the overhead that the user has
     * configured will be used for spinning in the recv thread and tc
     * threads.
     *
     * High overhead we treat a bit differently. Since most of the gain for
     * small overhead comes from the receive thread and the tc thread, the
     * gain with high overhead instead comes from the ldm thread. So we give
     * the ldm thread higher weight for high overhead values. The highest
     * overhead configurable is 800 and will give the allowed spin overhead
     * for the recv thread to be 860, for the tc thread it will be 870 and
     * for the ldm thread it will be 1010.
     *
     * This high overhead I will refer to as the database machine mode.
     * It means that we expect the OS to not have to be involved in the
     * database thread operation, thus running more or less 100% load
     * even at low concurrency is ok. This mode also requires setting
     * the SchedulerSpinTimer to its maximum value 500.
     */
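    /**
     * A short worked check of the numbers above (illustration only):
     * with the quoted maximum of 800, set_allowed_spin_overhead()
     * computes val = 100 and add_val = 700 after capping, giving the
     * recv thread (100 * 3) / 2 + (700 * 8) / 10 + 150 = 150 + 560 +
     * 150 = 860 and a tc thread 100 + (700 * 9) / 10 + 140 = 100 +
     * 630 + 140 = 870; the ldm thread value follows the same formula
     * with weights 2/3 and 12/10.
     */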
    set_allowed_spin_overhead(val);
  }

  /**
   * Allocate the 60 records needed for 3 lists with 20 measurements in each
   * list. We keep track of the last second with high resolution of 50 millis
   * between each measurement, we also keep track of the last 20 seconds
   * with one measurement per second, and finally we also keep track of
   * long-term statistics going back more than 6 minutes.
   * We could go back longer or have a higher resolution, but at the moment
   * it seems a bit unnecessary. We could go back further if we are going to
   * implement approaches more based on statistics and also finding patterns
   * of change.
   */
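  /**
   * For example, the long-term list covers 20 measurements at 20 second
   * intervals, i.e. 20 * 20 = 400 seconds, which is the "more than
   * 6 minutes" (6 minutes and 40 seconds) of history mentioned above.
   */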
  m_num_send_threads = getNumSendThreads();
  m_num_threads = getNumThreads();

  c_measurementRecordPool.setSize(NUM_MEASUREMENT_RECORDS);
  if (instance() == MAIN_THRMAN_INSTANCE)
  {
    jam();
    c_sendThreadRecordPool.setSize(m_num_send_threads);
    c_sendThreadMeasurementPool.setSize(NUM_MEASUREMENT_RECORDS *
                                        m_num_send_threads);
  }
  else
  {
    jam();
    c_sendThreadRecordPool.setSize(0);
    c_sendThreadMeasurementPool.setSize(0);
  }

  /* Create the 3 lists with 20 records in each. */
  MeasurementRecordPtr measurePtr;
  for (Uint32 i = 0; i < NUM_MEASUREMENTS; i++)
  {
    jam();
    c_measurementRecordPool.seize(measurePtr);
    measurePtr.p = new (measurePtr.p) MeasurementRecord();
    c_next_50ms_measure.addFirst(measurePtr);
    c_measurementRecordPool.seize(measurePtr);
    measurePtr.p = new (measurePtr.p) MeasurementRecord();
    c_next_1sec_measure.addFirst(measurePtr);
    c_measurementRecordPool.seize(measurePtr);
    measurePtr.p = new (measurePtr.p) MeasurementRecord();
    c_next_20sec_measure.addFirst(measurePtr);
  }
  if (instance() == MAIN_THRMAN_INSTANCE)
  {
    jam();
    for (Uint32 send_instance = 0;
         send_instance < m_num_send_threads;
         send_instance++)
    {
      jam();
      SendThreadPtr sendThreadPtr;
      c_sendThreadRecordPool.seizeId(sendThreadPtr, send_instance);
      sendThreadPtr.p = new (sendThreadPtr.p) SendThreadRecord();
      sendThreadPtr.p->m_send_thread_50ms_measurements.init();
      sendThreadPtr.p->m_send_thread_1sec_measurements.init();
      sendThreadPtr.p->m_send_thread_20sec_measurements.init();

      for (Uint32 i = 0; i < NUM_MEASUREMENTS; i++)
      {
        jam();
        SendThreadMeasurementPtr sendThreadMeasurementPtr;

        c_sendThreadMeasurementPool.seize(sendThreadMeasurementPtr);
        sendThreadMeasurementPtr.p =
            new (sendThreadMeasurementPtr.p) SendThreadMeasurement();
        {
          jam();
          Local_SendThreadMeasurement_fifo list_50ms(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_50ms_measurements);
          list_50ms.addFirst(sendThreadMeasurementPtr);
        }

        c_sendThreadMeasurementPool.seize(sendThreadMeasurementPtr);
        sendThreadMeasurementPtr.p =
            new (sendThreadMeasurementPtr.p) SendThreadMeasurement();
        {
          jam();
          Local_SendThreadMeasurement_fifo list_1sec(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_1sec_measurements);
          list_1sec.addFirst(sendThreadMeasurementPtr);
        }

        c_sendThreadMeasurementPool.seize(sendThreadMeasurementPtr);
        sendThreadMeasurementPtr.p =
            new (sendThreadMeasurementPtr.p) SendThreadMeasurement();
        {
          jam();
          Local_SendThreadMeasurement_fifo list_20sec(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_20sec_measurements);
          list_20sec.addFirst(sendThreadMeasurementPtr);
        }
      }
    }
  }

  mark_measurements_not_done();
  /* Send return signal */
  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
  conf->senderData = senderData;
  sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
             ReadConfigConf::SignalLength, JBB);
}

void
Thrman::execSTTOR(Signal *signal)
{
  int res;
  jamEntry();

  const Uint32 startPhase  = signal->theData[1];

  switch (startPhase)
  {
  case 1:
    jam();
    memset(&m_last_50ms_base_measure, 0, sizeof(m_last_50ms_base_measure));
    memset(&m_last_1sec_base_measure, 0, sizeof(m_last_1sec_base_measure));
    memset(&m_last_20sec_base_measure, 0, sizeof(m_last_20sec_base_measure));
    memset(&m_last_50ms_rusage, 0, sizeof(m_last_50ms_rusage));
    memset(&m_last_1sec_rusage, 0, sizeof(m_last_1sec_rusage));
    memset(&m_last_20sec_rusage, 0, sizeof(m_last_20sec_rusage));
    prev_50ms_tick = NdbTick_getCurrentTicks();
    prev_20sec_tick = prev_50ms_tick;
    prev_1sec_tick = prev_50ms_tick;
    m_configured_spintime = getConfiguredSpintime();
    m_current_spintime = 0;
    m_gain_spintime_in_us = 25;

    /* Initialise overload control variables */
    m_shared_environment = false;
    m_overload_handling_activated = false;
    m_current_overload_status = (OverloadStatus)LIGHT_LOAD_CONST;
    m_warning_level = 0;
    m_max_warning_level = 20;
    m_burstiness = 0;
    m_current_decision_stats = &c_1sec_stats;
    m_send_thread_percentage = 0;
    m_node_overload_level = 0;

    for (Uint32 i = 0; i < MAX_BLOCK_THREADS + 1; i++)
    {
      m_thread_overload_status[i].overload_status =
        (OverloadStatus)MEDIUM_LOAD_CONST;
      m_thread_overload_status[i].wakeup_instance = 0;
    }

    /* Initialise measurements */
    res = Ndb_GetRUsage(&m_last_50ms_rusage, false);
    if (res == 0)
    {
      jam();
      m_last_1sec_rusage = m_last_50ms_rusage;
      m_last_20sec_rusage = m_last_50ms_rusage;
    }
    getPerformanceTimers(m_last_50ms_base_measure.m_sleep_time_thread,
                         m_last_50ms_base_measure.m_spin_time_thread,
                         m_last_50ms_base_measure.m_buffer_full_time_thread,
                         m_last_50ms_base_measure.m_send_time_thread);
    m_last_1sec_base_measure = m_last_50ms_base_measure;
    m_last_20sec_base_measure = m_last_50ms_base_measure;

    if (instance() == MAIN_THRMAN_INSTANCE)
    {
      jam();
      for (Uint32 send_instance = 0;
           send_instance < m_num_send_threads;
           send_instance++)
      {
        jam();
        SendThreadPtr sendThreadPtr;
        c_sendThreadRecordPool.getPtr(sendThreadPtr, send_instance);
        Uint64 send_exec_time;
        Uint64 send_sleep_time;
        Uint64 send_spin_time;
        Uint64 send_user_time_os;
        Uint64 send_kernel_time_os;
        Uint64 send_elapsed_time_os;
        getSendPerformanceTimers(send_instance,
                                 send_exec_time,
                                 send_sleep_time,
                                 send_spin_time,
                                 send_user_time_os,
                                 send_kernel_time_os,
                                 send_elapsed_time_os);

        sendThreadPtr.p->m_last_50ms_send_thread_measure.m_exec_time =
          send_exec_time;
        sendThreadPtr.p->m_last_50ms_send_thread_measure.m_sleep_time =
          send_sleep_time;
        sendThreadPtr.p->m_last_50ms_send_thread_measure.m_spin_time =
          send_spin_time;
        sendThreadPtr.p->m_last_50ms_send_thread_measure.m_user_time_os =
          send_user_time_os;
        sendThreadPtr.p->m_last_50ms_send_thread_measure.m_kernel_time_os =
          send_kernel_time_os;
        sendThreadPtr.p->m_last_50ms_send_thread_measure.m_elapsed_time_os =
          send_elapsed_time_os;

        sendThreadPtr.p->m_last_1sec_send_thread_measure.m_exec_time =
          send_exec_time;
        sendThreadPtr.p->m_last_1sec_send_thread_measure.m_sleep_time =
          send_sleep_time;
        sendThreadPtr.p->m_last_1sec_send_thread_measure.m_spin_time =
          send_spin_time;
        sendThreadPtr.p->m_last_1sec_send_thread_measure.m_user_time_os =
          send_user_time_os;
        sendThreadPtr.p->m_last_1sec_send_thread_measure.m_kernel_time_os =
          send_kernel_time_os;
        sendThreadPtr.p->m_last_1sec_send_thread_measure.m_elapsed_time_os =
          send_elapsed_time_os;

        sendThreadPtr.p->m_last_20sec_send_thread_measure.m_exec_time =
          send_exec_time;
        sendThreadPtr.p->m_last_20sec_send_thread_measure.m_sleep_time =
          send_sleep_time;
        sendThreadPtr.p->m_last_20sec_send_thread_measure.m_spin_time =
          send_spin_time;
        sendThreadPtr.p->m_last_20sec_send_thread_measure.m_user_time_os =
          send_user_time_os;
        sendThreadPtr.p->m_last_20sec_send_thread_measure.m_kernel_time_os =
          send_kernel_time_os;
        sendThreadPtr.p->m_last_20sec_send_thread_measure.m_elapsed_time_os =
          send_elapsed_time_os;
      }
    }
    if (instance() == MAIN_THRMAN_INSTANCE)
    {
      if (getNumThreads() > 1 && NdbSpin_is_supported())
      {
        jam();
        measure_wakeup_time(signal, 0);
      }
      else
      {
        jam();
        if (NdbSpin_is_supported())
        {
          g_eventLogger->info("Set wakeup latency to 25 microseconds in"
                              " single thread environment");
        }
        setWakeupLatency(m_gain_spintime_in_us);
        sendSTTORRY(signal, false);
      }
      sendNextCONTINUEB(signal, 50, ZCONTINUEB_MEASURE_CPU_USAGE);
      sendNextCONTINUEB(signal, 10, ZCONTINUEB_CHECK_SPINTIME);
      return;
    }
    else
    {
      sendNextCONTINUEB(signal, 50, ZCONTINUEB_MEASURE_CPU_USAGE);
      sendNextCONTINUEB(signal, 10, ZCONTINUEB_CHECK_SPINTIME);
      sendSTTORRY(signal, false);
    }
    return;
  case 2:
  {
    m_gain_spintime_in_us = getWakeupLatency();
    if (instance() == MAIN_THRMAN_INSTANCE)
    {
      g_eventLogger->info("Set wakeup latency to %u microseconds",
                          m_gain_spintime_in_us);
    }
    set_spin_stat(0, true);
    sendSTTORRY(signal, true);
    return;
  }
  default:
    ndbabort();
  }
}

#define NUM_WAKEUP_MEASUREMENTS 50
#define MAX_FAILED_WAKEUP_MEASUREMENTS 50
void
Thrman::measure_wakeup_time(Signal *signal, Uint32 count)
{
  NDB_TICKS now = NdbTick_getCurrentTicks();
  if (count != 0)
  {
    /* Perform measurement */
    Uint64 nanos_wait = NdbTick_Elapsed(m_measured_wait_time, now).nanoSec();
    DEB_SPIN(("Elapsed time was %llu nanoseconds", nanos_wait));
    if (nanos_wait < 100000 && nanos_wait != 0)
    {
      /* A proper measurement */
      m_tot_nanos_wait += nanos_wait;
      if (count == NUM_WAKEUP_MEASUREMENTS)
      {
        Uint64 mean_nanos_wait = m_tot_nanos_wait / NUM_WAKEUP_MEASUREMENTS;
        Uint64 mean_micros_wait = (mean_nanos_wait + 500) / 1000;
        m_gain_spintime_in_us = Uint32(mean_micros_wait);
        DEB_SPIN(("Set wakeup latency to %llu microseconds",
                  mean_micros_wait));
        setWakeupLatency(m_gain_spintime_in_us);
        /**
         * We always start with no spinning and adjust to spinning when
         * activity is started.
         */
        sendSTTORRY(signal, false);
        return;
      }
      count++;
    }
    else
    {
      m_failed_wakeup_measurements++;
      if (m_failed_wakeup_measurements >= MAX_FAILED_WAKEUP_MEASUREMENTS)
      {
        g_eventLogger->info("Failed to measure wakeup latency, using 25 us");
        sendSTTORRY(signal, false);
        return;
      }
    }
    do
    {
      for (Uint32 i = 0; i < 20; i++)
      {
        NdbSpin();
      }
      NDB_TICKS now2 = NdbTick_getCurrentTicks();
      Uint64 micros_wait = NdbTick_Elapsed(now, now2).microSec();
      if (micros_wait >= 50)
      {
        /**
         * We wait for 50 microseconds until the next attempt to ensure
         * that the other thread has gone to sleep properly.
         */
        jam();
        break;
      }
    } while (1);
  }
  else
  {
    /**
     * Starting measurement, zero the total to initialise and set spintime to
     * 1000 microseconds to ensure that we don't go to sleep until we have
     * completed these measurements that should take around a millisecond.
     */
    m_tot_nanos_wait = 0;
    setSpintime(1000);
    count++;
  }
  m_measured_wait_time = NdbTick_getCurrentTicks();
  BlockReference ref = numberToRef(THRMAN,
                                   MAIN_THRMAN_INSTANCE + 1, // rep thread
                                   getOwnNodeId());
  signal->theData[0] = count;
  signal->theData[1] = reference();
  /* Send measure signal from main thread to rep thread and back */
  sendSignal(ref, GSN_MEASURE_WAKEUP_TIME_ORD, signal, 2, JBB);
  return;
}

void
Thrman::execMEASURE_WAKEUP_TIME_ORD(Signal *signal)
{
  Uint32 count = signal->theData[0];
  BlockReference ref = signal->theData[1];
  if (instance() == MAIN_THRMAN_INSTANCE)
  {
    measure_wakeup_time(signal, count);
    return;
  }
  else
  {
    /* Return signal immediately to sender */
    sendSignal(ref, GSN_MEASURE_WAKEUP_TIME_ORD, signal, 2, JBB);
  }
}

void
Thrman::sendSTTORRY(Signal* signal, bool phase2_done)
{
  m_phase2_done = phase2_done;
  signal->theData[0] = 0;
  signal->theData[1] = 3;
  signal->theData[2] = 0;
  signal->theData[3] = 1;
  signal->theData[4] = 2;
  signal->theData[5] = 255; // No more start phases from missra
  BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : THRMAN_REF;
  sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
}

void
Thrman::execCONTINUEB(Signal *signal)
{
  jamEntry();
  Uint32 tcase = signal->theData[0];
  switch (tcase)
  {
    case ZCONTINUEB_MEASURE_CPU_USAGE:
    {
      jam();
      measure_cpu_usage(signal);
      sendNextCONTINUEB(signal, 50, ZCONTINUEB_MEASURE_CPU_USAGE);
      break;
    }
    case ZWAIT_ALL_STOP:
    {
      jam();
      wait_all_stop(signal);
      break;
    }
    case ZWAIT_ALL_START:
    {
      jam();
      wait_all_start(signal);
      break;
    }
    case ZCONTINUEB_CHECK_SPINTIME:
    {
      check_spintime(true);
      sendNextCONTINUEB(signal, 10, ZCONTINUEB_CHECK_SPINTIME);
      break;
    }
    default:
    {
      ndbabort();
    }
  }
}

void
Thrman::sendNextCONTINUEB(Signal *signal, Uint32 delay, Uint32 type)
{
  signal->theData[0] = type;
  sendSignalWithDelay(reference(),
                      GSN_CONTINUEB,
                      signal,
                      delay,
                      1);
}

void
Thrman::update_current_wakeup_instance(Uint32 * thread_list,
                                       Uint32 num_threads_found,
                                       Uint32 & index,
                                       Uint32 & current_wakeup_instance)
{
  index++;
  if (num_threads_found == index)
  {
    jam();
    index = 0;
  }
  current_wakeup_instance = thread_list[index];
}

/**
 * Each block thread has a thread assigned as its wakeup thread.
 * This thread is woken up to assist with sending data whenever
 * there is a need to quickly get things sent from the block
 * thread. Only block threads that are almost idle can be assigned
 * as wakeup threads.
 */
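/**
 * As an illustration of the assignment below (hypothetical numbers):
 * with m_num_threads = 4 and idle threads {1, 2} found, the round-robin
 * walk gives thread 1 the wakeup instance 2 (a thread never wakes
 * itself), thread 2 the instance 1, thread 3 the instance 2 and
 * thread 4 the instance 1, so the two idle threads share the wakeup
 * duty evenly.
 */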
void
Thrman::assign_wakeup_threads(Signal *signal,
                              Uint32 *thread_list,
                              Uint32 num_threads_found)
{
  Uint32 index = 0;
  Uint32 instance_no;
  Uint32 current_wakeup_instance = thread_list[index];

  for (instance_no = 1; instance_no <= m_num_threads; instance_no++)
  {
    jam();
    if (m_thread_overload_status[instance_no].overload_status ==
        (OverloadStatus)OVERLOAD_CONST)
    {
      jam();
      /* Ensure that overloaded threads don't wakeup idle threads */
      current_wakeup_instance = 0;
    }

    /**
     * We don't wake ourselves up, other than that we attempt to wake up
     * the idle thread once per 200 microseconds from each thread.
     */
    if (instance_no == current_wakeup_instance)
    {
      if (num_threads_found > 1)
      {
        jam();
        update_current_wakeup_instance(thread_list,
                                       num_threads_found,
                                       index,
                                       current_wakeup_instance);
      }
      else
      {
        jam();
        current_wakeup_instance = 0;
      }
    }
    if (m_thread_overload_status[instance_no].wakeup_instance !=
        current_wakeup_instance)
    {
      jam();
      sendSET_WAKEUP_THREAD_ORD(signal,
                                instance_no,
                                current_wakeup_instance);
    }
    update_current_wakeup_instance(thread_list,
                                   num_threads_found,
                                   index,
                                   current_wakeup_instance);
  }
}

void
Thrman::get_idle_block_threads(Uint32 *thread_list, Uint32 & num_threads_found)
{
  /**
   * We never use more than 4 threads as idle threads. It's highly unlikely
   * that making use of more idle threads than this for sending is going to
   * be worthwhile. By starting the search from 1 we will always find the most
   * common idle threads, the main thread and the rep thread which are instance
   * 1 and 2.
   */
  Uint32 instance_no;
  for (instance_no = 1; instance_no <= m_num_threads; instance_no++)
  {
    if (m_thread_overload_status[instance_no].overload_status ==
        (OverloadStatus)LIGHT_LOAD_CONST)
    {
      thread_list[num_threads_found] = instance_no;
      num_threads_found++;
      if (num_threads_found == 4)
        return;
    }
  }
}

/**
 * Every time we decide to change the overload level we report this back to
 * the main thread that contains the global state.
 *
 * This signal is only executed by the main thread.
 */
void
Thrman::execOVERLOAD_STATUS_REP(Signal *signal)
{
  Uint32 thr_no = signal->theData[0];
  Uint32 overload_status = signal->theData[1];
  m_thread_overload_status[thr_no].overload_status = (OverloadStatus)overload_status;

  Uint32 node_overload_level = 0;
  for (Uint32 instance_no = 1; instance_no <= m_num_threads; instance_no++)
  {
    if (m_thread_overload_status[instance_no].overload_status >=
        (OverloadStatus)MEDIUM_LOAD_CONST)
    {
      node_overload_level = 1;
    }
  }
  if (node_overload_level != m_node_overload_level)
  {
    jam();
    m_node_overload_level = node_overload_level;
    signal->theData[0] = node_overload_level;
    for (Uint32 instance_no = 1; instance_no <= m_num_threads; instance_no++)
    {
      BlockReference ref = numberToRef(THRMAN,
                                       instance_no,
                                       getOwnNodeId());
      sendSignal(ref, GSN_NODE_OVERLOAD_STATUS_ORD, signal, 1, JBB);
    }
  }

  Uint32 num_threads_found = 0;
  Uint32 thread_list[4];
  get_idle_block_threads(thread_list, num_threads_found);
  if (num_threads_found == 0)
  {
    jam();
    /**
     * No idle threads found, so we make a list of one thread with
     * id 0 (which here means no thread). We still need to check
     * each thread to see if they need an update of the current
     * wakeup instance. So this means that all threads that currently
     * have a non-zero wakeup instance will receive an order to change
     * their wakeup instance to 0.
     */
    num_threads_found = 1;
    thread_list[0] = 0;
  }
  assign_wakeup_threads(signal, thread_list, num_threads_found);
  return;
}

void
Thrman::execNODE_OVERLOAD_STATUS_ORD(Signal *signal)
{
  jamEntry();
  Uint32 overload_status = signal->theData[0];
  setNodeOverloadStatus((OverloadStatus)overload_status);
}

void
Thrman::execSEND_THREAD_STATUS_REP(Signal *signal)
{
  jamEntry();
  m_send_thread_percentage = signal->theData[0];
  return;
}

void
Thrman::execSEND_WAKEUP_THREAD_ORD(Signal *signal)
{
  /**
   * This signal is sent directly from do_send in mt.cpp, its
   * only purpose is to send a wakeup signal to another thread
   * to ensure that this thread is awake to execute some
   * send assistance to the send thread.
   */
  Uint32 wakeup_instance = signal->theData[0];
  BlockReference ref = numberToRef(THRMAN,
                                   wakeup_instance,
                                   getOwnNodeId());
  sendSignal(ref, GSN_WAKEUP_THREAD_ORD, signal, 1, JBA);
}

void
Thrman::execWAKEUP_THREAD_ORD(Signal *signal)
{
  /**
   * This signal is sent to wake the thread up. We're using the send signal
   * semantics to wake the thread up. So no need to execute anything, the
   * purpose of waking the thread has already been achieved when getting here.
   */
  return;
}

void
Thrman::execSET_WAKEUP_THREAD_ORD(Signal *signal)
{
  Uint32 wakeup_instance = signal->theData[0];
  setWakeupThread(wakeup_instance);
}

void
Thrman::sendSET_WAKEUP_THREAD_ORD(Signal *signal,
                                  Uint32 instance_no,
                                  Uint32 wakeup_instance)
{
  signal->theData[0] = wakeup_instance;
  BlockReference ref = numberToRef(THRMAN,
                                   instance_no,
                                   getOwnNodeId());
  sendSignal(ref, GSN_SET_WAKEUP_THREAD_ORD, signal, 1, JBB);
}

void
Thrman::set_spin_stat(Uint32 spin_time, bool local_call)
{
  ndbrequire(spin_time <= MAX_SPIN_TIME);
  struct ndb_spin_stat spin_stat;
  Uint32 used_spin_time = spin_time;
  setSpintime(spin_time);
  if (!local_call)
  {
    jam();
    return;
  }
  if (spin_time == 0)
  {
    /**
     * We set the spin time to 0, but we use the measure spin time
     * in our measurements. This ensures that we quickly get on
     * track again with statistics when spinning is enabled again.
     * We would not arrive here if the configured spin time was 0 as
     * well.
     */
    used_spin_time = MEASURE_SPIN_TIME;
  }
  /**
   * We measure in steps of 50%, this gives us the possibility to
   * efficiently measure stepping up or stepping down spinning in
   * steps of 50% at a time.
   */
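  /**
   * Illustration of the interval table built below (numbers for
   * illustration only, assuming NUM_SPIN_INTERVALS is 16 so that
   * midpoint is 7, matching the 16 slots printed by the DEB_SPIN
   * output in check_spintime): for used_spin_time = 100 the loop
   * produces roughly the geometric series
   *   5, 8, 13, 19, 29, 44, 66, 100, 150, 225, 337, 506, 759, 1139, 1708
   * with the last interval set to UINT32_MAX, i.e. each step is about
   * 50% larger than the previous one around the midpoint value 100.
   */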
  Uint32 midpoint = (NUM_SPIN_INTERVALS / 2) - 1;
  for (Uint32 i = 0; i < NUM_SPIN_INTERVALS; i++)
  {
    Uint64 spin_time_limit = used_spin_time;
    if (i == (NUM_SPIN_INTERVALS - 1))
    {
      spin_time_limit = UINT32_MAX;
    }
    else if (i < midpoint)
    {
      Uint64 mult_factor = 2;
      Uint64 div_factor = 3;
      for (Uint32 j = i + 1; j < midpoint; j++)
      {
        mult_factor *= 2;
        div_factor *= 3;
      }
      spin_time_limit = (mult_factor * used_spin_time) / div_factor;
    }
    else if (i > midpoint)
    {
      Uint64 mult_factor = 3;
      Uint64 div_factor = 2;
      for (Uint32 j = midpoint + 1; j < i; j++)
      {
        mult_factor *= 3;
        div_factor *= 2;
      }
      spin_time_limit = (mult_factor * used_spin_time) / div_factor;
    }
    else
    {
      ndbrequire(i == midpoint);
    }
    spin_stat.m_spin_interval[i] = Uint32(spin_time_limit);
  }
  mt_set_spin_stat(this, &spin_stat);
}

Uint32 Thrman::calc_new_spin(ndb_spin_stat *spin_stat)
{
#ifdef DEBUG_SPIN
  Uint64 calc_spin_cost[NUM_SPIN_INTERVALS - 1];
  Uint64 calc_spin_overhead[NUM_SPIN_INTERVALS - 1];
  memset(calc_spin_cost, 0, sizeof(calc_spin_cost));
  memset(calc_spin_overhead, 0, sizeof(calc_spin_overhead));
#endif
  Uint32 num_events = spin_stat->m_num_waits;
  Uint32 remaining_events = num_events;

  Uint32 found = 0;
  Uint64 min_overhead = UINT64_MAX;
  for (Uint32 i = 0; i < (NUM_SPIN_INTERVALS - 1); i++)
  {
    Uint32 events_in_this_slot = spin_stat->m_micros_sleep_times[i];
    if (events_in_this_slot == 0 ||
        spin_stat->m_spin_interval[i] == 0 ||
        spin_stat->m_spin_interval[i] > m_configured_spintime)
    {
      /**
       * Ignore empty slots, they will not be chosen for sure.
       * Also ignore slots where we measure 0 spin time.
       * Also ignore slots with higher spintime than what is
       * configured as maximum spintime.
       */
      continue;
    }
    /**
     * Calculate each slot as if it will become the new spintime.
     */
    remaining_events -= events_in_this_slot;
    Uint32 num_gained_spins = num_events - remaining_events;

    Uint32 this_spin_cost = spin_stat->m_spin_interval[i];
    Uint64 gained_time_in_us = Uint64(num_gained_spins) *
                                 Uint64(m_gain_spintime_in_us);

    Uint64 spin_cost = 0;
    Uint32 avg_spin_cost = spin_stat->m_spin_interval[0] / 2;
    spin_cost += Uint64(avg_spin_cost * spin_stat->m_micros_sleep_times[0]);
    for (Uint32 j = 1; j <= i; j++)
    {
      Uint32 diff_time = spin_stat->m_spin_interval[j] -
                           spin_stat->m_spin_interval[j - 1];
      diff_time /= 2;
      avg_spin_cost = diff_time + spin_stat->m_spin_interval[j - 1];
      spin_cost += Uint64(avg_spin_cost * spin_stat->m_micros_sleep_times[j]);
    }

    spin_cost += Uint64(this_spin_cost * remaining_events);
    ndbrequire(gained_time_in_us);
    Uint64 spin_overhead = Uint64(1000) * spin_cost / gained_time_in_us;
    spin_overhead += 5;
    spin_overhead /= 10;
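    /**
     * A worked example of the formula above (numbers for illustration
     * only): if spin_cost sums to 1300 us and gained_time_in_us is
     * 1000 us, then spin_overhead = (1000 * 1300) / 1000 = 1300, and
     * after rounding (1300 + 5) / 10 = 130. This is the "spin 1300 us
     * to gain 1000 us" case described in execREAD_CONFIG_REQ, which
     * corresponds to the default m_allowed_spin_overhead of 130.
     */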

    if (spin_overhead <= min_overhead ||
        spin_overhead < Uint64(100) ||
        (spin_overhead < Uint64(130) &&
         events_in_this_slot > 1))
    {
      /**
       * This was the lowest overhead so far. Will be picked unless overhead
       * is too high. Will always be picked for i == 0.
       *
       * If there is a sufficient amount of events in this slot and we keep
       * the cost below 130, we will always pick this one.
       */
      min_overhead = spin_overhead;
      found = i;
    }
    else if (spin_overhead < Uint64(m_allowed_spin_overhead) &&
             events_in_this_slot > 1)
    {
      /**
       * This wasn't the lowest overhead so far. We will evaluate the
       * conditional probability of it paying off to continue from here,
       * since we are still in the allowed range for allowed spin overhead,
       * conditioned on the fact that we have to wait at least the time
       * already waited.
       *
       * This means that we calculate the time estimated to continue spinning
       * before an event occurs based on the knowledge that we have already
       * spent this time spinning (m_spin_interval[i - 1]). We know that
       * i >= 1 here.
       *
       * The extra gain we get by continuing to spin until m_spin_interval[i]
       * is the sum of gains from found + 1 to i. The extra cost is the added
       * extra spin time imposed on all remaining_events. The added cost is
       * m_spin_interval[i] - m_spin_interval[found].
       *
       * We will ignore this check if there is only a single event in this
       * slot. This represents a too high risk of spinning for too long if the
       * circumstances change only slightly.
       */
      ndbrequire(i > 0);
      Uint64 extra_gain = 0;
      Uint64 extra_cost = 0;
      for (Uint32 j = found + 1; j <= i; j++)
      {
        Uint64 events_in_slot = Uint64(spin_stat->m_micros_sleep_times[j]);
        extra_gain += events_in_slot;
        Uint32 diff_time = spin_stat->m_spin_interval[j] -
                             spin_stat->m_spin_interval[j - 1];
        diff_time /= 2;
        Uint64 avg_spin_cost = Uint64(diff_time) +
          Uint64(spin_stat->m_spin_interval[j - 1] -
             spin_stat->m_spin_interval[found]);
        extra_cost += Uint64(avg_spin_cost *
                        spin_stat->m_micros_sleep_times[j]);
      }
      extra_gain *= Uint64(m_gain_spintime_in_us);
      extra_gain *= Uint64(m_allowed_spin_overhead);
      extra_gain /= Uint64(100);
      extra_cost += Uint64(remaining_events) *
                      Uint64(this_spin_cost -
                             spin_stat->m_spin_interval[found]);
      if (extra_gain > extra_cost)
      {
        found = i;
        min_overhead = spin_overhead;
      }
    }
#ifdef DEBUG_SPIN
    calc_spin_cost[i] = spin_cost;
    calc_spin_overhead[i] = spin_overhead;
#endif
  }
/**
 * When we are already spinning, we allow for a bit more overhead to avoid
 * jumping in and out of spinning too often. We need at least 4 observations
 * to make any judgement; only a couple of events in 10ms doesn't imply any
 * need of spinning.
 */
#define EXTRA_OVERHEAD_ALLOWED_WHEN_ALREADY_SPINNING 20
#define MIN_EVENTS_TO_BE_NOT_IDLE 20

  Uint32 midpoint = (NUM_SPIN_INTERVALS / 2) - 1;
  if (num_events <= 3 ||
      (min_overhead > Uint64(m_allowed_spin_overhead) &&
       (m_current_spintime == 0 ||
        min_overhead >
         Uint64(m_allowed_spin_overhead +
                EXTRA_OVERHEAD_ALLOWED_WHEN_ALREADY_SPINNING))))
  {
    /* Quickly shut down the spin environment when no longer beneficial. */
    if (m_current_spintime != 0)
    {
      DEB_SPIN(("(%u)New spintime = 0", instance()));
    }
    m_current_spintime = 0;
  }
  else if (m_current_spintime == 0 ||
           m_current_spintime !=
             spin_stat->m_spin_interval[midpoint])
  {
    /**
     * Immediately adjust to the new spin environment when activity starts up
     * from a more idle state. We also arrive here the next timeout
     * after a quick activation of spintime. In this case we have set
     * the spintime, but still haven't changed the spin intervals, so
     * set it directly to the found spintime.
     */
    m_current_spintime = spin_stat->m_spin_interval[found];
    DEB_SPIN(("(%u)New spintime = %u", instance(), m_current_spintime));
  }
  else
  {
    /**
     * When we are already spinning AND we want to continue spinning,
     * adjust gradually so as not to change the spin behaviour too fast.
     * In this case we are likely to be in a more stable environment, so
     * there is no need for the very fast adaption to the environment.
     */
    if (found < midpoint)
    {
      m_current_spintime = spin_stat->m_spin_interval[midpoint - 1];
    }
    else if (found > midpoint)
    {
      m_current_spintime = spin_stat->m_spin_interval[midpoint + 1];
    }
    DEB_SPIN(("(%u)2:New spintime = %u", instance(), m_current_spintime));
  }
  if (num_events > MIN_EVENTS_TO_BE_NOT_IDLE)
  {
    jam();
    m_is_idle = false;
  }
  else
  {
    jam();
    m_is_idle = true;
  }
  /* Never select a spintime less than 2 microseconds. */
  if (m_current_spintime != 0 && m_current_spintime < 2)
  {
    m_current_spintime = 2;
  }
  /**
   * Never go beyond the configured spin time. The adaptive part can only
   * decrease the spinning, not increase it.
   */
  Uint32 max_spintime = m_configured_spintime;
  if (m_current_cpu_usage > 90)
  {
    jam();
    max_spintime /= 4; // 25%
  }
  else if (m_current_cpu_usage > 80)
  {
    jam();
    max_spintime /= 3; // 33%
  }
  else if (m_current_cpu_usage > 70)
  {
    jam();
    max_spintime *= 45;
    max_spintime /= 100; // 45%
  }
  else if (m_current_cpu_usage > 60)
  {
    jam();
    max_spintime *= 60;
    max_spintime /= 100; // 60%
  }
  else if (m_current_cpu_usage > 50)
  {
    jam();
    max_spintime *= 75;
    max_spintime /= 100; // 75%
  }
  else if (m_current_cpu_usage > 40)
  {
    jam();
    max_spintime *= 90;
    max_spintime /= 100; // 90%
  }
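  /**
   * For example (illustration only): with a configured spintime of 500
   * and a measured CPU usage of 85%, the scaling above gives
   * max_spintime = 500 / 3 = 166 microseconds.
   */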
1358 
1359   if (m_current_spintime > max_spintime)
1360   {
1361     m_current_spintime = max_spintime;
1362   }
1363   if (num_events >= 3)
1364   {
1365   DEB_SPIN(("(%u)SPIN events: %u, spintime selected: %u "
1366             ":ovh[0]=%llu,cost[0]=%llu"
1367             ":ovh[1]=%llu,cost[1]=%llu"
1368             ":ovh[2]=%llu,cost[2]=%llu"
1369             ":ovh[3]=%llu,cost[3]=%llu"
1370             ":ovh[4]=%llu,cost[4]=%llu"
1371             ":ovh[5]=%llu,cost[5]=%llu"
1372             ":ovh[6]=%llu,cost[6]=%llu"
1373             ":ovh[7]=%llu,cost[7]=%llu"
1374             ":ovh[8]=%llu,cost[8]=%llu"
1375             ":ovh[9]=%llu,cost[9]=%llu"
1376             ":ovh[10]=%llu,cost[10]=%llu"
1377             ":ovh[11]=%llu,cost[11]=%llu"
1378             ":ovh[12]=%llu,cost[12]=%llu"
1379             ":ovh[13]=%llu,cost[13]=%llu"
1380             ":ovh[14]=%llu,cost[14]=%llu",
1381             instance(),
1382             num_events,
1383             m_current_spintime,
1384             calc_spin_overhead[0],
1385             calc_spin_cost[0],
1386             calc_spin_overhead[1],
1387             calc_spin_cost[1],
1388             calc_spin_overhead[2],
1389             calc_spin_cost[2],
1390             calc_spin_overhead[3],
1391             calc_spin_cost[3],
1392             calc_spin_overhead[4],
1393             calc_spin_cost[4],
1394             calc_spin_overhead[5],
1395             calc_spin_cost[5],
1396             calc_spin_overhead[6],
1397             calc_spin_cost[6],
1398             calc_spin_overhead[7],
1399             calc_spin_cost[7],
1400             calc_spin_overhead[8],
1401             calc_spin_cost[8],
1402             calc_spin_overhead[9],
1403             calc_spin_cost[9],
1404             calc_spin_overhead[10],
1405             calc_spin_cost[10],
1406             calc_spin_overhead[11],
1407             calc_spin_cost[11],
1408             calc_spin_overhead[12],
1409             calc_spin_cost[12],
1410             calc_spin_overhead[13],
1411             calc_spin_cost[13],
1412             calc_spin_overhead[14],
1413             calc_spin_cost[14]));
1414   }
1415   return m_current_spintime;
1416 }
1417 
1418 void
check_spintime(bool local_call)1419 Thrman::check_spintime(bool local_call)
1420 {
1421   if (!m_phase2_done)
1422   {
1423     jam();
1424     return;
1425   }
1426   if (!local_call && !m_is_idle)
1427   {
1428     jam();
1429     return;
1430   }
1431   if (!m_enable_adaptive_spinning)
1432   {
1433     jam();
1434     return;
1435   }
1436   if (m_configured_spintime == 0)
1437   {
1438     /* No configured spinning on the thread, so ignore check of spin time. */
1439     jam();
1440     return;
1441   }
1442   struct ndb_spin_stat spin_stat;
1443   mt_get_spin_stat(this, &spin_stat);
1444 
1445   if (spin_stat.m_num_waits >= 3)
1446   {
1447   DEB_SPIN(("(%u)m_sleep_longer_spin_time: %u, "
1448             "m_sleep_shorter_spin_time: %u"
1449             ", local_call: %s",
1450             instance(),
1451             spin_stat.m_sleep_longer_spin_time,
1452             spin_stat.m_sleep_shorter_spin_time,
1453             local_call ? "true" : "false"));
1454   }
1455 
1456   if (m_shared_environment)
1457   {
1458     /**
1459      * We never spin in a shared environment, this would cause even more
1460      * overload on the CPUs to happen.
1461      */
1462     set_spin_stat(0, local_call);
1463     return;
1464   }
1465   if (spin_stat.m_num_waits >= 3)
1466   {
1467   DEB_SPIN(("(%u): <= %u: %u, <= %u: %u, <= %u: %u, <= %u: %u,"
1468             " <= %u: %u, <= %u: %u, <= %u: %u, <= %u: %u"
1469             " <= %u: %u, <= %u: %u, <= %u: %u, <= %u: %u"
1470             " <= %u: %u, <= %u: %u, <= %u: %u, <= MAX: %u",
1471             instance(),
1472             spin_stat.m_spin_interval[0],
1473             spin_stat.m_micros_sleep_times[0],
1474             spin_stat.m_spin_interval[1],
1475             spin_stat.m_micros_sleep_times[1],
1476             spin_stat.m_spin_interval[2],
1477             spin_stat.m_micros_sleep_times[2],
1478             spin_stat.m_spin_interval[3],
1479             spin_stat.m_micros_sleep_times[3],
1480             spin_stat.m_spin_interval[4],
1481             spin_stat.m_micros_sleep_times[4],
1482             spin_stat.m_spin_interval[5],
1483             spin_stat.m_micros_sleep_times[5],
1484             spin_stat.m_spin_interval[6],
1485             spin_stat.m_micros_sleep_times[6],
1486             spin_stat.m_spin_interval[7],
1487             spin_stat.m_micros_sleep_times[7],
1488             spin_stat.m_spin_interval[8],
1489             spin_stat.m_micros_sleep_times[8],
1490             spin_stat.m_spin_interval[9],
1491             spin_stat.m_micros_sleep_times[9],
1492             spin_stat.m_spin_interval[10],
1493             spin_stat.m_micros_sleep_times[10],
1494             spin_stat.m_spin_interval[11],
1495             spin_stat.m_micros_sleep_times[11],
1496             spin_stat.m_spin_interval[12],
1497             spin_stat.m_micros_sleep_times[12],
1498             spin_stat.m_spin_interval[13],
1499             spin_stat.m_micros_sleep_times[13],
1500             spin_stat.m_spin_interval[14],
1501             spin_stat.m_micros_sleep_times[14],
1502             spin_stat.m_micros_sleep_times[15]));
1503   }
1504   Uint32 spin_time = calc_new_spin(&spin_stat);
1505   set_spin_stat(spin_time, local_call);
1506   return;
1507 }
1508 
1509 /**
1510  * We call this function every 50 milliseconds.
1511  *
1512  * Load Information Gathering in THRMAN
1513  * ------------------------------------
1514  * We gather information from the operating system on how user time and
1515  * system time the thread has spent. We also get information from the
1516  * scheduler about how much time the thread has spent in sleep mode,
1517  * how much time spent sending and how much time spent doing the work
1518  * the thread is assigned (for most block threads this is executing
1519  * signals, for receive threads it is receiving and for send threads
1520  * it is sending.
1521  *
1522  * ndbinfo tables based on this gathered information
1523  * -------------------------------------------------
1524  * We collect this data such that we can report the last 1 second
1525  * information about status per 50 milliseconds.
1526  * We also collect information about reports for 20 seconds with
1527  * 1 second per collection point.
1528  * We also collect information about reports for 400 seconds with
1529  * 20 second per collection point.
1530  *
1531  * This data is reported in 3 different ndbinfo tables where each
1532  * thread reports its own data. Thus twenty rows per thread per node
1533  * in each of those tables. These tables represent similar information
1534  * as we can get from top, but here it is reported per ndbmtd
1535  * block thread and also ndbmtd send thread. Currently we don't
1536  * cover NDBFS threads and transporter connection threads.
1537  *
1538  * We also have a smaller table that reports one row per thread per
1539  * node and this row represents the load information for the last
1540  * second.
1541  *
1542  * Use of the data for adaptive load regulation of LCPs
1543  * ----------------------------------------------------
1544  * This data is also used in adaptive load regulation algorithms in
1545  * MySQL Cluster data nodes. The intention is to increase this usage
1546  * with time. The first use case was in 7.4 for adaptive speed of
1547  * LCPs.
1548  *
1549  * Use of data for adaptive send assistance of block threads
1550  * ---------------------------------------------------------
1551  * The next use case is to control the sending from various threads.
1552  * Using send threads we are able to send from any block thread, from
1553  * the receive threads and finally also from the send threads themselves.
1554  *
1555  * We want to know the load status of each thread to decide how active
1556  * each thread should be in assisting send threads in sending. The send
1557  * threads can always send at highest speed.
1558  *
1559  * Description of overload states
1560  * ------------------------------
1561  * The idea is that when a thread reaches a level where it needs more
1562  * than 75% of the time to execute then it should offload all send
1563  * activities to all other threads. However even before we reach this
1564  * critical level we should adjust our assistance to send threads.
1565  *
1566  * As with any adaptive algorithm it is good to have a certain level of
1567  * hysteresis in the changes. So we should not adjust the levels too
1568  * fast. One reason for this is that as we increase our level of send
1569  * assistance we will obviously become more loaded, we want to keep
1570  * this extra load on a level such that the block thread still can
1571  * deliver responses to its main activities within reasonable limits.
1572  *
1573  * So we will have at least 3 different levels of load for a thread.
1574  * STATE: Overload
1575  * ---------------
1576  * It can be overloaded when it has passed 75% usage for normal thread
1577  * activity without send activities.
1578  *
1579  * STATE: Medium
1580  * -------------
1581  * It can be at medium load when it has reached 30% normal thread activity.
1582  * In this case we should still handle a bit of send assistance, but also
1583  * offload a part to the send threads.
1584  *
1585  * STATE: Light
1586  * ------------
1587  * The final level is light load where we are below 30% time spent for normal
1588  * thread activities. In this case we will for the most part handle our own
1589  * sending and also assist others in sending.
1590  *
1591  * A more detailed description of the send algorithms and how they interact
1592  * is found in mt.cpp around the method do_send.
1593  *
1594  * Global node state for send assistance
1595  * -------------------------------------
1596  * One more thing is that we also need global information about the node
1597  * state. This is provided by the THRMAN with instance number 1 which is
1598  * non-proxy block executing in the main thread. The scheduler needs to
1599  * know if any thread is currently in overload mode. If one thread
1600  * is in overload mode we should change the sleep interval in all threads.
1601  * So when there are overloaded threads in the node we should ensure
1602  * that all threads wake up more often to assist in sending. So we change
1603  * the sleep interval for all threads to 1 millisecond when we are in
1604  * this state.
1605  *
1606  * The information gathered in instance 1 about send threads is reported to
1607  * all threads to ensure that all threads can use the mean percentage of
1608  * usage for send threads in the algorithm to decide when to change overload
1609  * level. The aim is that overload is defined as 85% instead of 75% when
1610  * send threads are at more than 75% load level.
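      *
      * As a hypothetical illustration: with the send threads above 75%
      * mean load, a block thread measuring 80% load is still treated as
      * medium load, whereas with lightly loaded send threads the same
      * thread would already be counted as overloaded.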
1611  *
1612  * THRMAN with instance number 1 has one more responsibility, which is
1613  * to gather the statistics from the send threads.
1614  *
1615  * So each thread is responsible for gathering information and deciding
1616  * which level of overload it currently is at. It will however report
1617  * to THRMAN instance 1 any decision to change its overload state.
1618  * So this THRMAN instance has the global node state and a bit
1619  * more information about the global state. Based on this information
1620  * it could potentially make decisions to change the overload state
1621  * for a certain thread.
1622  *
1623  * Reasons for global node state for send assistance
1624  * -------------------------------------------------
1625  * One reason to change the state is if we are in a global overload
1626  * state. This means that the local decisions are not
1627  * sufficient since the send threads are not capable of keeping up with
1628  * the load even with the assistance they get.
1629  *
1630  * The algorithms in THRMAN are to a great extent designed to protect the
1631  * LDM threads from overload, but at the same time it is possible that
1632  * the thread configuration is setup such that we have either constant or
1633  * temporary overload on other threads. Moreover, in a cloud environment
1634  * we could easily be affected by the activities of other cloud apps,
1635  * and we thus need to have a bit of flexibility in moving load to other
1636  * threads currently not so overloaded, ensuring that we make the best
1637  * use of all CPU resources in the machine assigned to us.
1638  *
1639  * Potential future usage of this load information
1640  * -----------------------------------------------
1641  * We can provide load control to ensure that the cluster continues to
1642  * deliver the basic services; in this case we might throttle certain
1643  * types of queries. We could introduce different priority levels for
1644  * queries and use those to decide which transactions are allowed to
1645  * continue in an overloaded state.
1646  *
1647  * The best place to stop any activities is when a transaction starts, so
1648  * either at normal transaction start in DBTC or DBSPJ or in schema
1649  * transaction start in DBDICT. Refusing to start a transaction has no
1650  * impact on already performed work, so this is the best manner to ensure
1651  * that we don't get into feedback problems where we have to redo the
1652  * work more than once which is likely to make the overload situation even
1653  * more severe.
1654  *
1655  * Another future development is that threads provide receive thread
1656  * assistance in the same manner so as to protect the receive threads
1657  * from overload. This will however require us to ensure that we don't
1658  * create signalling order issues, since signals will be routed different
1659  * ways depending on which block thread performs the receive operation.
1660  */
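
     /**
      * A minimal sketch of the bookkeeping pattern used by the functions
      * below (illustrative only; the names in this snippet are simplified
      * assumptions, not the real members): both the scheduler and the OS
      * expose monotonically increasing totals, so on every tick we store
      * the delta against the previous snapshot and then roll the snapshot
      * forward.
      *
      *   struct Counter { Uint64 base; };
      *   static void on_tick(Counter &c, Uint64 curr, Uint64 &delta)
      *   {
      *     delta = curr - c.base;  // time consumed in this interval
      *     c.base = curr;          // base for the next interval
      *   }
      */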
1661 void
1662 Thrman::measure_cpu_usage(Signal *signal)
1663 {
1664   struct ndb_rusage curr_rusage;
1665 
1666   /**
1667    * Start by making a new CPU usage measurement. After that we will
1668    * measure how much time has passed since last measurement and from
1669    * this we can calculate a percentage of CPU usage that this thread
1670    * has had for the last second or so.
1671    */
1672 
1673   MeasurementRecordPtr measurePtr;
1674   MeasurementRecordPtr measure_1sec_Ptr;
1675   MeasurementRecordPtr measure_20sec_Ptr;
1676 
1677   NDB_TICKS curr_time = NdbTick_getCurrentTicks();
1678   Uint64 elapsed_50ms = NdbTick_Elapsed(prev_50ms_tick, curr_time).microSec();
1679   Uint64 elapsed_1sec = NdbTick_Elapsed(prev_1sec_tick, curr_time).microSec();
1680   Uint64 elapsed_20sec = NdbTick_Elapsed(prev_20sec_tick, curr_time).microSec();
1681   MeasurementRecord loc_measure;
1682 
1683   /* Get performance timers from scheduler. */
1684   getPerformanceTimers(loc_measure.m_sleep_time_thread,
1685                        loc_measure.m_spin_time_thread,
1686                        loc_measure.m_buffer_full_time_thread,
1687                        loc_measure.m_send_time_thread);
1688 
1689   bool check_1sec = false;
1690   bool check_20sec = false;
1691 
1692   int res = Ndb_GetRUsage(&curr_rusage, false);
1693   if (res != 0)
1694   {
1695     jam();
1696 #ifdef DEBUG_CPU_USAGE
1697     g_eventLogger->info("instance: %u failed Ndb_GetRUsage, res: %d",
1698                         instance(),
1699                         -res);
1700 #endif
1701     memset(&curr_rusage, 0, sizeof(curr_rusage));
1702   }
1703   {
1704     jam();
1705     c_next_50ms_measure.first(measurePtr);
1706     calculate_measurement(measurePtr,
1707                           &curr_rusage,
1708                           &m_last_50ms_rusage,
1709                           &loc_measure,
1710                           &m_last_50ms_base_measure,
1711                           elapsed_50ms);
1712     Uint64 exec_time = measurePtr.p->m_exec_time_thread -
1713                        measurePtr.p->m_spin_time_thread;
1714     Uint64 elapsed_time = measurePtr.p->m_elapsed_time;
1715     if (elapsed_time > 0)
1716     {
1717       Uint64 exec_perc = exec_time * 1000 + 500;
1718       exec_perc /= (10 * elapsed_time);
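           /**
            * Worked example (numbers assumed for illustration): exec_time
            * of 30000 us out of an elapsed_time of 50000 us gives
            * (30000 * 1000 + 500) / (10 * 50000) = 60, i.e. 60 percent
            * with rounding to the nearest percent.
            */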
1719       if (exec_perc <= 100)
1720       {
1721         jam();
1722         m_current_cpu_usage = Uint32(exec_perc);
1723       }
1724       else
1725       {
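             /* A computed percentage above 100 cannot be a valid
              * measurement (the thread cannot execute for more than the
              * whole elapsed time), so treat it as if no reliable
              * measurement was available. */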
1726         jam();
1727         m_current_cpu_usage = 0;
1728       }
1729     }
1730     else
1731     {
1732       jam();
1733       m_current_cpu_usage = 0;
1734     }
1735     if (m_current_cpu_usage >= 40)
1736     {
1737       DEB_SPIN(("(%u)Current CPU usage is %u percent",
1738                 instance(),
1739                 m_current_cpu_usage));
1740     }
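         /* The measurement fifo acts as a ring buffer: the oldest record
          * at the head was just reused for the new measurement and is now
          * moved to the tail. */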
1741     c_next_50ms_measure.remove(measurePtr);
1742     c_next_50ms_measure.addLast(measurePtr);
1743     prev_50ms_tick = curr_time;
1744   }
1745   if (elapsed_1sec > Uint64(1000 * 1000))
1746   {
1747     jam();
1748     check_1sec = true;
1749     c_next_1sec_measure.first(measurePtr);
1750     calculate_measurement(measurePtr,
1751                           &curr_rusage,
1752                           &m_last_1sec_rusage,
1753                           &loc_measure,
1754                           &m_last_1sec_base_measure,
1755                           elapsed_1sec);
1756     c_next_1sec_measure.remove(measurePtr);
1757     c_next_1sec_measure.addLast(measurePtr);
1758     prev_1sec_tick = curr_time;
1759   }
1760   if (elapsed_20sec > Uint64(20 * 1000 * 1000))
1761   {
1762     jam();
1763     check_20sec = true;
1764     c_next_20sec_measure.first(measurePtr);
1765     calculate_measurement(measurePtr,
1766                           &curr_rusage,
1767                           &m_last_20sec_rusage,
1768                           &loc_measure,
1769                           &m_last_20sec_base_measure,
1770                           elapsed_20sec);
1771     c_next_20sec_measure.remove(measurePtr);
1772     c_next_20sec_measure.addLast(measurePtr);
1773     prev_20sec_tick = curr_time;
1774   }
1775   if (instance() == MAIN_THRMAN_INSTANCE)
1776   {
1777     jam();
1778     for (Uint32 send_instance = 0;
1779          send_instance < m_num_send_threads;
1780          send_instance++)
1781     {
1782       jam();
1783       SendThreadPtr sendThreadPtr;
1784       SendThreadMeasurementPtr sendThreadMeasurementPtr;
1785       SendThreadMeasurement curr_send_thread_measure;
1786 
1787       getSendPerformanceTimers(send_instance,
1788                        curr_send_thread_measure.m_exec_time,
1789                        curr_send_thread_measure.m_sleep_time,
1790                        curr_send_thread_measure.m_spin_time,
1791                        curr_send_thread_measure.m_user_time_os,
1792                        curr_send_thread_measure.m_kernel_time_os,
1793                        curr_send_thread_measure.m_elapsed_time_os);
1794 
1795       c_sendThreadRecordPool.getPtr(sendThreadPtr, send_instance);
1796       {
1797         jam();
1798         Local_SendThreadMeasurement_fifo list_50ms(c_sendThreadMeasurementPool,
1799                             sendThreadPtr.p->m_send_thread_50ms_measurements);
1800         list_50ms.first(sendThreadMeasurementPtr);
1801         calculate_send_measurement(sendThreadMeasurementPtr,
1802                        &curr_send_thread_measure,
1803                        &sendThreadPtr.p->m_last_50ms_send_thread_measure,
1804                        elapsed_50ms,
1805                        send_instance);
1806         list_50ms.remove(sendThreadMeasurementPtr);
1807         list_50ms.addLast(sendThreadMeasurementPtr);
1808       }
1809       if (elapsed_1sec > Uint64(1000 * 1000))
1810       {
1811         jam();
1812         Local_SendThreadMeasurement_fifo list_1sec(c_sendThreadMeasurementPool,
1813                             sendThreadPtr.p->m_send_thread_1sec_measurements);
1814         list_1sec.first(sendThreadMeasurementPtr);
1815         calculate_send_measurement(sendThreadMeasurementPtr,
1816                        &curr_send_thread_measure,
1817                        &sendThreadPtr.p->m_last_1sec_send_thread_measure,
1818                        elapsed_1sec,
1819                        send_instance);
1820         list_1sec.remove(sendThreadMeasurementPtr);
1821         list_1sec.addLast(sendThreadMeasurementPtr);
1822       }
1823       if (elapsed_20sec > Uint64(20 * 1000 * 1000))
1824       {
1825         jam();
1826         Local_SendThreadMeasurement_fifo list_20sec(c_sendThreadMeasurementPool,
1827                             sendThreadPtr.p->m_send_thread_20sec_measurements);
1828         list_20sec.first(sendThreadMeasurementPtr);
1829         calculate_send_measurement(sendThreadMeasurementPtr,
1830                        &curr_send_thread_measure,
1831                        &sendThreadPtr.p->m_last_20sec_send_thread_measure,
1832                        elapsed_20sec,
1833                        send_instance);
1834         list_20sec.remove(sendThreadMeasurementPtr);
1835         list_20sec.addLast(sendThreadMeasurementPtr);
1836       }
1837     }
1838     if (check_1sec)
1839     {
1840       Uint32 send_thread_percentage =
1841         calculate_mean_send_thread_load();
1842       sendSEND_THREAD_STATUS_REP(signal, send_thread_percentage);
1843     }
1844   }
1845   check_overload_status(signal, check_1sec, check_20sec);
1846 }
1847 
1848 void
1849 Thrman::calculate_measurement(MeasurementRecordPtr measurePtr,
1850                               struct ndb_rusage *curr_rusage,
1851                               struct ndb_rusage *base_rusage,
1852                               MeasurementRecord *curr_measure,
1853                               MeasurementRecord *base_measure,
1854                               Uint64 elapsed_micros)
1855 {
1856   Uint64 user_micros;
1857   Uint64 kernel_micros;
1858   Uint64 total_micros;
1859 
1860 
1861   measurePtr.p->m_first_measure_done = true;
1862 
1863   measurePtr.p->m_send_time_thread = curr_measure->m_send_time_thread -
1864                                      base_measure->m_send_time_thread;
1865 
1866   measurePtr.p->m_sleep_time_thread = curr_measure->m_sleep_time_thread -
1867                                       base_measure->m_sleep_time_thread;
1868 
1869   measurePtr.p->m_spin_time_thread = curr_measure->m_spin_time_thread -
1870                                       base_measure->m_spin_time_thread;
1871 
1872 
1873   measurePtr.p->m_buffer_full_time_thread =
1874     curr_measure->m_buffer_full_time_thread -
1875     base_measure->m_buffer_full_time_thread;
1876 
1877   measurePtr.p->m_exec_time_thread =
1878     elapsed_micros - measurePtr.p->m_sleep_time_thread;
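       /* Execution time is derived rather than measured: everything not
        * spent sleeping counts as execution here. Spin time is still
        * included and is subtracted by the consumers that need pure
        * execution time. */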
1879 
1880   measurePtr.p->m_elapsed_time = elapsed_micros;
1881 
1882   if ((curr_rusage->ru_utime == 0 &&
1883        curr_rusage->ru_stime == 0) ||
1884       (base_rusage->ru_utime == 0 &&
1885        base_rusage->ru_stime == 0))
1886   {
1887     jam();
1888     measurePtr.p->m_user_time_os = 0;
1889     measurePtr.p->m_kernel_time_os = 0;
1890     measurePtr.p->m_idle_time_os = 0;
1891   }
1892   else
1893   {
1894     jam();
1895     user_micros = curr_rusage->ru_utime - base_rusage->ru_utime;
1896     kernel_micros = curr_rusage->ru_stime - base_rusage->ru_stime;
1897     total_micros = user_micros + kernel_micros;
1898 
1899     measurePtr.p->m_user_time_os = user_micros;
1900     measurePtr.p->m_kernel_time_os = kernel_micros;
1901     if (elapsed_micros >= total_micros)
1902     {
1903       jam();
1904       measurePtr.p->m_idle_time_os = elapsed_micros - total_micros;
1905     }
1906     else
1907     {
1908       jam();
1909       measurePtr.p->m_idle_time_os = 0;
1910     }
1911   }
1912 
1913 #ifdef DEBUG_CPU_USAGE
1914 #ifndef HIGH_DEBUG_CPU_USAGE
1915   if (elapsed_micros > Uint64(1000 * 1000))
1916 #endif
1917   g_eventLogger->info("(%u)name: %s, ut_os: %u, kt_os: %u,"
1918                       " idle_os: %u"
1919                       ", elapsed_time: %u, exec_time: %u,"
1920                       " sleep_time: %u, spin_time: %u, send_time: %u",
1921                       instance(),
1922                       m_thread_name,
1923                       Uint32(measurePtr.p->m_user_time_os),
1924                       Uint32(measurePtr.p->m_kernel_time_os),
1925                       Uint32(measurePtr.p->m_idle_time_os),
1926                       Uint32(measurePtr.p->m_elapsed_time),
1927                       Uint32(measurePtr.p->m_exec_time_thread),
1928                       Uint32(measurePtr.p->m_sleep_time_thread),
1929                       Uint32(measurePtr.p->m_spin_time_thread),
1930                       Uint32(measurePtr.p->m_send_time_thread));
1931 #endif
1932   base_rusage->ru_utime = curr_rusage->ru_utime;
1933   base_rusage->ru_stime = curr_rusage->ru_stime;
1934 
1935   base_measure->m_send_time_thread = curr_measure->m_send_time_thread;
1936   base_measure->m_sleep_time_thread = curr_measure->m_sleep_time_thread;
1937   base_measure->m_spin_time_thread = curr_measure->m_spin_time_thread;
1938   base_measure->m_buffer_full_time_thread = curr_measure->m_buffer_full_time_thread;
1939 }
1940 
1941 void
1942 Thrman::calculate_send_measurement(
1943   SendThreadMeasurementPtr sendThreadMeasurementPtr,
1944   SendThreadMeasurement *curr_send_thread_measure,
1945   SendThreadMeasurement *last_send_thread_measure,
1946   Uint64 elapsed_time,
1947   Uint32 send_instance)
1948 {
1949   (void)elapsed_time;
1950   sendThreadMeasurementPtr.p->m_first_measure_done = true;
1951 
1952 
1953   sendThreadMeasurementPtr.p->m_exec_time =
1954                      curr_send_thread_measure->m_exec_time -
1955                      last_send_thread_measure->m_exec_time;
1956 
1957   sendThreadMeasurementPtr.p->m_sleep_time =
1958                      curr_send_thread_measure->m_sleep_time -
1959                      last_send_thread_measure->m_sleep_time;
1960 
1961   sendThreadMeasurementPtr.p->m_spin_time =
1962                      curr_send_thread_measure->m_spin_time -
1963                      last_send_thread_measure->m_spin_time;
1964 
1965   /**
1966    * Elapsed time on measurements done is exec_time + sleep_time
1967    * as exec_time is first measured as elapsed time and then the
1968    * sleep time is subtracted from elapsed time to get exec time.
1969    *
1970    * See run_send_thread main loop for details.
1971    */
1972   sendThreadMeasurementPtr.p->m_elapsed_time =
1973     sendThreadMeasurementPtr.p->m_exec_time +
1974     sendThreadMeasurementPtr.p->m_sleep_time;
1975   elapsed_time = sendThreadMeasurementPtr.p->m_elapsed_time;
1976 
1977   if ((curr_send_thread_measure->m_user_time_os == 0 &&
1978        curr_send_thread_measure->m_kernel_time_os == 0 &&
1979        curr_send_thread_measure->m_elapsed_time_os == 0) ||
1980       (last_send_thread_measure->m_user_time_os == 0 &&
1981        last_send_thread_measure->m_kernel_time_os == 0 &&
1982        last_send_thread_measure->m_elapsed_time_os == 0))
1983   {
1984     jam();
1985     sendThreadMeasurementPtr.p->m_user_time_os = 0;
1986     sendThreadMeasurementPtr.p->m_kernel_time_os = 0;
1987     sendThreadMeasurementPtr.p->m_elapsed_time_os = 0;
1988     sendThreadMeasurementPtr.p->m_idle_time_os = 0;
1989   }
1990   else
1991   {
1992     jam();
1993     sendThreadMeasurementPtr.p->m_user_time_os =
1994                      curr_send_thread_measure->m_user_time_os -
1995                      last_send_thread_measure->m_user_time_os;
1996 
1997     sendThreadMeasurementPtr.p->m_kernel_time_os =
1998                      curr_send_thread_measure->m_kernel_time_os -
1999                      last_send_thread_measure->m_kernel_time_os;
2000 
2001     sendThreadMeasurementPtr.p->m_elapsed_time_os =
2002                      curr_send_thread_measure->m_elapsed_time_os -
2003                      last_send_thread_measure->m_elapsed_time_os;
2004     sendThreadMeasurementPtr.p->m_idle_time_os =
2005       sendThreadMeasurementPtr.p->m_elapsed_time_os -
2006       (sendThreadMeasurementPtr.p->m_user_time_os +
2007        sendThreadMeasurementPtr.p->m_kernel_time_os);
2008   }
2009 #ifdef DEBUG_CPU_USAGE
2010 #ifndef HIGH_DEBUG_CPU_USAGE
2011   if (elapsed_time > Uint64(1000 * 1000))
2012   {
2013 #endif
2014     Uint32 sleep = sendThreadMeasurementPtr.p->m_sleep_time;
2015     Uint32 exec = sendThreadMeasurementPtr.p->m_exec_time;
2016     int diff = elapsed_time - (sleep + exec);
2017     g_eventLogger->info("send_instance: %u, exec_time: %u, sleep_time: %u,"
2018                         " spin_tim: %u, elapsed_time: %u, diff: %d"
2019                         ", user_time_os: %u, kernel_time_os: %u,"
2020                         " elapsed_time_os: %u",
2021                         send_instance,
2022                         (Uint32)sendThreadMeasurementPtr.p->m_exec_time,
2023                         (Uint32)sendThreadMeasurementPtr.p->m_sleep_time,
2024                         (Uint32)sendThreadMeasurementPtr.p->m_spin_time,
2025                         (Uint32)sendThreadMeasurementPtr.p->m_elapsed_time,
2026                         diff,
2027                         (Uint32)sendThreadMeasurementPtr.p->m_user_time_os,
2028                         (Uint32)sendThreadMeasurementPtr.p->m_kernel_time_os,
2029                         (Uint32)sendThreadMeasurementPtr.p->m_elapsed_time_os);
2030 #ifndef HIGH_DEBUG_CPU_USAGE
2031   }
2032 #endif
2033 #else
2034   (void)send_instance;
2035 #endif
2036 
2037   last_send_thread_measure->m_exec_time =
2038     curr_send_thread_measure->m_exec_time;
2039 
2040   last_send_thread_measure->m_sleep_time =
2041     curr_send_thread_measure->m_sleep_time;
2042 
2043   last_send_thread_measure->m_spin_time =
2044     curr_send_thread_measure->m_spin_time;
2045 
2046   last_send_thread_measure->m_user_time_os =
2047     curr_send_thread_measure->m_user_time_os;
2048 
2049   last_send_thread_measure->m_kernel_time_os =
2050     curr_send_thread_measure->m_kernel_time_os;
2051 
2052   last_send_thread_measure->m_elapsed_time_os =
2053     curr_send_thread_measure->m_elapsed_time_os;
2054 }
2055 
2056 void
2057 Thrman::sum_measures(MeasurementRecord *dest,
2058                      MeasurementRecord *source)
2059 {
2060   dest->m_user_time_os += source->m_user_time_os;
2061   dest->m_kernel_time_os += source->m_kernel_time_os;
2062   dest->m_idle_time_os += source->m_idle_time_os;
2063   dest->m_exec_time_thread += source->m_exec_time_thread;
2064   dest->m_sleep_time_thread += source->m_sleep_time_thread;
2065   dest->m_spin_time_thread += source->m_spin_time_thread;
2066   dest->m_send_time_thread += source->m_send_time_thread;
2067   dest->m_buffer_full_time_thread += source->m_buffer_full_time_thread;
2068   dest->m_elapsed_time += source->m_elapsed_time;
2069 }
2070 
2071 bool
2072 Thrman::calculate_cpu_load_last_second(MeasurementRecord *measure)
2073 {
2074   MeasurementRecordPtr measurePtr;
2075 
2076   memset(measure, 0, sizeof(MeasurementRecord));
2077 
2078   c_next_50ms_measure.first(measurePtr);
2079   if (measurePtr.p->m_first_measure_done)
2080   {
2081     do
2082     {
2083       jam();
2084       sum_measures(measure, measurePtr.p);
2085       c_next_50ms_measure.next(measurePtr);
2086     } while (measurePtr.i != RNIL &&
2087              measure->m_elapsed_time <
2088              Uint64(NUM_MEASUREMENTS * 50 * 1000));
2089     STATIC_ASSERT(NUM_MEASUREMENTS * 50 * 1000 == 1000 * 1000);
2090     return true;
2091   }
2092   jam();
2093   return false;
2094 }
2095 
2096 bool
2097 Thrman::calculate_cpu_load_last_20seconds(MeasurementRecord *measure)
2098 {
2099   MeasurementRecordPtr measurePtr;
2100 
2101   memset(measure, 0, sizeof(MeasurementRecord));
2102 
2103   c_next_1sec_measure.first(measurePtr);
2104   if (measurePtr.p->m_first_measure_done)
2105   {
2106     do
2107     {
2108       jam();
2109       sum_measures(measure, measurePtr.p);
2110       c_next_1sec_measure.next(measurePtr);
2111     } while (measurePtr.i != RNIL &&
2112              measure->m_elapsed_time <
2113              Uint64(NUM_MEASUREMENTS * NUM_MEASUREMENTS * 50 * 1000));
2114     STATIC_ASSERT(NUM_MEASUREMENTS *
2115                   NUM_MEASUREMENTS *
2116                   50 * 1000 == 20 * 1000 * 1000);
2117     return true;
2118   }
2119   jam();
2120   return false;
2121 }
2122 
2123 bool
2124 Thrman::calculate_cpu_load_last_400seconds(MeasurementRecord *measure)
2125 {
2126   MeasurementRecordPtr measurePtr;
2127 
2128   memset(measure, 0, sizeof(MeasurementRecord));
2129 
2130   c_next_20sec_measure.first(measurePtr);
2131   if (measurePtr.p->m_first_measure_done)
2132   {
2133     do
2134     {
2135       jam();
2136       sum_measures(measure, measurePtr.p);
2137       c_next_20sec_measure.next(measurePtr);
2138     } while (measurePtr.i != RNIL &&
2139              measure->m_elapsed_time <
2140              Uint64(NUM_MEASUREMENTS *
2141                     NUM_MEASUREMENTS *
2142                     NUM_MEASUREMENTS * 50 * 1000));
2143     STATIC_ASSERT(NUM_MEASUREMENTS *
2144                   NUM_MEASUREMENTS *
2145                   NUM_MEASUREMENTS *
2146                   50 * 1000 == 400 * 1000 * 1000);
2147     return true;
2148   }
2149   jam();
2150   return false;
2151 }
2152 
2153 void
2154 Thrman::init_stats(MeasureStats *stats)
2155 {
2156   stats->min_os_percentage = 100;
2157   stats->min_next_os_percentage = 100;
2158 
2159   stats->max_os_percentage = 0;
2160   stats->max_next_os_percentage = 0;
2161 
2162   stats->avg_os_percentage = 0;
2163 
2164   stats->min_thread_percentage = 100;
2165   stats->min_next_thread_percentage = 100;
2166 
2167   stats->max_thread_percentage = 0;
2168   stats->max_next_thread_percentage = 0;
2169   stats->avg_thread_percentage = 0;
2170 
2171   stats->avg_send_percentage = 0;
2172 }
2173 
2174 void
2175 Thrman::calc_stats(MeasureStats *stats,
2176                    MeasurementRecord *measure)
2177 {
2178   Uint64 thread_percentage = 0;
2179   {
2180     if (measure->m_elapsed_time > 0)
2181     {
2182       Uint64 not_used_exec_time =
2183         measure->m_buffer_full_time_thread +
2184           measure->m_spin_time_thread;
2185       Uint64 used_exec_time = 0;
2186       if (measure->m_exec_time_thread > not_used_exec_time)
2187       {
2188         used_exec_time = measure->m_exec_time_thread - not_used_exec_time;
2189       }
2190       thread_percentage = Uint64(1000) * used_exec_time /
2191          measure->m_elapsed_time;
2192     }
2193     thread_percentage += 5;
2194     thread_percentage /= 10;
2195     if (thread_percentage > 100)
2196     {
2197       thread_percentage = 100;
2198     }
2199 
2200     if (thread_percentage < stats->min_thread_percentage)
2201     {
2202       jam();
2203       stats->min_next_thread_percentage = stats->min_thread_percentage;
2204       stats->min_thread_percentage = thread_percentage;
2205     }
2206     else if (thread_percentage < stats->min_next_thread_percentage)
2207     {
2208       jam();
2209       stats->min_next_thread_percentage = thread_percentage;
2210     }
2211     else if (thread_percentage > stats->max_thread_percentage)
2212     {
2213       jam();
2214       stats->max_next_thread_percentage = stats->max_thread_percentage;
2215       stats->max_thread_percentage = thread_percentage;
2216     }
2217     else if (thread_percentage > stats->max_next_thread_percentage)
2218     {
2219       jam();
2220       stats->max_next_thread_percentage = thread_percentage;
2221     }
2222     stats->avg_thread_percentage += thread_percentage;
2223   }
2224 
2225   Uint64 divider = 1;
2226   Uint64 multiplier = 1;
2227   Uint64 spin_percentage = 0;
2228   if (measure->m_elapsed_time > 0)
2229   {
2230     spin_percentage = (Uint64(1000) * measure->m_spin_time_thread) /
2231                        measure->m_elapsed_time;
2232     spin_percentage += 5;
2233     spin_percentage /= 10;
2234   }
2235   if (spin_percentage > 1)
2236   {
2237     jam();
2238     /**
2239      * We take spin time into account for OS time when it is at least
2240      * spinning 2% of the time. Otherwise we will ignore it. What we
2241      * do is that we assume that the time spent in OS time is equally
2242      * divided as the measured time, so e.g. if we spent 60% of the
2243      * time in exec and 30% spinning, then we will multiply os
2244      * percentage by 2/3 since we assume that a third of the time
2245      * in the OS time was spent spinning and we don't want spin time
2246      * to be counted as execution time, it is a form of busy sleep
2247      * time.
2248      */
2249     multiplier = thread_percentage;
2250     divider = (spin_percentage + thread_percentage);
2251   }
2252 
2253   {
2254     Uint64 os_percentage = 0;
2255     if (measure->m_elapsed_time > 0)
2256     {
2257       os_percentage = Uint64(1000) *
2258        (measure->m_user_time_os + measure->m_kernel_time_os) /
2259        measure->m_elapsed_time;
2260     }
2261     /* Take spin time into account */
2262     os_percentage *= multiplier;
2263     os_percentage /= divider;
2264 
2265     /**
2266      * We calculated percentage * 10, so by adding 5 we ensure that
2267      * rounding is ok. Integer division always round 99.9 to 99, so
2268      * we need to add 0.5% to get proper rounding.
2269      */
2270     os_percentage += 5;
2271     os_percentage /= 10;
2272 
2273     if (os_percentage < stats->min_os_percentage)
2274     {
2275       jam();
2276       stats->min_next_os_percentage = stats->min_os_percentage;
2277       stats->min_os_percentage = os_percentage;
2278     }
2279     else if (os_percentage < stats->min_next_os_percentage)
2280     {
2281       jam();
2282       stats->min_next_os_percentage = os_percentage;
2283     }
2284     else if (os_percentage > stats->max_os_percentage)
2285     {
2286       jam();
2287       stats->max_next_os_percentage = stats->max_os_percentage;
2288       stats->max_os_percentage = os_percentage;
2289     }
2290     else if (os_percentage > stats->max_next_os_percentage)
2291     {
2292       jam();
2293       stats->max_next_os_percentage = os_percentage;
2294     }
2295     stats->avg_os_percentage += os_percentage;
2296   }
2297   Uint64 send_percentage = 0;
2298   if (measure->m_elapsed_time > 0)
2299   {
2300     send_percentage = (Uint64(1000) *
2301      measure->m_send_time_thread) / measure->m_elapsed_time;
2302   }
2303   send_percentage += 5;
2304   send_percentage /= 10;
2305   stats->avg_send_percentage += send_percentage;
2306 }
2307 
2308 void
2309 Thrman::calc_avgs(MeasureStats *stats, Uint32 num_stats)
2310 {
2311   stats->avg_os_percentage /= num_stats;
2312   stats->avg_thread_percentage /= num_stats;
2313   stats->avg_send_percentage /= num_stats;
2314 }
2315 
2316 bool
2317 Thrman::calculate_stats_last_100ms(MeasureStats *stats)
2318 {
2319   MeasurementRecordPtr measurePtr;
2320   Uint32 num_stats = 0;
2321   Uint64 elapsed_time = 0;
2322 
2323   init_stats(stats);
2324   c_next_50ms_measure.first(measurePtr);
2325   if (!measurePtr.p->m_first_measure_done)
2326   {
2327     jam();
2328     return false;
2329   }
2330   do
2331   {
2332     jam();
2333     calc_stats(stats, measurePtr.p);
2334     num_stats++;
2335     elapsed_time += measurePtr.p->m_elapsed_time;
2336     c_next_50ms_measure.next(measurePtr);
2337   } while (measurePtr.i != RNIL &&
2338            elapsed_time < Uint64(100 * 1000));
2339   calc_avgs(stats, num_stats);
2340   return true;
2341 }
2342 
2343 bool
2344 Thrman::calculate_stats_last_second(MeasureStats *stats)
2345 {
2346   MeasurementRecordPtr measurePtr;
2347   Uint32 num_stats = 0;
2348   Uint64 elapsed_time = 0;
2349 
2350   init_stats(stats);
2351   c_next_50ms_measure.first(measurePtr);
2352   if (!measurePtr.p->m_first_measure_done)
2353   {
2354     jam();
2355     return false;
2356   }
2357   do
2358   {
2359     jam();
2360     calc_stats(stats, measurePtr.p);
2361     num_stats++;
2362     elapsed_time += measurePtr.p->m_elapsed_time;
2363     c_next_50ms_measure.next(measurePtr);
2364   } while (measurePtr.i != RNIL &&
2365            elapsed_time < Uint64(NUM_MEASUREMENTS * 50 * 1000));
2366   STATIC_ASSERT(NUM_MEASUREMENTS * 50 * 1000 == 1000 * 1000);
2367   calc_avgs(stats, num_stats);
2368   return true;
2369 }
2370 
2371 bool
2372 Thrman::calculate_stats_last_20seconds(MeasureStats *stats)
2373 {
2374   MeasurementRecordPtr measurePtr;
2375   Uint32 num_stats = 0;
2376   Uint64 elapsed_time = 0;
2377 
2378   init_stats(stats);
2379   c_next_1sec_measure.first(measurePtr);
2380   if (!measurePtr.p->m_first_measure_done)
2381   {
2382     jam();
2383     return false;
2384   }
2385   do
2386   {
2387     jam();
2388     calc_stats(stats, measurePtr.p);
2389     num_stats++;
2390     elapsed_time += measurePtr.p->m_elapsed_time;
2391     c_next_1sec_measure.next(measurePtr);
2392   } while (measurePtr.i != RNIL &&
2393            elapsed_time <
2394            Uint64(NUM_MEASUREMENTS * NUM_MEASUREMENTS * 50 * 1000));
2395   STATIC_ASSERT(NUM_MEASUREMENTS *
2396                 NUM_MEASUREMENTS *
2397                 50 * 1000 == 20 * 1000 * 1000);
2398   calc_avgs(stats, num_stats);
2399   return true;
2400 }
2401 
2402 bool
2403 Thrman::calculate_stats_last_400seconds(MeasureStats *stats)
2404 {
2405   MeasurementRecordPtr measurePtr;
2406   Uint32 num_stats = 0;
2407   Uint64 elapsed_time = 0;
2408 
2409   init_stats(stats);
2410   c_next_20sec_measure.first(measurePtr);
2411   if (!measurePtr.p->m_first_measure_done)
2412   {
2413     jam();
2414     return false;
2415   }
2416   do
2417   {
2418     jam();
2419     calc_stats(stats, measurePtr.p);
2420     num_stats++;
2421     elapsed_time += measurePtr.p->m_elapsed_time;
2422     c_next_20sec_measure.next(measurePtr);
2423   } while (measurePtr.i != RNIL &&
2424            elapsed_time <
2425            Uint64(NUM_MEASUREMENTS *
2426                   NUM_MEASUREMENTS *
2427                   NUM_MEASUREMENTS * 50 * 1000));
2428   STATIC_ASSERT(NUM_MEASUREMENTS *
2429                 NUM_MEASUREMENTS *
2430                 NUM_MEASUREMENTS *
2431                 50 * 1000 == 400 * 1000 * 1000);
2432   calc_avgs(stats, num_stats);
2433   return true;
2434 }
2435 
2436 bool
2437 Thrman::calculate_send_thread_load_last_second(Uint32 send_instance,
2438                                                SendThreadMeasurement *measure)
2439 {
2440   SendThreadPtr sendThreadPtr;
2441   SendThreadMeasurementPtr sendThreadMeasurementPtr;
2442 
2443   memset(measure, 0, sizeof(SendThreadMeasurement));
2444 
2445   c_sendThreadRecordPool.getPtr(sendThreadPtr, send_instance);
2446 
2447   Local_SendThreadMeasurement_fifo list_50ms(c_sendThreadMeasurementPool,
2448                          sendThreadPtr.p->m_send_thread_50ms_measurements);
2449   list_50ms.first(sendThreadMeasurementPtr);
2450 
2451   if (sendThreadMeasurementPtr.p->m_first_measure_done)
2452   {
2453     do
2454     {
2455       jam();
2456       measure->m_exec_time += sendThreadMeasurementPtr.p->m_exec_time;
2457       measure->m_sleep_time += sendThreadMeasurementPtr.p->m_sleep_time;
2458       measure->m_spin_time += sendThreadMeasurementPtr.p->m_spin_time;
2459       measure->m_elapsed_time += (sendThreadMeasurementPtr.p->m_exec_time +
2460                                   sendThreadMeasurementPtr.p->m_sleep_time);
2461       measure->m_user_time_os += sendThreadMeasurementPtr.p->m_user_time_os;
2462       measure->m_kernel_time_os += sendThreadMeasurementPtr.p->m_kernel_time_os;
2463       measure->m_elapsed_time_os += sendThreadMeasurementPtr.p->m_elapsed_time_os;
2464       measure->m_idle_time_os += sendThreadMeasurementPtr.p->m_idle_time_os;
2465       list_50ms.next(sendThreadMeasurementPtr);
2466     } while (sendThreadMeasurementPtr.i != RNIL &&
2467              measure->m_elapsed_time < Uint64(1000 * 1000));
2468     return true;
2469   }
2470   jam();
2471   return false;
2472 }
2473 
2474 Uint32
2475 Thrman::calculate_mean_send_thread_load()
2476 {
2477   SendThreadMeasurement measure;
2478   Uint32 tot_percentage = 0;
2479   if (m_num_send_threads == 0)
2480   {
2481     return 0;
2482   }
2483   for (Uint32 i = 0; i < m_num_send_threads; i++)
2484   {
2485     jam();
2486     bool succ = calculate_send_thread_load_last_second(i, &measure);
2487     if (!succ)
2488     {
2489       jam();
2490       return 0;
2491     }
2492 
2493     Uint64 send_thread_percentage = 0;
2494     if (measure.m_elapsed_time)
2495     {
2496       send_thread_percentage = Uint64(1000) *
2497         (measure.m_exec_time - measure.m_spin_time) /
2498         measure.m_elapsed_time;
2499     }
2500     send_thread_percentage += 5;
2501     send_thread_percentage /= 10;
2502 
2503     Uint64 send_spin_percentage = 0;
2504     Uint64 multiplier = 1;
2505     Uint64 divider = 1;
2506     if (measure.m_elapsed_time)
2507     {
2508       send_spin_percentage =
2509         (Uint64(1000) * measure.m_spin_time) / measure.m_elapsed_time;
2510       send_spin_percentage += 5;
2511       send_spin_percentage /= 10;
2512     }
2513 
2514     if (send_spin_percentage > 1)
2515     {
2516       jam();
2517       multiplier = send_thread_percentage;
2518       divider = (send_thread_percentage + send_spin_percentage);
2519     }
2520 
2521     Uint64 send_os_percentage = 0;
2522     if (measure.m_elapsed_time_os)
2523     {
2524       send_os_percentage =
2525         (Uint64(1000) * (measure.m_user_time_os + measure.m_kernel_time_os) /
2526           measure.m_elapsed_time_os);
2527     }
2528     send_os_percentage *= multiplier;
2529     send_os_percentage /= divider;
2530 
2531     send_os_percentage += 5;
2532     send_os_percentage /= 10;
2533 
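         /* Use whichever estimate is higher: the send thread's own
          * bookkeeping or the (spin-adjusted) OS accounting for it. */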
2534     if (send_os_percentage > send_thread_percentage)
2535     {
2536       jam();
2537       send_thread_percentage = send_os_percentage;
2538     }
2539     tot_percentage += Uint32(send_thread_percentage);
2540   }
2541   tot_percentage /= m_num_send_threads;
2542   return tot_percentage;
2543 }
2544 
2545 void
2546 Thrman::execGET_CPU_USAGE_REQ(Signal *signal)
2547 {
2548   MeasurementRecord curr_measure;
2549   if (calculate_cpu_load_last_second(&curr_measure))
2550   {
2551     jam();
2552     Uint64 percentage = (Uint64(100) *
2553                         curr_measure.m_exec_time_thread) /
2554                           curr_measure.m_elapsed_time;
2555     signal->theData[0] = Uint32(percentage);
2556   }
2557   else
2558   {
2559     jam();
2560     signal->theData[0] = default_cpu_load;
2561   }
2562 }
2563 
2564 void
2565 Thrman::handle_decisions()
2566 {
2567   MeasureStats *stats = m_current_decision_stats;
2568 
2569   if (stats->avg_thread_percentage > (stats->avg_os_percentage + 25))
2570   {
2571     jam();
2572     if (!m_shared_environment)
2573     {
2574       jam();
2575       g_eventLogger->info("Setting ourselves in shared environment,"
2576                           " instance: %u, thread pct: %u"
2577                           ", os_pct: %u, intervals os: [%u, %u] thread: [%u, %u]",
2578                           instance(),
2579                           Uint32(stats->avg_thread_percentage),
2580                           Uint32(stats->avg_os_percentage),
2581                           Uint32(stats->min_next_os_percentage),
2582                           Uint32(stats->max_next_os_percentage),
2583                           Uint32(stats->min_next_thread_percentage),
2584                           Uint32(stats->max_next_thread_percentage));
2585     }
2586     m_shared_environment = true;
2587     m_max_warning_level = 200;
2588   }
2589   else if (stats->avg_thread_percentage < (stats->avg_os_percentage + 15))
2590   {
2591     /**
2592      * We use a hysteresis to avoid swapping between shared environment and
2593      * exclusive environment too quickly when conditions change rapidly.
2594      */
2595     jam();
2596     if (m_shared_environment)
2597     {
2598       jam();
2599       g_eventLogger->info("Setting ourselves in exclusive environment,"
2600                           " instance: %u, thread pct: %u"
2601                           ", os_pct: %u, intervals os: [%u, %u] thread: [%u, %u]",
2602                           instance(),
2603                           Uint32(stats->avg_thread_percentage),
2604                           Uint32(stats->avg_os_percentage),
2605                           Uint32(stats->min_next_os_percentage),
2606                           Uint32(stats->max_next_os_percentage),
2607                           Uint32(stats->min_next_thread_percentage),
2608                           Uint32(stats->max_next_thread_percentage));
2609     }
2610     m_shared_environment = false;
2611     m_max_warning_level = 20;
2612   }
2613 }
2614 
2615 Uint32
2616 Thrman::calculate_load(MeasureStats & stats, Uint32 & burstiness)
2617 {
2618   if (stats.avg_os_percentage >= stats.avg_thread_percentage)
2619   {
2620     burstiness = 0;
2621     jam();
2622     /* Always pick OS reported average unless thread reports higher. */
2623     return Uint32(stats.avg_os_percentage);
2624   }
2625   jam();
2626   burstiness = Uint32(stats.avg_thread_percentage - stats.avg_os_percentage);
2627   return Uint32(stats.avg_thread_percentage);
2628 }
2629 
2630 #define LIGHT_LOAD_LEVEL 30
2631 #define MEDIUM_LOAD_LEVEL 75
2632 #define CRITICAL_SEND_LEVEL 75
2633 #define CRITICAL_OVERLOAD_LEVEL 85
2634 
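     /**
      * Worked example (values assumed for illustration): load = 80 and
      * send_load = 10 give base_load = 70, which is above LIGHT_LOAD_LEVEL
      * and below MEDIUM_LOAD_LEVEL, so get_load_status below classifies
      * the thread as medium load.
      */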
2635 Int32
2636 Thrman::get_load_status(Uint32 load, Uint32 send_load)
2637 {
2638   Uint32 base_load = 0;
2639   if (load > send_load)
2640   {
2641     jam();
2642     base_load = load - send_load;
2643   }
2644 
2645   if (base_load < LIGHT_LOAD_LEVEL &&
2646       load < CRITICAL_OVERLOAD_LEVEL)
2647   {
2648     jam();
2649     return (OverloadStatus)LIGHT_LOAD_CONST;
2650   }
2651   else if (base_load < MEDIUM_LOAD_LEVEL)
2652   {
2653     jam();
2654     return (OverloadStatus)MEDIUM_LOAD_CONST;
2655   }
2656   else if (base_load < CRITICAL_OVERLOAD_LEVEL)
2657   {
2658     if (m_send_thread_percentage >= CRITICAL_SEND_LEVEL)
2659     {
2660       jam();
2661       return (OverloadStatus)MEDIUM_LOAD_CONST;
2662     }
2663     else
2664     {
2665       jam();
2666       return (OverloadStatus)OVERLOAD_CONST;
2667     }
2668   }
2669   else
2670   {
2671     jam();
2672     return (OverloadStatus)OVERLOAD_CONST;
2673   }
2674 }
2675 
2676 void
2677 Thrman::change_warning_level(Int32 diff_status, Uint32 factor)
2678 {
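       /**
        * diff_status is (current status - measured status): a negative
        * value means the measured load is above the status we currently
        * advertise, so the warning level is pushed up; a positive value
        * pushes it down; zero decays it towards zero. A difference of two
        * levels moves the warning level three times as fast.
        */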
2679   switch (diff_status)
2680   {
2681     case Int32(-2):
2682       jam();
2683       inc_warning(3 * factor);
2684       break;
2685     case Int32(-1):
2686       jam();
2687       inc_warning(factor);
2688       break;
2689     case Int32(0):
2690       jam();
2691       down_warning(factor);
2692       break;
2693     case Int32(1):
2694       jam();
2695       dec_warning(factor);
2696       break;
2697     case Int32(2):
2698       jam();
2699       dec_warning(3 * factor);
2700       break;
2701     default:
2702       ndbabort();
2703   }
2704 }
2705 
2706 void
2707 Thrman::handle_overload_stats_1sec()
2708 {
2709   Uint32 burstiness;
2710   bool decision_stats = m_current_decision_stats == &c_1sec_stats;
2711 
2712   if (decision_stats)
2713   {
2714     jam();
2715     handle_decisions();
2716   }
2717   Uint32 load = calculate_load(c_1sec_stats, burstiness);
2718   m_burstiness += burstiness;
2719 
2720   Int32 load_status = get_load_status(load,
2721                                       c_1sec_stats.avg_send_percentage);
2722   Int32 diff_status = Int32(m_current_overload_status) - load_status;
2723   Uint32 factor = 10;
2724   change_warning_level(diff_status, factor);
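       /* With factor 10 and the exclusive-environment threshold of 20, a
        * sustained one-level mismatch thus changes the overload status
        * after roughly two to three seconds. */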
2725 }
2726 
2727 
2728 void
2729 Thrman::handle_overload_stats_20sec()
2730 {
2731   Uint32 burstiness;
2732   bool decision_stats = m_current_decision_stats == &c_20sec_stats;
2733 
2734   if (decision_stats)
2735   {
2736     jam();
2737     handle_decisions();
2738   }
2739   /* Burstiness only incremented for 1 second stats */
2740   Uint32 load = calculate_load(c_20sec_stats, burstiness);
2741   check_burstiness();
2742 
2743   Int32 load_status = get_load_status(load,
2744                                       c_20sec_stats.avg_send_percentage);
2745   Int32 diff_status = Int32(m_current_overload_status) - load_status;
2746   Uint32 factor = 3;
2747   change_warning_level(diff_status, factor);
2748 }
2749 
2750 void
2751 Thrman::handle_overload_stats_400sec()
2752 {
2753   /**
2754    * We only use 400 second stats for long-term decisions, not to affect
2755    * the ongoing decisions.
2756    */
2757   handle_decisions();
2758 }
2759 
2760 /**
2761  * Sum burstiness for 20 seconds and if burstiness is at very high levels
2762  * we report it to the user in the node log. It is rather unlikely that
2763  * a reliable service can be delivered in very bursty environments.
2764  */
2765 void
2766 Thrman::check_burstiness()
2767 {
       /* Check the highest threshold first, otherwise the mildest check
        * would shadow the more severe ones. */
2768   if (m_burstiness > NUM_MEASUREMENTS * 75)
2769   {
2770     jam();
2771     g_eventLogger->info("Extremely bursty environment, mean burstiness of %u pct"
2772                         ", very high risk for congestion issues",
2773                         m_burstiness / NUM_MEASUREMENTS);
2774   }
2775   else if (m_burstiness > NUM_MEASUREMENTS * 50)
2776   {
2777     jam();
2778     g_eventLogger->info("Very bursty environment, mean burstiness of %u pct"
2779                         ", risk for congestion issues",
2780                         m_burstiness / NUM_MEASUREMENTS);
2781   }
2782   else if (m_burstiness > NUM_MEASUREMENTS * 25)
2783   {
2784     jam();
2785     g_eventLogger->info("Bursty environment, mean burstiness of %u pct"
2786                         ", some risk of congestion issues",
2787                         m_burstiness / NUM_MEASUREMENTS);
2788   }
2789   m_burstiness = 0;
2790 }
2791 
2792 /**
2793  * This function is used to indicate that we're moving towards higher overload
2794  * states, so we will unconditionally move the warning level up.
2795  */
2796 void
2797 Thrman::inc_warning(Uint32 inc_factor)
2798 {
2799   m_warning_level += inc_factor;
2800 }
2801 
2802 /**
2803  * This function is used to indicate that we're moving towards lower overload
2804  * states, so we will unconditionally move the warning level down.
2805  */
2806 void
2807 Thrman::dec_warning(Uint32 dec_factor)
2808 {
2809   m_warning_level -= dec_factor;
2810 }
2811 
2812 /**
2813  * This function is used to indicate that we're at the correct overload state.
2814  * We will therefore decrease warning levels towards zero independent of whether
2815  * we are at high warning levels or low levels.
2816  */
2817 void
2818 Thrman::down_warning(Uint32 down_factor)
2819 {
2820   if (m_warning_level > Int32(down_factor))
2821   {
2822     jam();
2823     m_warning_level -= down_factor;
2824   }
2825   else if (m_warning_level < (-Int32(down_factor)))
2826   {
2827     jam();
2828     m_warning_level += down_factor;
2829   }
2830   else
2831   {
2832     jam();
2833     m_warning_level = 0;
2834   }
2835 }
2836 
2837 void
2838 Thrman::sendOVERLOAD_STATUS_REP(Signal *signal)
2839 {
2840   signal->theData[0] = instance();
2841   signal->theData[1] = m_current_overload_status;
2842   BlockReference ref = numberToRef(THRMAN,
2843                                    MAIN_THRMAN_INSTANCE,
2844                                    getOwnNodeId());
2845   sendSignal(ref, GSN_OVERLOAD_STATUS_REP, signal, 2, JBB);
2846 }
2847 
2848 void
2849 Thrman::sendSEND_THREAD_STATUS_REP(Signal *signal, Uint32 percentage)
2850 {
2851   signal->theData[0] = percentage;
2852   for (Uint32 instance_no = 1; instance_no <= m_num_threads; instance_no++)
2853   {
2854     BlockReference ref = numberToRef(THRMAN,
2855                                      instance_no,
2856                                      getOwnNodeId());
2857     sendSignal(ref, GSN_SEND_THREAD_STATUS_REP, signal, 1, JBB);
2858   }
2859 }
2860 
2861 void
2862 Thrman::handle_state_change(Signal *signal)
2863 {
2864   if (m_warning_level > Int32(m_max_warning_level))
2865   {
2866     /**
2867      * Warning has reached a threshold and we need to increase the overload
2868      * status.
2869      */
2870     if (m_current_overload_status == (OverloadStatus)LIGHT_LOAD_CONST)
2871     {
2872       jam();
2873       m_current_overload_status = (OverloadStatus)MEDIUM_LOAD_CONST;
2874     }
2875     else if (m_current_overload_status == (OverloadStatus)MEDIUM_LOAD_CONST)
2876     {
2877       jam();
2878       m_current_overload_status = (OverloadStatus)OVERLOAD_CONST;
2879     }
2880     else
2881     {
2882       ndbabort();
2883     }
2884     jam();
2885 #ifdef DEBUG_CPU_USAGE
2886     g_eventLogger->info("instance: %u change to new state: %u, warning: %d",
2887                         instance(),
2888                         m_current_overload_status,
2889                         m_warning_level);
2890 #endif
2891     setOverloadStatus(m_current_overload_status);
2892     m_warning_level = 0;
2893     sendOVERLOAD_STATUS_REP(signal);
2894     return;
2895   }
2896   else if (m_warning_level < (-Int32(m_max_warning_level)))
2897   {
2898     /**
2899      * Warning has reached a threshold and we need to decrease the overload
2900      * status.
2901      */
2902     if (m_current_overload_status == (OverloadStatus)LIGHT_LOAD_CONST)
2903     {
2904       ndbabort();
2905     }
2906     else if (m_current_overload_status == (OverloadStatus)MEDIUM_LOAD_CONST)
2907     {
2908       jam();
2909       m_current_overload_status = (OverloadStatus)LIGHT_LOAD_CONST;
2910     }
2911     else if (m_current_overload_status == (OverloadStatus)OVERLOAD_CONST)
2912     {
2913       jam();
2914       m_current_overload_status = (OverloadStatus)MEDIUM_LOAD_CONST;
2915     }
2916     else
2917     {
2918       ndbabort();
2919     }
2920     jam();
2921 #ifdef DEBUG_CPU_USAGE
2922     g_eventLogger->info("instance: %u change to new state: %u, warning: %d",
2923                         instance(),
2924                         m_current_overload_status,
2925                         m_warning_level);
2926 #endif
2927     setOverloadStatus(m_current_overload_status);
2928     m_warning_level = 0;
2929     sendOVERLOAD_STATUS_REP(signal);
2930     return;
2931   }
2932   jam();
2933 #ifdef HIGH_DEBUG_CPU_USAGE
2934   g_eventLogger->info("instance: %u stay at state: %u, warning: %d",
2935                       instance(),
2936                       m_current_overload_status,
2937                       m_warning_level);
2938 #endif
2939   /* Warning level is within bounds, no need to change anything. */
2940   return;
2941 }
2942 
2943 void
2944 Thrman::check_overload_status(Signal *signal,
2945                               bool check_1sec,
2946                               bool check_20sec)
2947 {
2948   /**
2949    * This function checks the current overload status and makes a decision if
2950    * the status should change or if it is to remain at the current status.
2951    *
2952    * We have two measurements that we use to decide on overload status.
2953    * The first is the measurement based on the actual data reported by the OS.
2954    * This data is considered correct when it comes to how much CPU time our
2955    * thread has used. However it will not say anything about the environment
2956    * we are executing in.
2957    *
2958    * So in order to get a feel for this environment we also estimate the
2959    * time we are spending in execution mode and how much time we are
2960    * spending in sleep mode. We also take into account if the thread has
2961    * been spinning; this time is added to the sleep time and subtracted
2962    * from the exec time of a thread.
2963    *
2964    * We can calculate idle time in two ways.
2965    * 1) m_elapsed_time - (m_user_time_os + m_kernel_time_os)
2966    * This is the actual idle time for the thread. We can only really use
2967    * this measurement in the absence of spin time, spinning time will be
2968    * added to OS time, but isn't really execution time.
2969    * 2) m_sleep_time_thread + m_spin_time_thread
2970    * This is the time that we actually decided to be idle because we had
2971    * no work to do. There are two possible reasons why these could differ.
2972    * One is if we have much mutex contention that makes the OS put us into
2973    * idle mode since we need the mutex to proceed. The second is when we go
2974    * to sleep because we cannot proceed, being out of buffers
2975    * somewhere. This is actually tracked by m_buffer_full_time_thread, so
2976    * we can add m_sleep_time_thread and m_buffer_full_time_thread to see
2977    * the total time we have decided to go to sleep.
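        *
        * In code form (a sketch using the member names described above):
        *
        *   idle_os     = m_elapsed_time - (m_user_time_os + m_kernel_time_os);
        *   idle_chosen = m_sleep_time_thread + m_buffer_full_time_thread;
        *
        * A large gap between the two suggests that we were descheduled or
        * blocked on mutexes rather than voluntarily sleeping.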
2978    *
2979    * Finally we can also be descheduled by the OS when other threads
2980    * compete for CPU resources. This kind of environment is much harder
2981    * to control since the variance of the load can be significant.
2982    *
2983    * So we want to measure this background load to see how much CPU resources
2984    * we actually have access to. If we operate in this type of environment we
2985    * need to change the overload status at a much slower speed. If we operate
2986    * in an environment where we get all the resources we need and more or less
2987    * always have access to a CPU when we need one, we can react
2988    * much faster to changes. Still we don't want to react too fast since the
2989    * application behaviour can be a bit bursty as well, and we don't want to
2990    * go back to default behaviour too quickly in these cases.
2991    *
2992    * We decide which environment we are processing in once every 20 seconds.
2993    * If we decide that we are in an environment where we don't have access
2994    * to dedicated CPU resources we will set the change speed to 10 seconds.
2995    * This means that the warning level needs to reach 200 before we actually
2996    * change to a new overload level.
2997    *
2998    * If we operate in a nice environment where we have very little problems
2999    * with competition for CPU resources we will set the warning level to 20
3000    * before we change the overload level.
3001    *
3002    * So every 20 seconds we will calculate the following parameters for our
3003    * thread.
3004    *
3005    * 1) Mean CPU percentage as defined by (m_user_time_os + m_kernel_time_os)/
3006    *    m_elapsed_time_os.
3007    * 2) 95% confidence interval for this measurement (since it is
3008    *    calculated from 20 estimates we drop the highest and the lowest
3009    *    percentage numbers). We will store the smallest percentage and the
3010    *    highest percentage of this interval.
3011    * 3) We calculate the same 3 values based on (m_exec_time_thread -
3012    *    (m_buffer_full_time_thread + m_spin_time_thread)) / m_elapsed_time.
3013    * 4) In addition we also calculate the mean value of
3014    *    m_send_time_thread / m_elapsed_time.
3015    *
3016    * Finally we take the mean numbers calculated in 1) and 3) and compare
3017    * them. If 3) is more than 10% higher than 1) then we consider ourselves
3018    * to be in a "shared" environment. Otherwise we decide that we are in an
3019    * "exclusive" environment.
3020    *
3021    * If we haven't got 400 seconds of statistics we will make a first estimate
3022    * based on 1 second of data and then again after 20 seconds of execution.
3023    * So for the first 20 seconds we will check the above once per second.
3024    * Then we will check once per 20 seconds but only check the last 20
3025    * seconds of data. After 400 seconds we will switch to checking the
3026    * statistics for the full 400 seconds back.
3027    *
3028    * We will track the overload level by using a warning level, which is an
3029    * integer. So when it reaches either -20 or +20 we will decide to decrease/
3030    * increase the overload level in an exclusive environment.
3031    * In addition once every second we will calculate the average over the
3032    * period and once every 20 seconds we will calculate the average over this
3033    * period.
3034    *
3035    * In general the overload levels are aimed at the following:
3036    * LIGHT_LOAD:
3037    * Light load is defined as using less than 30% of the capacity.
3038    *
3039    * MEDIUM_LOAD:
3040    * Medium load is defined as using less than 75% of the capacity, but
3041    * more than or equal to 30% of the capacity.
3042    *
3043    * OVERLOAD:
3044    * Overload is defined as using more than 75% of the capacity.
3045    *
3046    * The capacity is the CPU resources we have access to; it can differ
3047    * based on which environment we are in.
3048    *
3049    * We define OVERLOAD_STATUS as being at more than 75% load level. At this level
3050    * we want to avoid sending anything from our node. We will definitely stay at
3051    * this level if we can show that any of the following is true for the last
3052    * 50 milliseconds:
3053    * 1) m_user_time_os + m_kernel_time_os is at least 75% of m_elapsed_time
3054    * OR
3055    * 2) m_exec_time_thread is at least 75% of m_elapsed_time
3056    *
3057    * At this level the influence of doing sends should not matter since we
3058    * are not performing any sends at this overload level.
3059    *
3060    * If the load drops down into the 30-75% range for any of these values
3061    * then we will increment a warning counter. This warning counter will be
3062    * decreased when we again reach above 75%. If the warning counter reaches
3063    * 20 we will go down to MEDIUM overload level. In a shared environment
3064    * with bursty behaviour we will wait until the warning level reaches 200.
3065    */
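  /**
   * A minimal sketch of the decision rule described above, for
   * illustration only. The helper and its body here are hypothetical;
   * the real logic is spread across get_load_status(),
   * change_warning_level() and handle_state_change() and may use
   * different sign conventions:
   *
   *   void sketch_warning_level(Int32 diff_status, bool shared_env)
   *   {
   *     m_warning_level += diff_status;  // accumulate on every check
   *     const Int32 bound = shared_env ? 200 : 20;
   *     if (m_warning_level <= -bound || m_warning_level >= bound)
   *     {
   *       // move one overload level towards the measured load and
   *       // restart the accumulation
   *       m_warning_level = 0;
   *     }
   *   }
   */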
3066   if (check_1sec)
3067   {
3068     jam();
3069     if (calculate_stats_last_second(&c_1sec_stats))
3070     {
3071       jam();
3072       m_overload_handling_activated = true;
3073       handle_overload_stats_1sec();
3074     }
3075   }
3076   if (check_20sec)
3077   {
3078     jam();
3079     if (calculate_stats_last_400seconds(&c_400sec_stats))
3080     {
3081       jam();
3082       m_overload_handling_activated = true;
3083       m_current_decision_stats = &c_400sec_stats;
3084       handle_overload_stats_400sec();
3085       ndbrequire(calculate_stats_last_20seconds(&c_20sec_stats));
3086     }
3087     else if (calculate_stats_last_20seconds(&c_20sec_stats))
3088     {
3089       jam();
3090       if (m_current_decision_stats != &c_400sec_stats)
3091       {
3092         jam();
3093         m_current_decision_stats = &c_20sec_stats;
3094       }
3095       m_overload_handling_activated = true;
3096       handle_overload_stats_20sec();
3097     }
3098   }
3099   if (!m_overload_handling_activated)
3100   {
3101     jam();
3102     return;
3103   }
3104 
3105   MeasureStats stats;
3106   Uint32 burstiness;
3107   calculate_stats_last_100ms(&stats);
3108   Uint32 load = calculate_load(stats, burstiness);
3109 
3110   Int32 load_status = get_load_status(load,
3111                                       stats.avg_send_percentage);
3112   Int32 diff_status = Int32(m_current_overload_status) - load_status;
3113   Uint32 factor = 1;
3114   change_warning_level(diff_status, factor);
3115 
3116   handle_state_change(signal);
3117 }
3118 
3119 void
3120 Thrman::execDBINFO_SCANREQ(Signal* signal)
3121 {
3122   jamEntry();
3123 
3124   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
3125   const Ndbinfo::ScanCursor* cursor =
3126     CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
3127   Ndbinfo::Ratelimit rl;
3128 
3129   switch(req.tableId) {
3130   case Ndbinfo::THREADS_TABLEID: {
3131     Uint32 pos = cursor->data[0];
3132     for (;;)
3133     {
3134       if (pos == 0)
3135       {
3136         jam();
3137         Ndbinfo::Row row(signal, req);
3138         row.write_uint32(getOwnNodeId());
3139         row.write_uint32(getThreadId()); // thr_no
3140         row.write_string(m_thread_name);
3141         row.write_string(m_thread_description);
3142         ndbinfo_send_row(signal, req, row, rl);
3143       }
3144       if (instance() != MAIN_THRMAN_INSTANCE)
3145       {
3146         jam();
3147         break;
3148       }
3149       pos++;
3150       if (pos > m_num_send_threads)
3151       {
3152         jam();
3153         break;
3154       }
3155       {
3156         jam();
3157         Ndbinfo::Row row(signal, req);
3158         row.write_uint32(getOwnNodeId());
3159         row.write_uint32(m_num_threads + (pos - 1)); // thr_no
3160         row.write_string(m_send_thread_name);
3161         row.write_string(m_send_thread_description);
3162         ndbinfo_send_row(signal, req, row, rl);
3163       }
3164 
3165       if (pos >= m_num_send_threads)
3166       {
3167         jam();
3168         break;
3169       }
3170 
3171       if (rl.need_break(req))
3172       {
3173         jam();
3174         ndbinfo_send_scan_break(signal, req, rl, pos);
3175         return;
3176       }
3177     }
3178     break;
3179   }
3180   case Ndbinfo::THREADBLOCKS_TABLEID: {
3181     Uint32 arr[NO_OF_BLOCKS];
3182     Uint32 len = mt_get_blocklist(this, arr, NDB_ARRAY_SIZE(arr));
3183     Uint32 pos = cursor->data[0];
3184     for (; ; )
3185     {
3186       Ndbinfo::Row row(signal, req);
3187       row.write_uint32(getOwnNodeId());
3188       row.write_uint32(getThreadId());             // thr_no
3189       row.write_uint32(blockToMain(arr[pos]));     // block_number
3190       row.write_uint32(blockToInstance(arr[pos])); // block_instance
3191       ndbinfo_send_row(signal, req, row, rl);
3192 
3193       pos++;
3194       if (pos == len)
3195       {
3196         jam();
3197         break;
3198       }
3199       else if (rl.need_break(req))
3200       {
3201         jam();
3202         ndbinfo_send_scan_break(signal, req, rl, pos);
3203         return;
3204       }
3205     }
3206     break;
3207   }
3208   case Ndbinfo::THREADSTAT_TABLEID:{
3209     ndb_thr_stat stat;
3210     mt_get_thr_stat(this, &stat);
3211     Ndbinfo::Row row(signal, req);
3212     row.write_uint32(getOwnNodeId());
3213     row.write_uint32(getThreadId());  // thr_no
3214     row.write_string(stat.name);
3215     row.write_uint64(stat.loop_cnt);
3216     row.write_uint64(stat.exec_cnt);
3217     row.write_uint64(stat.wait_cnt);
3218     row.write_uint64(stat.local_sent_prioa);
3219     row.write_uint64(stat.local_sent_priob);
3220     row.write_uint64(stat.remote_sent_prioa);
3221     row.write_uint64(stat.remote_sent_priob);
3222 
3223     row.write_uint64(stat.os_tid);
3224     row.write_uint64(NdbTick_CurrentMillisecond());
3225 
3226     struct ndb_rusage os_rusage;
3227     Ndb_GetRUsage(&os_rusage, false);
3228     row.write_uint64(os_rusage.ru_utime);
3229     row.write_uint64(os_rusage.ru_stime);
3230     row.write_uint64(os_rusage.ru_minflt);
3231     row.write_uint64(os_rusage.ru_majflt);
3232     row.write_uint64(os_rusage.ru_nvcsw);
3233     row.write_uint64(os_rusage.ru_nivcsw);
3234     ndbinfo_send_row(signal, req, row, rl);
3235     break;
3236   }
3237   case Ndbinfo::CPUSTAT_50MS_TABLEID:
3238   case Ndbinfo::CPUSTAT_1SEC_TABLEID:
3239   case Ndbinfo::CPUSTAT_20SEC_TABLEID:
3240   {
3241 
3242     Uint32 pos = cursor->data[0];
3243 
3244     SendThreadMeasurementPtr sendThreadMeasurementPtr;
3245     MeasurementRecordPtr measurePtr;
3246 
3247     for ( ; ; )
3248     {
3249       jam();
3250       Uint32 pos_thread_id = ((pos >> 8) & 255);
3251       Uint32 pos_index = (pos & 255);
3252       Uint32 pos_ptrI = (pos >> 16);
3253       sendThreadMeasurementPtr.i = RNIL;
3254       sendThreadMeasurementPtr.p = NULL;
3255       measurePtr.i = RNIL;
3256       measurePtr.p = NULL;
3257       if (pos_index >= NUM_MEASUREMENTS)
3258       {
3259         jam();
3260         ndbassert(false);
3261         g_eventLogger->info("pos_index out of range in ndbinfo table %u",
3262                             req.tableId);
3263         ndbinfo_send_scan_conf(signal, req, rl);
3264         return;
3265       }
3266 
3267       if (pos == 0)
3268       {
3269         /**
3270          * This is the first row to start with. We start with the rows
3271          * from our own thread. The pos variable is divided into 3 fields.
3272          * Bit 0-7 contains index number from 0 up to 19.
3273          * Bit 8-15 contains thread number
3274          * Bit 16-31 is a pointer to the next SendThreadMeasurement record.
3275          *
3276          * Thread number 0 is our own thread always. Thread 1 is send thread
3277          * instance 0 and thread 2 send thread instance 1 and so forth. We
3278          * will only worry about send thread data in the main thread where
3279          * we keep track of this information.
3280          *
3281          * The latest measurement is at the end of the linked list and so we
3282          * proceed backwards in the list.
3283          */
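        /**
         * The cursor encoding described above, written out as a sketch.
         * These helpers are illustrative and do not exist in this file;
         * the open-coded shifts below are the actual implementation:
         *
         *   static Uint32 pack_pos(Uint32 ptrI, Uint32 thread, Uint32 index)
         *   {
         *     return (ptrI << 16) | ((thread & 255) << 8) | (index & 255);
         *   }
         *   static Uint32 pos_index_of(Uint32 pos)  { return pos & 255; }
         *   static Uint32 pos_thread_of(Uint32 pos) { return (pos >> 8) & 255; }
         *   static Uint32 pos_ptrI_of(Uint32 pos)   { return pos >> 16; }
         */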
3284         if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
3285         {
3286           jam();
3287           c_next_50ms_measure.last(measurePtr);
3288         }
3289         else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
3290         {
3291           jam();
3292           c_next_1sec_measure.last(measurePtr);
3293         }
3294         else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
3295         {
3296           jam();
3297           c_next_20sec_measure.last(measurePtr);
3298         }
3299         else
3300         {
3301           ndbabort();
3302           return;
3303         }
3304         /* Start at index 0, thread 0, measurePtr.i */
3305         pos = measurePtr.i << 16;
3306       }
3307       else if (pos_thread_id != 0)
3308       {
3309         /**
3310          * We are working on the send thread measurement as we are the
3311          * main thread.
3312          */
3313         jam();
3314         if (instance() != MAIN_THRMAN_INSTANCE)
3315         {
3316           g_eventLogger->info("pos_thread_id = %u in non-main thread",
3317                               pos_thread_id);
3318           ndbassert(false);
3319           ndbinfo_send_scan_conf(signal, req, rl);
3320           return;
3321         }
3322         c_sendThreadMeasurementPool.getPtr(sendThreadMeasurementPtr, pos_ptrI);
3323       }
3324       else
3325       {
3326         jam();
3327         c_measurementRecordPool.getPtr(measurePtr, pos_ptrI);
3328       }
3329 
3330       Ndbinfo::Row row(signal, req);
3331       if (pos_thread_id == 0 && measurePtr.p->m_first_measure_done)
3332       {
3333         jam();
3334         /**
3335          * We report buffer_full_time, spin_time and exec_time as
3336          * separate times. So the exec time we report to the user does
3337          * not include buffer_full_time, it does not include spin time,
3338          * and finally it also does not include the send time of the
3339          * thread. So essentially
3340          * the sum of exec_time, sleep_time, spin_time, send_time and
3341          * buffer_full_time should be very close to the elapsed time.
3342          */
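        /**
         * A small worked example of this decomposition, with assumed
         * numbers: if the raw m_exec_time_thread is 600, spin is 50,
         * send is 100 and buffer-full is 20, then the reported exec_time
         * becomes 600 - 20 - 50 - 100 = 430, and
         * exec + sleep + spin + send + buffer_full should then come
         * close to m_elapsed_time.
         */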
3343         Uint32 exec_time = measurePtr.p->m_exec_time_thread;
3344         Uint32 spin_time = measurePtr.p->m_spin_time_thread;
3345         Uint32 buffer_full_time = measurePtr.p->m_buffer_full_time_thread;
3346         Uint32 send_time = measurePtr.p->m_send_time_thread;
3347 
3348         if (exec_time < (buffer_full_time + send_time + spin_time))
3349         {
3350           exec_time = 0;
3351         }
3352         else
3353         {
3354           exec_time -= buffer_full_time;
3355           exec_time -= spin_time;
3356           exec_time -= send_time;
3357         }
3358         row.write_uint32(getOwnNodeId());
3359         row.write_uint32 (getThreadId());
3360         row.write_uint32(Uint32(measurePtr.p->m_user_time_os));
3361         row.write_uint32(Uint32(measurePtr.p->m_kernel_time_os));
3362         row.write_uint32(Uint32(measurePtr.p->m_idle_time_os));
3363         row.write_uint32(Uint32(exec_time));
3364         row.write_uint32(Uint32(measurePtr.p->m_sleep_time_thread));
3365         row.write_uint32(Uint32(measurePtr.p->m_spin_time_thread));
3366         row.write_uint32(Uint32(measurePtr.p->m_send_time_thread));
3367         row.write_uint32(Uint32(measurePtr.p->m_buffer_full_time_thread));
3368         row.write_uint32(Uint32(measurePtr.p->m_elapsed_time));
3369         ndbinfo_send_row(signal, req, row, rl);
3370       }
3371       else if (pos_thread_id != 0 && sendThreadMeasurementPtr.p->m_first_measure_done)
3372       {
3373         jam();
3374         row.write_uint32(getOwnNodeId());
3375         row.write_uint32 (m_num_threads + (pos_thread_id - 1));
3376 
3377         Uint32 exec_time = sendThreadMeasurementPtr.p->m_exec_time;
3378         Uint32 sleep_time = sendThreadMeasurementPtr.p->m_sleep_time;
3379 
3380         row.write_uint32(Uint32(sendThreadMeasurementPtr.p->m_user_time_os));
3381         row.write_uint32(Uint32(sendThreadMeasurementPtr.p->m_kernel_time_os));
3382         row.write_uint32(Uint32(sendThreadMeasurementPtr.p->m_idle_time_os));
3383         row.write_uint32(exec_time);
3384         row.write_uint32(sleep_time);
3385         row.write_uint32(0);
3386         row.write_uint32(exec_time);
3387         row.write_uint32(Uint32(0));
3388         Uint32 elapsed_time =
3389           sendThreadMeasurementPtr.p->m_exec_time +
3390           sendThreadMeasurementPtr.p->m_sleep_time;
3391         row.write_uint32(elapsed_time);
3392         ndbinfo_send_row(signal, req, row, rl);
3393       }
3394       else
3395       {
3396         // Proceed to the next thread at the first measurement not yet done
3397         pos_index = NUM_MEASUREMENTS - 1;
3398       }
3399 
3400       if ((pos_index + 1) == NUM_MEASUREMENTS)
3401       {
3402         /**
3403          * We are done with this thread, we need to either move on to next
3404          * send thread or stop.
3405          */
3406         if (instance() != MAIN_THRMAN_INSTANCE)
3407         {
3408           jam();
3409           break;
3410         }
3411         /* This check also ensures that we break when there are no send threads */
3412         if (pos_thread_id == m_num_send_threads)
3413         {
3414           jam();
3415           break;
3416         }
3417         jam();
3418         pos_thread_id++;
3419         SendThreadPtr sendThreadPtr;
3420         c_sendThreadRecordPool.getPtr(sendThreadPtr, pos_thread_id - 1);
3421 
3422         if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
3423         {
3424           jam();
3425           Local_SendThreadMeasurement_fifo list_50ms(
3426             c_sendThreadMeasurementPool,
3427             sendThreadPtr.p->m_send_thread_50ms_measurements);
3428           list_50ms.last(sendThreadMeasurementPtr);
3429         }
3430         else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
3431         {
3432           jam();
3433           Local_SendThreadMeasurement_fifo list_1sec(
3434             c_sendThreadMeasurementPool,
3435             sendThreadPtr.p->m_send_thread_1sec_measurements);
3436           list_1sec.last(sendThreadMeasurementPtr);
3437         }
3438         else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
3439         {
3440           jam();
3441           Local_SendThreadMeasurement_fifo list_20sec(
3442             c_sendThreadMeasurementPool,
3443             sendThreadPtr.p->m_send_thread_20sec_measurements);
3444           list_20sec.last(sendThreadMeasurementPtr);
3445         }
3446         else
3447         {
3448           ndbabort();
3449           return;
3450         }
3451 
3452         pos = (sendThreadMeasurementPtr.i << 16) +
3453               (pos_thread_id << 8) +
3454               0;
3455       }
3456       else if (pos_thread_id == 0)
3457       {
3458         if (measurePtr.i == RNIL)
3459         {
3460           jam();
3461           g_eventLogger->info("measurePtr.i = RNIL");
3462           ndbassert(false);
3463           ndbinfo_send_scan_conf(signal, req, rl);
3464           return;
3465         }
3466         if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
3467         {
3468           jam();
3469           c_next_50ms_measure.prev(measurePtr);
3470           if (measurePtr.i == RNIL)
3471           {
3472             jam();
3473             c_next_50ms_measure.first(measurePtr);
3474           }
3475         }
3476         else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
3477         {
3478           jam();
3479           c_next_1sec_measure.prev(measurePtr);
3480           if (measurePtr.i == RNIL)
3481           {
3482             jam();
3483             c_next_1sec_measure.first(measurePtr);
3484           }
3485         }
3486         else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
3487         {
3488           jam();
3489           c_next_20sec_measure.prev(measurePtr);
3490           if (measurePtr.i == RNIL)
3491           {
3492             jam();
3493             c_next_20sec_measure.first(measurePtr);
3494           }
3495         }
3496         else
3497         {
3498           ndbabort();
3499           return;
3500         }
3501         pos = (measurePtr.i << 16) +
3502               (0 << 8) +
3503               pos_index + 1;
3504       }
3505       else
3506       {
3507         SendThreadPtr sendThreadPtr;
3508         c_sendThreadRecordPool.getPtr(sendThreadPtr, pos_thread_id - 1);
3509 
3510         ndbrequire(sendThreadMeasurementPtr.i != RNIL);
3511         if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
3512         {
3513           Local_SendThreadMeasurement_fifo list_50ms(
3514             c_sendThreadMeasurementPool,
3515             sendThreadPtr.p->m_send_thread_50ms_measurements);
3516           list_50ms.prev(sendThreadMeasurementPtr);
3517           if (sendThreadMeasurementPtr.i == RNIL)
3518           {
3519             jam();
3520             list_50ms.first(sendThreadMeasurementPtr);
3521           }
3522         }
3523         else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
3524         {
3525           Local_SendThreadMeasurement_fifo list_1sec(
3526             c_sendThreadMeasurementPool,
3527             sendThreadPtr.p->m_send_thread_1sec_measurements);
3528           list_1sec.prev(sendThreadMeasurementPtr);
3529           if (sendThreadMeasurementPtr.i == RNIL)
3530           {
3531             jam();
3532             list_1sec.first(sendThreadMeasurementPtr);
3533           }
3534         }
3535         else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
3536         {
3537           Local_SendThreadMeasurement_fifo list_20sec(
3538             c_sendThreadMeasurementPool,
3539             sendThreadPtr.p->m_send_thread_20sec_measurements);
3540           list_20sec.prev(sendThreadMeasurementPtr);
3541           if (sendThreadMeasurementPtr.i == RNIL)
3542           {
3543             jam();
3544             list_20sec.first(sendThreadMeasurementPtr);
3545           }
3546         }
3547         else
3548         {
3549           ndbabort();
3550           return;
3551         }
3552         pos = (sendThreadMeasurementPtr.i << 16) +
3553               (pos_thread_id << 8) +
3554               pos_index + 1;
3555       }
3556 
3557       if (rl.need_break(req))
3558       {
3559         jam();
3560         ndbinfo_send_scan_break(signal, req, rl, pos);
3561         return;
3562       }
3563     }
3564     break;
3565   }
3566   case Ndbinfo::CPUSTAT_TABLEID:
3567   {
3568 
3569     Uint32 pos = cursor->data[0];
3570 
3571     SendThreadMeasurementPtr sendThreadMeasurementPtr;
3572     MeasurementRecordPtr measurePtr;
3573 
3574     for ( ; ; )
3575     {
3576       if (pos == 0)
3577       {
3578         jam();
3579         MeasurementRecord measure;
3580         bool success = calculate_cpu_load_last_second(&measure);
3581         ndbrequire(success);
3582         Ndbinfo::Row row(signal, req);
3583         row.write_uint32(getOwnNodeId());
3584         row.write_uint32 (getThreadId());
3585 
3586         if (measure.m_elapsed_time)
3587         {
3588           jam();
3589           Uint64 user_os_percentage =
3590                         ((Uint64(100) *
3591                         measure.m_user_time_os) +
3592                         Uint64(500 * 1000)) /
3593                         measure.m_elapsed_time;
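
          /**
           * Assuming the times are kept in microseconds and the window is
           * about one second, the Uint64(500 * 1000) term is half of the
           * ~1,000,000 microsecond window, so the integer division rounds
           * to the nearest percent. Worked example with assumed numbers:
           * user = 123,456 us and elapsed = 1,000,000 us gives
           * (100 * 123456 + 500000) / 1000000 = 12, i.e. 12.3% rounded.
           */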
3594 
3595           Uint64 kernel_percentage =
3596                         ((Uint64(100) *
3597                         measure.m_kernel_time_os) +
3598                         Uint64(500 * 1000)) /
3599                         measure.m_elapsed_time;
3600 
3601           /* Ensure that total percentage reported is always 100% */
3602           if (user_os_percentage + kernel_percentage > Uint64(100))
3603           {
3604             kernel_percentage = Uint64(100) - user_os_percentage;
3605           }
3606           Uint64 idle_os_percentage =
3607             Uint64(100) - (user_os_percentage + kernel_percentage);
3608           row.write_uint32(Uint32(user_os_percentage));
3609           row.write_uint32(Uint32(kernel_percentage));
3610           row.write_uint32(Uint32(idle_os_percentage));
3611 
3612           Uint64 exec_time = measure.m_exec_time_thread;
3613           Uint64 spin_time = measure.m_spin_time_thread;
3614           Uint64 buffer_full_time = measure.m_buffer_full_time_thread;
3615           Uint64 send_time = measure.m_send_time_thread;
3616 
3617           Uint64 non_exec_time = spin_time + send_time + buffer_full_time;
3618           if (unlikely(non_exec_time > exec_time))
3619           {
3620             exec_time = 0;
3621           }
3622           else
3623           {
3624             exec_time -= non_exec_time;
3625           }
3626 
3627           Uint64 exec_percentage =
3628                         ((Uint64(100) * exec_time) +
3629                         Uint64(500 * 1000)) /
3630                         measure.m_elapsed_time;
3631 
3632           Uint64 spin_percentage =
3633                         ((Uint64(100) * spin_time) +
3634                         Uint64(500 * 1000)) /
3635                         measure.m_elapsed_time;
3636 
3637           Uint64 send_percentage =
3638                         ((Uint64(100) * send_time) +
3639                         Uint64(500 * 1000)) /
3640                         measure.m_elapsed_time;
3641 
3642           Uint64 buffer_full_percentage =
3643                         ((Uint64(100) * buffer_full_time) +
3644                         Uint64(500 * 1000)) /
3645                         measure.m_elapsed_time;
3646 
3647           /* Ensure that total percentage reported is always 100% */
3648           Uint64 exec_full_percentage = exec_percentage +
3649                                         buffer_full_percentage;
3650           Uint64 exec_full_send_percentage = exec_percentage +
3651                                              buffer_full_percentage +
3652                                              send_percentage;
3653           Uint64 all_exec_percentage = exec_percentage +
3654                                        buffer_full_percentage +
3655                                        send_percentage +
3656                                        spin_percentage;
3657           Uint64 sleep_percentage = 0;
3658           if (buffer_full_percentage > Uint64(100))
3659           {
3660             buffer_full_percentage = Uint64(100);
3661             exec_percentage = 0;
3662             send_percentage = 0;
3663             spin_percentage = 0;
3664           }
3665           else if (exec_full_percentage > Uint64(100))
3666           {
3667             exec_percentage = Uint64(100) - buffer_full_percentage;
3668             send_percentage = 0;
3669             spin_percentage = 0;
3670           }
3671           else if (exec_full_send_percentage > Uint64(100))
3672           {
3673             send_percentage = Uint64(100) - exec_full_percentage;
3674             spin_percentage = 0;
3675           }
3676           else if (all_exec_percentage > Uint64(100))
3677           {
3678             spin_percentage = Uint64(100) - exec_full_send_percentage;
3679           }
3680           else
3681           {
3682             sleep_percentage = Uint64(100) - all_exec_percentage;
3683           }
3684           ndbrequire(exec_percentage +
3685                      buffer_full_percentage +
3686                      send_percentage +
3687                      spin_percentage +
3688                      sleep_percentage == Uint64(100));
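
          /**
           * A worked example of the clamping cascade above, with assumed
           * rounded inputs: exec 40, buffer_full 10, send 30 and spin 25
           * give all_exec_percentage = 105, so only the last clamping
           * branch fires and spin is trimmed to 100 - (40 + 10 + 30) = 20.
           * The reported sum 40 + 10 + 30 + 20 + 0 is then exactly 100,
           * satisfying the ndbrequire above.
           */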
3689 
3690           row.write_uint32(Uint32(exec_percentage));
3691           row.write_uint32(Uint32(sleep_percentage));
3692           row.write_uint32(Uint32(spin_percentage));
3693           row.write_uint32(Uint32(send_percentage));
3694           row.write_uint32(Uint32(buffer_full_percentage));
3695 
3696           row.write_uint32(Uint32(measure.m_elapsed_time));
3697         }
3698         else
3699         {
3700           jam();
3701           row.write_uint32(0);
3702           row.write_uint32(0);
3703           row.write_uint32(0);
3704           row.write_uint32(0);
3705           row.write_uint32(0);
3706           row.write_uint32(0);
3707           row.write_uint32(0);
3708           row.write_uint32(0);
3709           row.write_uint32(0);
3710           row.write_uint32(0);
3711         }
3712 
3713         ndbinfo_send_row(signal, req, row, rl);
3714         if (instance() != MAIN_THRMAN_INSTANCE ||
3715             m_num_send_threads == 0)
3716         {
3717           jam();
3718           break;
3719         }
3720         pos++;
3721       }
3722       else
3723       {
3724         /* Send thread CPU load */
3725         jam();
3726         if ((pos - 1) >= m_num_send_threads)
3727         {
3728           jam();
3729           g_eventLogger->info("send instance out of range");
3730           ndbassert(false);
3731           ndbinfo_send_scan_conf(signal, req, rl);
3732           return;
3733         }
3734         SendThreadMeasurement measure;
3735         bool success = calculate_send_thread_load_last_second(pos - 1,
3736                                                               &measure);
3737         if (!success)
3738         {
3739           g_eventLogger->info("Failed calculate_send_thread_load_last_second");
3740           ndbassert(false);
3741           ndbinfo_send_scan_conf(signal, req, rl);
3742           return;
3743         }
3744         Ndbinfo::Row row(signal, req);
3745         row.write_uint32(getOwnNodeId());
3746         row.write_uint32 (m_num_threads + (pos - 1));
3747 
3748         if (measure.m_elapsed_time_os == 0)
3749         {
3750           jam();
3751           row.write_uint32(0);
3752           row.write_uint32(0);
3753           row.write_uint32(0);
3754         }
3755         else
3756         {
3757           Uint64 user_time_os_percentage = ((Uint64(100) *
3758                       measure.m_user_time_os) +
3759                       Uint64(500 * 1000)) /
3760                       measure.m_elapsed_time_os;
3761 
3762           row.write_uint32(Uint32(user_time_os_percentage));
3763 
3764           Uint64 kernel_time_os_percentage = ((Uint64(100) *
3765                       measure.m_kernel_time_os) +
3766                       Uint64(500 * 1000)) /
3767                       measure.m_elapsed_time_os;
3768 
3769           row.write_uint32(Uint32(kernel_time_os_percentage));
3770 
3771           Uint64 idle_time_os_percentage = ((Uint64(100) *
3772                       measure.m_idle_time_os) +
3773                       Uint64(500 * 1000)) /
3774                       measure.m_elapsed_time_os;
3775 
3776           row.write_uint32(Uint32(idle_time_os_percentage));
3777         }
3778 
3779         if (measure.m_elapsed_time > 0)
3780         {
3781           Uint64 exec_time = measure.m_exec_time;
3782           Uint64 spin_time = measure.m_spin_time;
3783           Uint64 sleep_time = measure.m_sleep_time;
3784 
3785           exec_time -= spin_time;
3786 
3787           Uint64 exec_percentage = ((Uint64(100) * exec_time) +
3788                       Uint64(500 * 1000)) /
3789                       measure.m_elapsed_time;
3790 
3791           Uint64 sleep_percentage = ((Uint64(100) * sleep_time) +
3792                       Uint64(500 * 1000)) /
3793                       measure.m_elapsed_time;
3794 
3795           Uint64 spin_percentage = ((Uint64(100) * spin_time) +
3796                       Uint64(500 * 1000)) /
3797                       measure.m_elapsed_time;
3798 
3799           row.write_uint32(Uint32(exec_percentage));
3800           row.write_uint32(Uint32(sleep_percentage));
3801           row.write_uint32(Uint32(spin_percentage));
3802           row.write_uint32(Uint32(exec_percentage));
3803           row.write_uint32(Uint32(0));
3804           row.write_uint32(Uint32(measure.m_elapsed_time));
3805         }
3806         else
3807         {
3808           jam();
3809           row.write_uint32(0);
3810           row.write_uint32(0);
3811           row.write_uint32(0);
3812           row.write_uint32(0);
3813           row.write_uint32(0);
3814           row.write_uint32(0);
3815         }
3816         ndbinfo_send_row(signal, req, row, rl);
3817 
3818         if (pos == m_num_send_threads)
3819         {
3820           jam();
3821           break;
3822         }
3823         pos++;
3824       }
3825       if (rl.need_break(req))
3826       {
3827         jam();
3828         ndbinfo_send_scan_break(signal, req, rl, pos);
3829         return;
3830       }
3831     }
3832     break;
3833   }
3834   default:
3835     break;
3836   }
3837 
3838   ndbinfo_send_scan_conf(signal, req, rl);
3839 }
3840 
3841 static void
3842 release_wait_freeze()
3843 {
3844   NdbMutex_Lock(g_freeze_mutex);
3845   g_freeze_waiters--;
3846   if (g_freeze_waiters == 0)
3847   {
3848     g_freeze_wakeup = false;
3849   }
3850   NdbMutex_Unlock(g_freeze_mutex);
3851 }
3852 
3853 static Uint32
3854 check_freeze_waiters()
3855 {
3856   NdbMutex_Lock(g_freeze_mutex);
3857   Uint32 sync_waiters = g_freeze_waiters;
3858   NdbMutex_Unlock(g_freeze_mutex);
3859   return sync_waiters;
3860 }
3861 
3862 void
3863 Thrman::execFREEZE_THREAD_REQ(Signal *signal)
3864 {
3865   FreezeThreadReq* req = (FreezeThreadReq*)&signal->theData[0];
3866   m_freeze_req = *req;
3867   /**
3868    * We are requested to stop executing in this thread here. When all
3869    * threads have stopped here we are ready to perform the change
3870    * operation requested.
3871    *
3872    * The current change operations supported are:
3873    * Switch between inactive transporters and active transporters.
3874    * This is used when we increase the number of transporters on a link
3875    * from a single transporter to multiple transporters sharing the
3876    * load. It is important to synchronize this to ensure that signals
3877    * continue to arrive at the destination threads in signal order.
3878    */
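  /**
   * The freeze handshake, as implemented below and in ThrmanProxy,
   * summarized for reference (signal names taken from this file):
   *
   *   FREEZE_THREAD_REQ  -> fanned out to every THRMAN instance;
   *                         non-main instances flush their send buffers
   *                         and block in wait_freeze(false)
   *   wait_all_stop()    -> main instance polls via CONTINUEB until all
   *                         m_num_threads are waiting, then sends
   *   FREEZE_ACTION_REQ  -> requester performs the change operation
   *   FREEZE_ACTION_CONF -> main instance broadcasts g_freeze_condition
   *                         and wait_all_start() polls until every
   *                         waiter has left, then sends
   *   FREEZE_THREAD_CONF -> handshake completed
   */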
3879   if (instance() != 1)
3880   {
3881     flush_send_buffers();
3882     wait_freeze(false);
3883     return;
3884   }
3885   wait_freeze(true);
3886   wait_all_stop(signal);
3887 }
3888 
3889 void
3890 Thrman::wait_freeze(bool ret)
3891 {
3892   NdbMutex_Lock(g_freeze_mutex);
3893   g_freeze_waiters++;
3894   if (ret)
3895   {
3896     NdbMutex_Unlock(g_freeze_mutex);
3897     jam();
3898     return;
3899   }
3900   while (true)
3901   {
3902     NdbCondition_WaitTimeout(g_freeze_condition,
3903                              g_freeze_mutex,
3904                              10);
3905     set_watchdog_counter();
3906     if (g_freeze_wakeup)
3907     {
3908       g_freeze_waiters--;
3909       if (g_freeze_waiters == 0)
3910       {
3911         g_freeze_wakeup = false;
3912       }
3913       NdbMutex_Unlock(g_freeze_mutex);
3914       jam();
3915       return;
3916     }
3917   }
3918   return;
3919 }
3920 
3921 void
3922 Thrman::wait_all_stop(Signal *signal)
3923 {
3924   if (check_freeze_waiters() == m_num_threads)
3925   {
3926     jam();
3927     FreezeActionReq* req = CAST_PTR(FreezeActionReq, signal->getDataPtrSend());
3928     BlockReference ref = m_freeze_req.senderRef;
3929     req->nodeId = m_freeze_req.nodeId;
3930     req->senderRef = reference();
3931     sendSignal(ref, GSN_FREEZE_ACTION_REQ, signal,
3932                FreezeActionReq::SignalLength, JBA);
3933     return;
3934   }
3935   signal->theData[0] = ZWAIT_ALL_STOP;
3936   sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
3937 }
3938 
3939 void
3940 Thrman::execFREEZE_ACTION_CONF(Signal *signal)
3941 {
3942   /**
3943    * The action is performed, we have completed this action.
3944    * We can now release all threads and ensure that they are
3945    * woken up again. We wait for all threads to wakeup before
3946    * we proceed to ensure that the functionality is available
3947    * for a new synchronize action.
3948    */
3949   NdbMutex_Lock(g_freeze_mutex);
3950   g_freeze_wakeup = true;
3951   NdbCondition_Broadcast(g_freeze_condition);
3952   NdbMutex_Unlock(g_freeze_mutex);
3953   release_wait_freeze();
3954   wait_all_start(signal);
3955 }
3956 
3957 void Thrman::wait_all_start(Signal *signal)
3958 {
3959   if (check_freeze_waiters() == 0)
3960   {
3961     jam();
3962     FreezeThreadConf* conf = CAST_PTR(FreezeThreadConf, signal->getDataPtrSend());
3963     BlockReference ref = m_freeze_req.senderRef;
3964     conf->nodeId = m_freeze_req.nodeId;
3965     sendSignal(ref, GSN_FREEZE_THREAD_CONF, signal,
3966                FreezeThreadConf::SignalLength, JBA);
3967     return;
3968   }
3969   signal->theData[0] = ZWAIT_ALL_START;
3970   sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
3971 }
3972 
3973 void
3974 Thrman::execDUMP_STATE_ORD(Signal *signal)
3975 {
3976   DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
3977   Uint32 arg = dumpState->args[0];
3978   Uint32 val1 = dumpState->args[1];
3979   if (arg == DumpStateOrd::SetSchedulerSpinTimerAll)
3980   {
3981     if (signal->length() != 2)
3982     {
3983       if (instance() == MAIN_THRMAN_INSTANCE)
3984       {
3985         g_eventLogger->info("Use: DUMP 104000 spintime");
3986       }
3987       return;
3988     }
3989     set_configured_spintime(val1, false);
3990   }
3991   else if (arg == DumpStateOrd::SetSchedulerSpinTimerThread)
3992   {
3993     if (signal->length() != 3)
3994     {
3995       if (instance() == MAIN_THRMAN_INSTANCE)
3996       {
3997         g_eventLogger->info("Use: DUMP 104001 thr_no spintime");
3998       }
3999       return;
4000     }
4001     Uint32 val2 = dumpState->args[2];
4002     if (val1 + 1 == instance())
4003     {
4004       jam();
4005       set_configured_spintime(val2, true);
4006     }
4007   }
4008   else if (arg == DumpStateOrd::SetAllowedSpinOverhead)
4009   {
4010     if (signal->length() != 2)
4011     {
4012       if (instance() == MAIN_THRMAN_INSTANCE)
4013       {
4014         g_eventLogger->info("Use: DUMP 104002 AllowedSpinOverhead");
4015       }
4016       return;
4017     }
4018     set_allowed_spin_overhead(val1);
4019   }
4020   else if (arg == DumpStateOrd::SetSpintimePerCall)
4021   {
4022     if (signal->length() != 2)
4023     {
4024       if (instance() == MAIN_THRMAN_INSTANCE)
4025       {
4026         g_eventLogger->info("Use: DUMP 104003 SpintimePerCall");
4027       }
4028       return;
4029     }
4030     set_spintime_per_call(val1);
4031   }
4032   else if (arg == DumpStateOrd::EnableAdaptiveSpinning)
4033   {
4034     if (signal->length() != 2)
4035     {
4036       if (instance() == MAIN_THRMAN_INSTANCE)
4037       {
4038         g_eventLogger->info("Use: DUMP 104004 0/1"
4039                             " (Enable/Disable Adaptive Spinning");
4040       }
4041       return;
4042     }
4043     set_enable_adaptive_spinning(val1 != 0);
4044   }
4045   return;
4046 }
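
/**
 * Hedged usage examples for the DUMP codes handled above, assuming the
 * numeric codes in the usage texts map to the DumpStateOrd constants and
 * that spin times are given in microseconds; the values and the ndb_mgm
 * invocation style are illustrative only:
 *
 *   ndb_mgm> ALL DUMP 104000 500     -- SetSchedulerSpinTimerAll
 *   ndb_mgm> ALL DUMP 104001 2 500   -- SetSchedulerSpinTimerThread
 *   ndb_mgm> ALL DUMP 104002 200     -- SetAllowedSpinOverhead
 *   ndb_mgm> ALL DUMP 104003 1000    -- SetSpintimePerCall
 *   ndb_mgm> ALL DUMP 104004 1       -- EnableAdaptiveSpinning (1=enable)
 */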
4047 
4048 ThrmanProxy::ThrmanProxy(Block_context & ctx) :
4049   LocalProxy(THRMAN, ctx)
4050 {
4051   addRecSignal(GSN_FREEZE_THREAD_REQ, &ThrmanProxy::execFREEZE_THREAD_REQ);
4052 }
4053 
4054 ThrmanProxy::~ThrmanProxy()
4055 {
4056 }
4057 
4058 SimulatedBlock*
4059 ThrmanProxy::newWorker(Uint32 instanceNo)
4060 {
4061   return new Thrman(m_ctx, instanceNo);
4062 }
4063 
4064 BLOCK_FUNCTIONS(ThrmanProxy)
4065 
4066 void
4067 ThrmanProxy::execFREEZE_THREAD_REQ(Signal* signal)
4068 {
4069   /**
4070    * This signal is always sent from the main thread. Thus we should not
4071    * send the signal to the first instance in THRMAN which is the main
4072    * thread since this would block the main thread from moving forward.
4073    *
4074    * The work to be done is done by the main thread, the other threads
4075    * only need to stop and wait to be woken up again to proceed with
4076    * normal processing.
4077    */
4078   for (Uint32 i = 0; i < c_workers; i++)
4079   {
4080     jam();
4081     Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
4082     sendSignal(ref, GSN_FREEZE_THREAD_REQ, signal, signal->getLength(), JBA);
4083   }
4084 }
4085