1 /*
2 Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "thrman.hpp"
26 #include <mt.hpp>
27 #include <signaldata/DbinfoScan.hpp>
28 #include <signaldata/Sync.hpp>
29 #include <signaldata/DumpStateOrd.hpp>
30
31 #include <EventLogger.hpp>
32 #include <NdbSpin.h>
33
34 #define JAM_FILE_ID 440
35
36 #define MAIN_THRMAN_INSTANCE 1
37 #define NUM_MEASUREMENTS 20
38 #define NUM_MEASUREMENT_RECORDS (3 * NUM_MEASUREMENTS)
39
40 static NdbMutex *g_freeze_mutex = 0;
41 static NdbCondition *g_freeze_condition = 0;
42 static Uint32 g_freeze_waiters = 0;
43 static bool g_freeze_wakeup = 0;
44
45 //#define DEBUG_SPIN 1
46 #ifdef DEBUG_SPIN
47 #define DEB_SPIN(arglist) do { g_eventLogger->info arglist ; } while (0)
48 #else
49 #define DEB_SPIN(arglist) do { } while (0)
50 #endif
51
52 //define HIGH_DEBUG_CPU_USAGE 1
53 //#define DEBUG_CPU_USAGE 1
54 extern EventLogger * g_eventLogger;
55
/**
 * Thrman block constructor. One THRMAN instance executes in each block
 * thread; instance MAIN_THRMAN_INSTANCE (1) additionally tracks the
 * send threads and node-global overload state.
 *
 * The three measurement lists share one record pool and are initialised
 * from it here; the records themselves are seized in execREAD_CONFIG_REQ.
 */
Thrman::Thrman(Block_context & ctx, Uint32 instanceno) :
  SimulatedBlock(THRMAN, ctx, instanceno),
  c_next_50ms_measure(c_measurementRecordPool),
  c_next_1sec_measure(c_measurementRecordPool),
  c_next_20sec_measure(c_measurementRecordPool)
{
  BLOCK_CONSTRUCTOR(Thrman);

  /**
   * The freeze mutex/condition are process-wide and shared by all
   * THRMAN instances; only the first constructed instance creates them.
   * NOTE(review): assumes block constructors run serially — verify,
   * otherwise this check-then-create is racy.
   */
  if (g_freeze_mutex == 0)
  {
    g_freeze_mutex = NdbMutex_Create();
    g_freeze_condition = NdbCondition_Create();
  }
  /* Register the signal handlers served by this block. */
  addRecSignal(GSN_DBINFO_SCANREQ, &Thrman::execDBINFO_SCANREQ);
  addRecSignal(GSN_CONTINUEB, &Thrman::execCONTINUEB);
  addRecSignal(GSN_GET_CPU_USAGE_REQ, &Thrman::execGET_CPU_USAGE_REQ);
  addRecSignal(GSN_OVERLOAD_STATUS_REP, &Thrman::execOVERLOAD_STATUS_REP);
  addRecSignal(GSN_NODE_OVERLOAD_STATUS_ORD, &Thrman::execNODE_OVERLOAD_STATUS_ORD);
  addRecSignal(GSN_READ_CONFIG_REQ, &Thrman::execREAD_CONFIG_REQ);
  addRecSignal(GSN_SEND_THREAD_STATUS_REP, &Thrman::execSEND_THREAD_STATUS_REP);
  addRecSignal(GSN_SET_WAKEUP_THREAD_ORD, &Thrman::execSET_WAKEUP_THREAD_ORD);
  addRecSignal(GSN_WAKEUP_THREAD_ORD, &Thrman::execWAKEUP_THREAD_ORD);
  addRecSignal(GSN_SEND_WAKEUP_THREAD_ORD, &Thrman::execSEND_WAKEUP_THREAD_ORD);
  addRecSignal(GSN_FREEZE_THREAD_REQ, &Thrman::execFREEZE_THREAD_REQ);
  addRecSignal(GSN_FREEZE_ACTION_CONF, &Thrman::execFREEZE_ACTION_CONF);
  addRecSignal(GSN_STTOR, &Thrman::execSTTOR);
  addRecSignal(GSN_MEASURE_WAKEUP_TIME_ORD, &Thrman::execMEASURE_WAKEUP_TIME_ORD);
  addRecSignal(GSN_DUMP_STATE_ORD, &Thrman::execDUMP_STATE_ORD);

  /* Spinning defaults; overridden from configuration in READ_CONFIG_REQ. */
  m_enable_adaptive_spinning = false;
  m_allowed_spin_overhead = 130;
  m_phase2_done = false;
  m_is_idle = true;
}
90
/**
 * Destructor. Tears down the process-wide freeze mutex/condition and
 * resets the associated global state.
 *
 * NOTE(review): these globals are shared by every THRMAN instance but
 * are destroyed by the first instance to be destructed; presumably all
 * instances are destructed together at node shutdown — verify, since a
 * still-live instance would otherwise reference destroyed objects.
 */
Thrman::~Thrman()
{
  if (g_freeze_mutex != 0)
  {
    NdbMutex_Destroy(g_freeze_mutex);
    NdbCondition_Destroy(g_freeze_condition);
    g_freeze_mutex = 0;
    g_freeze_condition = 0;
    g_freeze_waiters = 0;
    g_freeze_wakeup = false;
  }
}
103
/* Expands to the standard SimulatedBlock boiler-plate for this block. */
BLOCK_FUNCTIONS(Thrman)
105
106 void Thrman::mark_measurements_not_done()
107 {
108 MeasurementRecordPtr measurePtr;
109 jam();
110 c_next_50ms_measure.first(measurePtr);
111 while (measurePtr.i != RNIL)
112 {
113 measurePtr.p->m_first_measure_done = false;
114 c_next_50ms_measure.next(measurePtr);
115 }
116 c_next_1sec_measure.first(measurePtr);
117 while (measurePtr.i != RNIL)
118 {
119 measurePtr.p->m_first_measure_done = false;
120 c_next_1sec_measure.next(measurePtr);
121 }
122 c_next_20sec_measure.first(measurePtr);
123 while (measurePtr.i != RNIL)
124 {
125 measurePtr.p->m_first_measure_done = false;
126 c_next_20sec_measure.next(measurePtr);
127 }
128 }
129
130 void
set_configured_spintime(Uint32 val,bool specific)131 Thrman::set_configured_spintime(Uint32 val, bool specific)
132 {
133 if (!NdbSpin_is_supported())
134 {
135 return;
136 }
137 if (val > MAX_SPIN_TIME)
138 {
139 if (specific ||
140 instance() == MAIN_THRMAN_INSTANCE)
141 {
142 g_eventLogger->info("(%u)Attempt to set spintime > 500 not possible",
143 instance());
144 }
145 return;
146 }
147 g_eventLogger->info("(%u)Setting spintime to %u",
148 instance(),
149 val);
150
151 m_configured_spintime = val;
152 if (val == 0)
153 {
154 jam();
155 setSpintime(val);
156 return;
157 }
158 else if (!m_enable_adaptive_spinning)
159 {
160 jam();
161 setSpintime(val);
162 }
163 }
164
165 void
set_allowed_spin_overhead(Uint32 val)166 Thrman::set_allowed_spin_overhead(Uint32 val)
167 {
168 if (val > MAX_SPIN_OVERHEAD)
169 {
170 if (instance() == MAIN_THRMAN_INSTANCE)
171 {
172 g_eventLogger->info("AllowedSpinOverhead is max 10000");
173 }
174 return;
175 }
176 Uint32 add_val = 0;
177 if (val > 100)
178 {
179 add_val = val - 100;
180 val = 100;
181 }
182 /**
183 * At low allowed spin overhead it makes more sense to spend time
184 * spinning in recv thread since we have many more wakeups that can
185 * gain from spinning in this thread.
186 *
187 * As we increase the allowed spin overhead we will have more and more
188 * benefits of spinning also in TC threads.
189 *
190 * At very high allowed overhead it becomes essential to also grab the
191 * wait states in the LDM threads and thus give back the allowed
192 * overhead to them.
193 */
194 if (m_recv_thread)
195 {
196 jam();
197 val *= 3;
198 val /= 2;
199 add_val *= 8;
200 add_val /= 10;
201 m_allowed_spin_overhead = val + add_val + 150;
202 }
203 else if (m_tc_thread)
204 {
205 jam();
206 add_val *= 9;
207 add_val /= 10;
208 m_allowed_spin_overhead = val + add_val + 140;
209 }
210 else if (m_ldm_thread)
211 {
212 jam();
213 val *= 2;
214 val /= 3;
215 add_val *= 12;
216 add_val /= 10;
217 m_allowed_spin_overhead = val + add_val + 120;
218 }
219 else
220 {
221 jam();
222 m_allowed_spin_overhead = val + 130;
223 }
224 g_eventLogger->debug("(%u) Setting AllowedSpinOverhead to %u",
225 instance(),
226 m_allowed_spin_overhead);
227 }
228
229 void
set_enable_adaptive_spinning(bool val)230 Thrman::set_enable_adaptive_spinning(bool val)
231 {
232 m_enable_adaptive_spinning = val;
233 setSpintime(m_configured_spintime);
234 if (instance() == MAIN_THRMAN_INSTANCE)
235 {
236 g_eventLogger->info("(%u) %s adaptive spinning",
237 instance(),
238 val ? "Enable" : "Disable");
239 }
240 }
241
242 void
set_spintime_per_call(Uint32 val)243 Thrman::set_spintime_per_call(Uint32 val)
244 {
245 if (instance() == MAIN_THRMAN_INSTANCE)
246 {
247 if (val < MIN_SPINTIME_PER_CALL || val > MAX_SPINTIME_PER_CALL)
248 {
249 g_eventLogger->info("SpintimePerCall can only be set between"
250 " 300 and 8000");
251 return;
252 }
253 NdbSpin_Change(val);
254 g_eventLogger->info("SpintimePerCall set to %u", val);
255 }
256 }
257
execREAD_CONFIG_REQ(Signal * signal)258 void Thrman::execREAD_CONFIG_REQ(Signal *signal)
259 {
260 jamEntry();
261
262 /* Receive signal */
263 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
264 Uint32 ref = req->senderRef;
265 Uint32 senderData = req->senderData;
266
267 m_thread_name = getThreadName();
268 m_recv_thread = false;
269 m_ldm_thread = false;
270 m_tc_thread = false;
271 m_spin_time_change_count = 0;
272 if (strcmp(m_thread_name, "recv") == 0)
273 {
274 m_recv_thread = true;
275 }
276 if (strcmp(m_thread_name, "tc") == 0)
277 {
278 m_tc_thread = true;
279 }
280 if (strcmp(m_thread_name, "ldm") == 0)
281 {
282 m_ldm_thread = true;
283 }
284 m_thread_description = getThreadDescription();
285 m_send_thread_name = "send";
286 m_send_thread_description = "Send thread";
287 m_enable_adaptive_spinning = false;
288
289 if (NdbSpin_is_supported())
290 {
291 const char *conf = 0;
292 Uint32 val = 0;
293 const ndb_mgm_configuration_iterator * p =
294 m_ctx.m_config.getOwnConfigIterator();
295 ndbrequire(p != 0);
296 if (!ndb_mgm_get_string_parameter(p, CFG_DB_SPIN_METHOD, &conf))
297 {
298 jam();
299 if (native_strcasecmp(conf, "staticspinning"))
300 {
301 if (instance() == MAIN_THRMAN_INSTANCE)
302 {
303 g_eventLogger->info("Using StaticSpinning according to spintime"
304 " configuration");
305 }
306 }
307 else if (native_strcasecmp(conf, "costbasedspinning"))
308 {
309 if (instance() == MAIN_THRMAN_INSTANCE)
310 {
311 g_eventLogger->info("Using CostBasedSpinning with max spintime = 100"
312 " and allowed spin overhead 70 percent");
313 }
314 val = 200;
315 m_enable_adaptive_spinning = true;
316 m_configured_spintime = 100;
317 }
318 else if (native_strcasecmp(conf, "latencyoptimisedspinning"))
319 {
320 if (instance() == MAIN_THRMAN_INSTANCE)
321 {
322 g_eventLogger->info("Using LatencyOptimisedSpinning with max"
323 " spintime = 200 and allowed spin"
324 " overhead 1000 percent");
325 }
326 val = 1000;
327 m_enable_adaptive_spinning = true;
328 m_configured_spintime = 200;
329 }
330 else if (native_strcasecmp(conf, "databasemachinespinning"))
331 {
332 if (instance() == MAIN_THRMAN_INSTANCE)
333 {
334 g_eventLogger->info("Using DatabaseMachineSpinning with max"
335 " spintime = 500 and"
336 " allowed spin overhead 10000 percent");
337 }
338 val = 10000;
339 m_enable_adaptive_spinning = true;
340 m_configured_spintime = MAX_SPIN_TIME;
341 }
342 else
343 {
344 g_eventLogger->info("SpinMethod set to %s, ignored this use either "
345 "StaticSpinning, CostBasedSpinning, "
346 "AggressiveSpinning or DatabaseMachineSpinning"
347 ", falling back to default StaticSpinning",
348 conf);
349 }
350 }
351 else
352 {
353 m_enable_adaptive_spinning = false;
354 }
355 /**
356 * A spin overhead of 0% means that we will spin if it costs 30% more CPU
357 * to gain the earned latency. For example if we by spinning 1300 us can
358 * gain 1000 us in latency we will always treat this as something we
359 * consider as no overhead at all. The reason is that we while spinning
360 * don't use the CPU at full speed, thus other hyperthreads or other CPU
361 * cores will have more access to CPU core parts and to the memory
362 * subsystem in the CPU.
363 * By default we will even spend an extra 70% of CPU overhead to gain
364 * the desired latency gains.
365 *
366 * Most of the long work is done by the LDM threads. These threads work
367 * for a longer time. The receive thread and the TC threads usually
368 * handle small execution times, but very many of them. This means
369 * that for spinning it is more useful to spin on the recv threads
370 * and on the tc threads.
371 *
372 * What this means is that most of the overhead that the user have
373 * configured will be used for spinning in the recv thread and tc
374 * threads.
375 *
376 * High overhead we treat a bit different. Since most gain for small
377 * overhead comes from receive thread and tc thread, the gain with
378 * high overhead instead comes from the ldm thread. So we give the
379 * ldm thread higher weight for high overhead values. The highest
380 * overhead configurable is 800 and will give the allowed spin overhead
381 * for recv thread to be 860, for tc thread it will 870 and for
382 * the ldm thread it will be 1010.
383 *
384 * This high overhead I will refer to as the database machine mode.
385 * It means that we expect the OS to not have to be involved in the
386 * database thread operation, thus running more or less 100% load
387 * even at low concurrency is ok, this mode also requires setting
388 * the SchedulerSpinTimer to its maximum value 500.
389 */
390 set_allowed_spin_overhead(val);
391 }
392
393 /**
394 * Allocate the 60 records needed for 3 lists with 20 measurements in each
395 * list. We keep track of the last second with high resolution of 50 millis
396 * between each measurement, we also keep track of longer back for 20
397 * seconds where we have one measurement per second and finally we also
398 * keep track of long-term statistics going back more than 6 minutes.
399 * We could go back longer or have a higher resolution, but at the moment
400 * it seems a bit unnecessary. We could go back further if we are going to
401 * implement approaches more based on statistics and also finding patterns
402 * of change.
403 */
404 m_num_send_threads = getNumSendThreads();
405 m_num_threads = getNumThreads();
406
407 c_measurementRecordPool.setSize(NUM_MEASUREMENT_RECORDS);
408 if (instance() == MAIN_THRMAN_INSTANCE)
409 {
410 jam();
411 c_sendThreadRecordPool.setSize(m_num_send_threads);
412 c_sendThreadMeasurementPool.setSize(NUM_MEASUREMENT_RECORDS *
413 m_num_send_threads);
414 }
415 else
416 {
417 jam();
418 c_sendThreadRecordPool.setSize(0);
419 c_sendThreadMeasurementPool.setSize(0);
420 }
421
422 /* Create the 3 lists with 20 records in each. */
423 MeasurementRecordPtr measurePtr;
424 for (Uint32 i = 0; i < NUM_MEASUREMENTS; i++)
425 {
426 jam();
427 c_measurementRecordPool.seize(measurePtr);
428 measurePtr.p = new (measurePtr.p) MeasurementRecord();
429 c_next_50ms_measure.addFirst(measurePtr);
430 c_measurementRecordPool.seize(measurePtr);
431 measurePtr.p = new (measurePtr.p) MeasurementRecord();
432 c_next_1sec_measure.addFirst(measurePtr);
433 c_measurementRecordPool.seize(measurePtr);
434 measurePtr.p = new (measurePtr.p) MeasurementRecord();
435 c_next_20sec_measure.addFirst(measurePtr);
436 }
437 if (instance() == MAIN_THRMAN_INSTANCE)
438 {
439 jam();
440 for (Uint32 send_instance = 0;
441 send_instance < m_num_send_threads;
442 send_instance++)
443 {
444 jam();
445 SendThreadPtr sendThreadPtr;
446 c_sendThreadRecordPool.seizeId(sendThreadPtr, send_instance);
447 sendThreadPtr.p = new (sendThreadPtr.p) SendThreadRecord();
448 sendThreadPtr.p->m_send_thread_50ms_measurements.init();
449 sendThreadPtr.p->m_send_thread_1sec_measurements.init();
450 sendThreadPtr.p->m_send_thread_20sec_measurements.init();
451
452 for (Uint32 i = 0; i < NUM_MEASUREMENTS; i++)
453 {
454 jam();
455 SendThreadMeasurementPtr sendThreadMeasurementPtr;
456
457 c_sendThreadMeasurementPool.seize(sendThreadMeasurementPtr);
458 sendThreadMeasurementPtr.p =
459 new (sendThreadMeasurementPtr.p) SendThreadMeasurement();
460 {
461 jam();
462 Local_SendThreadMeasurement_fifo list_50ms(
463 c_sendThreadMeasurementPool,
464 sendThreadPtr.p->m_send_thread_50ms_measurements);
465 list_50ms.addFirst(sendThreadMeasurementPtr);
466 }
467
468 c_sendThreadMeasurementPool.seize(sendThreadMeasurementPtr);
469 sendThreadMeasurementPtr.p =
470 new (sendThreadMeasurementPtr.p) SendThreadMeasurement();
471 {
472 jam();
473 Local_SendThreadMeasurement_fifo list_1sec(
474 c_sendThreadMeasurementPool,
475 sendThreadPtr.p->m_send_thread_1sec_measurements);
476 list_1sec.addFirst(sendThreadMeasurementPtr);
477 }
478
479 c_sendThreadMeasurementPool.seize(sendThreadMeasurementPtr);
480 sendThreadMeasurementPtr.p =
481 new (sendThreadMeasurementPtr.p) SendThreadMeasurement();
482 {
483 jam();
484 Local_SendThreadMeasurement_fifo list_20sec(
485 c_sendThreadMeasurementPool,
486 sendThreadPtr.p->m_send_thread_20sec_measurements);
487 list_20sec.addFirst(sendThreadMeasurementPtr);
488 }
489 }
490 }
491 }
492
493 mark_measurements_not_done();
494 /* Send return signal */
495 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
496 conf->senderRef = reference();
497 conf->senderData = senderData;
498 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
499 ReadConfigConf::SignalLength, JBB);
500 }
501
502 void
execSTTOR(Signal * signal)503 Thrman::execSTTOR(Signal *signal)
504 {
505 int res;
506 jamEntry();
507
508 const Uint32 startPhase = signal->theData[1];
509
510 switch (startPhase)
511 {
512 case 1:
513 jam();
514 memset(&m_last_50ms_base_measure, 0, sizeof(m_last_50ms_base_measure));
515 memset(&m_last_1sec_base_measure, 0, sizeof(m_last_1sec_base_measure));
516 memset(&m_last_20sec_base_measure, 0, sizeof(m_last_20sec_base_measure));
517 memset(&m_last_50ms_base_measure, 0, sizeof(m_last_50ms_rusage));
518 memset(&m_last_1sec_base_measure, 0, sizeof(m_last_1sec_rusage));
519 memset(&m_last_20sec_base_measure, 0, sizeof(m_last_20sec_rusage));
520 prev_50ms_tick = NdbTick_getCurrentTicks();
521 prev_20sec_tick = prev_50ms_tick;
522 prev_1sec_tick = prev_50ms_tick;
523 m_configured_spintime = getConfiguredSpintime();
524 m_current_spintime = 0;
525 m_gain_spintime_in_us = 25;
526
527 /* Initialise overload control variables */
528 m_shared_environment = false;
529 m_overload_handling_activated = false;
530 m_current_overload_status = (OverloadStatus)LIGHT_LOAD_CONST;
531 m_warning_level = 0;
532 m_max_warning_level = 20;
533 m_burstiness = 0;
534 m_current_decision_stats = &c_1sec_stats;
535 m_send_thread_percentage = 0;
536 m_node_overload_level = 0;
537
538 for (Uint32 i = 0; i < MAX_BLOCK_THREADS + 1; i++)
539 {
540 m_thread_overload_status[i].overload_status =
541 (OverloadStatus)MEDIUM_LOAD_CONST;
542 m_thread_overload_status[i].wakeup_instance = 0;
543 }
544
545 /* Initialise measurements */
546 res = Ndb_GetRUsage(&m_last_50ms_rusage, false);
547 if (res == 0)
548 {
549 jam();
550 m_last_1sec_rusage = m_last_50ms_rusage;
551 m_last_20sec_rusage = m_last_50ms_rusage;
552 }
553 getPerformanceTimers(m_last_50ms_base_measure.m_sleep_time_thread,
554 m_last_50ms_base_measure.m_spin_time_thread,
555 m_last_50ms_base_measure.m_buffer_full_time_thread,
556 m_last_50ms_base_measure.m_send_time_thread);
557 m_last_1sec_base_measure = m_last_50ms_base_measure;
558 m_last_20sec_base_measure = m_last_50ms_base_measure;
559
560 if (instance() == MAIN_THRMAN_INSTANCE)
561 {
562 jam();
563 for (Uint32 send_instance = 0;
564 send_instance < m_num_send_threads;
565 send_instance++)
566 {
567 jam();
568 SendThreadPtr sendThreadPtr;
569 c_sendThreadRecordPool.getPtr(sendThreadPtr, send_instance);
570 Uint64 send_exec_time;
571 Uint64 send_sleep_time;
572 Uint64 send_spin_time;
573 Uint64 send_user_time_os;
574 Uint64 send_kernel_time_os;
575 Uint64 send_elapsed_time_os;
576 getSendPerformanceTimers(send_instance,
577 send_exec_time,
578 send_sleep_time,
579 send_spin_time,
580 send_user_time_os,
581 send_kernel_time_os,
582 send_elapsed_time_os);
583
584 sendThreadPtr.p->m_last_50ms_send_thread_measure.m_exec_time =
585 send_exec_time;
586 sendThreadPtr.p->m_last_50ms_send_thread_measure.m_sleep_time =
587 send_sleep_time;
588 sendThreadPtr.p->m_last_50ms_send_thread_measure.m_spin_time =
589 send_spin_time;
590 sendThreadPtr.p->m_last_50ms_send_thread_measure.m_user_time_os =
591 send_user_time_os;
592 sendThreadPtr.p->m_last_50ms_send_thread_measure.m_kernel_time_os =
593 send_kernel_time_os;
594 sendThreadPtr.p->m_last_50ms_send_thread_measure.m_elapsed_time_os =
595 send_elapsed_time_os;
596
597 sendThreadPtr.p->m_last_1sec_send_thread_measure.m_exec_time =
598 send_exec_time;
599 sendThreadPtr.p->m_last_1sec_send_thread_measure.m_sleep_time =
600 send_sleep_time;
601 sendThreadPtr.p->m_last_1sec_send_thread_measure.m_spin_time =
602 send_spin_time;
603 sendThreadPtr.p->m_last_1sec_send_thread_measure.m_user_time_os =
604 send_user_time_os;
605 sendThreadPtr.p->m_last_1sec_send_thread_measure.m_kernel_time_os =
606 send_kernel_time_os;
607 sendThreadPtr.p->m_last_1sec_send_thread_measure.m_elapsed_time_os =
608 send_elapsed_time_os;
609
610 sendThreadPtr.p->m_last_20sec_send_thread_measure.m_exec_time =
611 send_exec_time;
612 sendThreadPtr.p->m_last_20sec_send_thread_measure.m_sleep_time =
613 send_sleep_time;
614 sendThreadPtr.p->m_last_20sec_send_thread_measure.m_spin_time =
615 send_spin_time;
616 sendThreadPtr.p->m_last_20sec_send_thread_measure.m_user_time_os =
617 send_user_time_os;
618 sendThreadPtr.p->m_last_20sec_send_thread_measure.m_kernel_time_os =
619 send_kernel_time_os;
620 sendThreadPtr.p->m_last_20sec_send_thread_measure.m_elapsed_time_os =
621 send_elapsed_time_os;
622 }
623 }
624 if (instance() == MAIN_THRMAN_INSTANCE)
625 {
626 if (getNumThreads() > 1 && NdbSpin_is_supported())
627 {
628 jam();
629 measure_wakeup_time(signal, 0);
630 }
631 else
632 {
633 jam();
634 if (NdbSpin_is_supported())
635 {
636 g_eventLogger->info("Set wakeup latency to 25 microseconds in"
637 " single thread environment");
638 }
639 setWakeupLatency(m_gain_spintime_in_us);
640 sendSTTORRY(signal, false);
641 }
642 sendNextCONTINUEB(signal, 50, ZCONTINUEB_MEASURE_CPU_USAGE);
643 sendNextCONTINUEB(signal, 10, ZCONTINUEB_CHECK_SPINTIME);
644 return;
645 }
646 else
647 {
648 sendNextCONTINUEB(signal, 50, ZCONTINUEB_MEASURE_CPU_USAGE);
649 sendNextCONTINUEB(signal, 10, ZCONTINUEB_CHECK_SPINTIME);
650 sendSTTORRY(signal, false);
651 }
652 return;
653 case 2:
654 {
655 m_gain_spintime_in_us = getWakeupLatency();
656 if (instance() == MAIN_THRMAN_INSTANCE)
657 {
658 g_eventLogger->info("Set wakeup latency to %u microseconds",
659 m_gain_spintime_in_us);
660 }
661 set_spin_stat(0, true);
662 sendSTTORRY(signal, true);
663 return;
664 }
665 default:
666 ndbabort();
667 }
668 }
669
670 #define NUM_WAKEUP_MEASUREMENTS 50
671 #define MAX_FAILED_WAKEUP_MEASUREMENTS 50
/**
 * Measure the wakeup latency between the main thread and the rep
 * thread (instance MAIN_THRMAN_INSTANCE + 1) by ping-ponging
 * MEASURE_WAKEUP_TIME_ORD and timing each round trip.
 *
 * @param signal current signal object
 * @param count  0 to start a new measurement series; otherwise the
 *               number of successful rounds completed so far
 *
 * After NUM_WAKEUP_MEASUREMENTS successful rounds the mean latency is
 * installed via setWakeupLatency() and STTORRY is sent. Rounds above
 * 100 us (or exactly 0) are discarded as disturbed by other activity.
 */
void
Thrman::measure_wakeup_time(Signal *signal, Uint32 count)
{
  NDB_TICKS now = NdbTick_getCurrentTicks();
  if (count != 0)
  {
    /* Perform measurement */
    Uint64 nanos_wait = NdbTick_Elapsed(m_measured_wait_time, now).nanoSec();
    DEB_SPIN(("Elapsed time was %llu nanoseconds", nanos_wait));
    if (nanos_wait < 100000 && nanos_wait != 0)
    {
      /* A proper measurement */
      m_tot_nanos_wait += nanos_wait;
      if (count == NUM_WAKEUP_MEASUREMENTS)
      {
        /* Series complete: install the rounded mean in microseconds. */
        Uint64 mean_nanos_wait = m_tot_nanos_wait / NUM_WAKEUP_MEASUREMENTS;
        Uint64 mean_micros_wait = (mean_nanos_wait + 500) / 1000;
        m_gain_spintime_in_us = Uint32(mean_micros_wait);
        DEB_SPIN(("Set wakeup latency to %llu microseconds",
                 mean_micros_wait));
        setWakeupLatency(m_gain_spintime_in_us);
        /**
         * We always start with no spinning and adjust to spinning when
         * activitity is started.
         */
        sendSTTORRY(signal, false);
        return;
      }
      /* Only successful rounds advance the series. */
      count++;
    }
    else
    {
      /**
       * NOTE(review): m_failed_wakeup_measurements is presumably zeroed
       * elsewhere before the series starts — not visible here; verify.
       * On giving up, the default m_gain_spintime_in_us (25, set in
       * execSTTOR) remains in effect; setWakeupLatency is not re-invoked
       * on this path.
       */
      m_failed_wakeup_measurements++;
      if (m_failed_wakeup_measurements >= MAX_FAILED_WAKEUP_MEASUREMENTS)
      {
        g_eventLogger->info("Failed to measure wakeup latency, using 25 us");
        sendSTTORRY(signal, false);
        return;
      }
    }
    /* Spin here until ~50 us have passed since the round ended. */
    do
    {
      for (Uint32 i = 0; i < 20; i++)
      {
        NdbSpin();
      }
      NDB_TICKS now2 = NdbTick_getCurrentTicks();
      Uint64 micros_wait = NdbTick_Elapsed(now, now2).microSec();
      if (micros_wait >= 50)
      {
        /**
         * We wait for 50 microseconds until next attempt to ensure
         * that the other thread has gone to sleep properly.
         */
        jam();
        break;
      }
    } while (1);
  }
  else
  {
    /**
     * Starting measurement, zero total to initialise and set spintime to
     * 1000 microseconds to ensure that we don't go to sleep until we have
     * completed these measurements that should take around a millisecond.
     */
    m_tot_nanos_wait = 0;
    setSpintime(1000);
    count++;
  }
  /* Timestamp the round start, then bounce the signal off the rep thread. */
  m_measured_wait_time = NdbTick_getCurrentTicks();
  BlockReference ref = numberToRef(THRMAN,
                                   MAIN_THRMAN_INSTANCE + 1, // rep thread
                                   getOwnNodeId());
  signal->theData[0] = count;
  signal->theData[1] = reference();
  /* Send measure signal from main thread to rep thread and back */
  sendSignal(ref, GSN_MEASURE_WAKEUP_TIME_ORD, signal, 2, JBB);
  return;
}
752
753 void
execMEASURE_WAKEUP_TIME_ORD(Signal * signal)754 Thrman::execMEASURE_WAKEUP_TIME_ORD(Signal *signal)
755 {
756 Uint32 count = signal->theData[0];
757 BlockReference ref = signal->theData[1];
758 if (instance() == MAIN_THRMAN_INSTANCE)
759 {
760 measure_wakeup_time(signal, count);
761 return;
762 }
763 else
764 {
765 /* Return signal immediately to sender */
766 sendSignal(ref, GSN_MEASURE_WAKEUP_TIME_ORD, signal, 2, JBB);
767 }
768 }
769
770 void
sendSTTORRY(Signal * signal,bool phase2_done)771 Thrman::sendSTTORRY(Signal* signal, bool phase2_done)
772 {
773 m_phase2_done = phase2_done;
774 signal->theData[0] = 0;
775 signal->theData[1] = 3;
776 signal->theData[2] = 0;
777 signal->theData[3] = 1;
778 signal->theData[4] = 2;
779 signal->theData[5] = 255; // No more start phases from missra
780 BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : THRMAN_REF;
781 sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
782 }
783
784 void
execCONTINUEB(Signal * signal)785 Thrman::execCONTINUEB(Signal *signal)
786 {
787 jamEntry();
788 Uint32 tcase = signal->theData[0];
789 switch (tcase)
790 {
791 case ZCONTINUEB_MEASURE_CPU_USAGE:
792 {
793 jam();
794 measure_cpu_usage(signal);
795 sendNextCONTINUEB(signal, 50, ZCONTINUEB_MEASURE_CPU_USAGE);
796 break;
797 }
798 case ZWAIT_ALL_STOP:
799 {
800 jam();
801 wait_all_stop(signal);
802 break;
803 }
804 case ZWAIT_ALL_START:
805 {
806 jam();
807 wait_all_start(signal);
808 break;
809 }
810 case ZCONTINUEB_CHECK_SPINTIME:
811 {
812 check_spintime(true);
813 sendNextCONTINUEB(signal, 10, ZCONTINUEB_CHECK_SPINTIME);
814 break;
815 }
816 default:
817 {
818 ndbabort();
819 }
820 }
821 }
822
823 void
sendNextCONTINUEB(Signal * signal,Uint32 delay,Uint32 type)824 Thrman::sendNextCONTINUEB(Signal *signal, Uint32 delay, Uint32 type)
825 {
826 signal->theData[0] = type;
827 sendSignalWithDelay(reference(),
828 GSN_CONTINUEB,
829 signal,
830 delay,
831 1);
832 }
833
834 void
update_current_wakeup_instance(Uint32 * thread_list,Uint32 num_threads_found,Uint32 & index,Uint32 & current_wakeup_instance)835 Thrman::update_current_wakeup_instance(Uint32 * thread_list,
836 Uint32 num_threads_found,
837 Uint32 & index,
838 Uint32 & current_wakeup_instance)
839 {
840 index++;
841 if (num_threads_found == index)
842 {
843 jam();
844 index = 0;
845 }
846 current_wakeup_instance = thread_list[index];
847 }
848
/**
 * Each block thread has a thread assigned as its wakeup thread.
 * this thread is woken up to assist with sending data whenever
 * there is a need to quickly get things sent from the block
 * thread. Only block threads that are almost idle can be assigned
 * as wakeup threads.
 */
void
Thrman::assign_wakeup_threads(Signal *signal,
                              Uint32 *thread_list,
                              Uint32 num_threads_found)
{
  /**
   * Walk every block thread and hand each the next idle thread from
   * thread_list, round-robin. current_wakeup_instance is threaded
   * through the loop iterations so consecutive threads get different
   * wakeup assistants.
   */
  Uint32 index = 0;
  Uint32 instance_no;
  Uint32 current_wakeup_instance = thread_list[index];

  for (instance_no = 1; instance_no <= m_num_threads; instance_no++)
  {
    jam();
    if (m_thread_overload_status[instance_no].overload_status ==
        (OverloadStatus)OVERLOAD_CONST)
    {
      jam();
      /* Ensure that overloaded threads don't wakeup idle threads */
      current_wakeup_instance = 0;
    }

    /**
     * We don't wake ourselves up, other than that we attempt to wake up
     * the idle thread once per 200 microseconds from each thread.
     */
    if (instance_no == current_wakeup_instance)
    {
      if (num_threads_found > 1)
      {
        /* Skip past ourselves to the next idle thread in the list. */
        jam();
        update_current_wakeup_instance(thread_list,
                                       num_threads_found,
                                       index,
                                       current_wakeup_instance);
      }
      else
      {
        /* We are the only idle thread: assign "no wakeup thread". */
        jam();
        current_wakeup_instance = 0;
      }
    }
    /* Only signal the thread when its assignment actually changes. */
    if (m_thread_overload_status[instance_no].wakeup_instance !=
        current_wakeup_instance)
    {
      jam();
      sendSET_WAKEUP_THREAD_ORD(signal,
                                instance_no,
                                current_wakeup_instance);
    }
    update_current_wakeup_instance(thread_list,
                                   num_threads_found,
                                   index,
                                   current_wakeup_instance);
  }
}
910
911 void
get_idle_block_threads(Uint32 * thread_list,Uint32 & num_threads_found)912 Thrman::get_idle_block_threads(Uint32 *thread_list, Uint32 & num_threads_found)
913 {
914 /**
915 * We never use more than 4 threads as idle threads. It's highly unlikely
916 * that making use of more idle threads than this for sending is going to
917 * be worthwhile. By starting the search from 1 we will always find the most
918 * common idle threads, the main thread and the rep thread which are instance
919 * 1 and 2.
920 */
921 Uint32 instance_no;
922 for (instance_no = 1; instance_no <= m_num_threads; instance_no++)
923 {
924 if (m_thread_overload_status[instance_no].overload_status ==
925 (OverloadStatus)LIGHT_LOAD_CONST)
926 {
927 thread_list[num_threads_found] = instance_no;
928 num_threads_found++;
929 if (num_threads_found == 4)
930 return;
931 }
932 }
933 }
934
935 /**
936 * Every time we decide to change the overload level we report this back to
937 * the main thread that contains the global state.
938 *
939 * This signal is only executed by main thread.
940 */
941 void
execOVERLOAD_STATUS_REP(Signal * signal)942 Thrman::execOVERLOAD_STATUS_REP(Signal *signal)
943 {
944 Uint32 thr_no = signal->theData[0];
945 Uint32 overload_status = signal->theData[1];
946 m_thread_overload_status[thr_no].overload_status = (OverloadStatus)overload_status;
947
948 Uint32 node_overload_level = 0;
949 for (Uint32 instance_no = 1; instance_no <= m_num_threads; instance_no++)
950 {
951 if (m_thread_overload_status[instance_no].overload_status >=
952 (OverloadStatus)MEDIUM_LOAD_CONST)
953 {
954 node_overload_level = 1;
955 }
956 }
957 if (node_overload_level == m_node_overload_level)
958 {
959 jam();
960 m_node_overload_level = node_overload_level;
961 signal->theData[0] = node_overload_level;
962 for (Uint32 instance_no = 1; instance_no <= m_num_threads; instance_no++)
963 {
964 BlockReference ref = numberToRef(THRMAN,
965 instance_no,
966 getOwnNodeId());
967 sendSignal(ref, GSN_NODE_OVERLOAD_STATUS_ORD, signal, 1, JBB);
968 }
969 }
970
971 Uint32 num_threads_found = 0;
972 Uint32 thread_list[4];
973 get_idle_block_threads(thread_list, num_threads_found);
974 if (num_threads_found == 0)
975 {
976 jam();
977 /**
978 * No idle threads found, so we make a list of one thread with
979 * id 0 (which here means no thread). We still need to check
980 * each thread to see if they need an update of the current
981 * wakeup instance. So this means that all threads that currently
982 * have a non-zero wakeup instance will receive an order to change
983 * their wakeup instance to 0.
984 */
985 num_threads_found = 1;
986 thread_list[0] = 0;
987 return;
988 }
989 assign_wakeup_threads(signal, thread_list, num_threads_found);
990 return;
991 }
992
993 void
execNODE_OVERLOAD_STATUS_ORD(Signal * signal)994 Thrman::execNODE_OVERLOAD_STATUS_ORD(Signal *signal)
995 {
996 jamEntry();
997 Uint32 overload_status = signal->theData[0];
998 setNodeOverloadStatus((OverloadStatus)overload_status);
999 }
1000
1001 void
execSEND_THREAD_STATUS_REP(Signal * signal)1002 Thrman::execSEND_THREAD_STATUS_REP(Signal *signal)
1003 {
1004 jamEntry();
1005 m_send_thread_percentage = signal->theData[0];
1006 return;
1007 }
1008
1009 void
execSEND_WAKEUP_THREAD_ORD(Signal * signal)1010 Thrman::execSEND_WAKEUP_THREAD_ORD(Signal *signal)
1011 {
1012 /**
1013 * This signal is sent directly from do_send in mt.cpp, it's
1014 * only purpose is to send a wakeup signal to another thread
1015 * to ensure that this thread is awake to execute some
1016 * send assistance to the send thread.
1017 */
1018 Uint32 wakeup_instance = signal->theData[0];
1019 BlockReference ref = numberToRef(THRMAN,
1020 wakeup_instance,
1021 getOwnNodeId());
1022 sendSignal(ref, GSN_WAKEUP_THREAD_ORD, signal, 1, JBA);
1023 }
1024
1025 void
execWAKEUP_THREAD_ORD(Signal * signal)1026 Thrman::execWAKEUP_THREAD_ORD(Signal *signal)
1027 {
1028 /**
1029 * This signal is sent to wake the thread up. We're using the send signal
1030 * semantics to wake the thread up. So no need to execute anything, the
1031 * purpose of waking the thread has already been achieved when getting here.
1032 */
1033 return;
1034 }
1035 void
execSET_WAKEUP_THREAD_ORD(Signal * signal)1036 Thrman::execSET_WAKEUP_THREAD_ORD(Signal *signal)
1037 {
1038 Uint32 wakeup_instance = signal->theData[0];
1039 setWakeupThread(wakeup_instance);
1040 }
1041
1042 void
sendSET_WAKEUP_THREAD_ORD(Signal * signal,Uint32 instance_no,Uint32 wakeup_instance)1043 Thrman::sendSET_WAKEUP_THREAD_ORD(Signal *signal,
1044 Uint32 instance_no,
1045 Uint32 wakeup_instance)
1046 {
1047 signal->theData[0] = wakeup_instance;
1048 BlockReference ref = numberToRef(THRMAN,
1049 instance_no,
1050 getOwnNodeId());
1051 sendSignal(ref, GSN_SET_WAKEUP_THREAD_ORD, signal, 1, JBB);
1052 }
1053
1054 void
set_spin_stat(Uint32 spin_time,bool local_call)1055 Thrman::set_spin_stat(Uint32 spin_time, bool local_call)
1056 {
1057 ndbrequire(spin_time <= MAX_SPIN_TIME);
1058 struct ndb_spin_stat spin_stat;
1059 Uint32 used_spin_time = spin_time;
1060 setSpintime(spin_time);
1061 if (!local_call)
1062 {
1063 jam();
1064 return;
1065 }
1066 if (spin_time == 0)
1067 {
1068 /**
1069 * We set spin time to 0, but we use the measure spin time
1070 * in our measurements. This ensures that we quickly get on
1071 * track again with statistics when spin is enabled again.
1072 * We would not arrive here if configured spin time was 0 as
1073 * well.
1074 */
1075 used_spin_time = MEASURE_SPIN_TIME;
1076 }
1077 /**
1078 * We measure in steps of 50%, this gives us the possibility to
1079 * efficiently measure stepping up or stepping down spinning in
1080 * steps of 50% at a time.
1081 */
1082 Uint32 midpoint = (NUM_SPIN_INTERVALS / 2) - 1;
1083 for (Uint32 i = 0; i < NUM_SPIN_INTERVALS; i++)
1084 {
1085 Uint64 spin_time_limit = used_spin_time;
1086 if (i == (NUM_SPIN_INTERVALS - 1))
1087 {
1088 spin_time_limit = UINT32_MAX;
1089 }
1090 else if (i < midpoint)
1091 {
1092 Uint64 mult_factor = 2;
1093 Uint64 div_factor = 3;
1094 for (Uint32 j = i + 1; j < midpoint; j++)
1095 {
1096 mult_factor *= 2;
1097 div_factor *= 3;
1098 }
1099 spin_time_limit = (mult_factor * used_spin_time) / div_factor;
1100 }
1101 else if (i > midpoint)
1102 {
1103 Uint64 mult_factor = 3;
1104 Uint64 div_factor = 2;
1105 for (Uint32 j = midpoint + 1; j < i; j++)
1106 {
1107 mult_factor *= 3;
1108 div_factor *= 2;
1109 }
1110 spin_time_limit = (mult_factor * used_spin_time) / div_factor;
1111 }
1112 else
1113 {
1114 ndbrequire(i == midpoint);
1115 }
1116 spin_stat.m_spin_interval[i] = Uint32(spin_time_limit);
1117 }
1118 mt_set_spin_stat(this, &spin_stat);
1119 }
1120
/**
 * Decide a new spin time for this thread from the gathered spin/wait
 * statistics.
 *
 * For each measurement slot we evaluate "what if this slot's interval
 * became the new spin time": the gain is the number of wakeups that would
 * have been avoided (each worth m_gain_spintime_in_us microseconds) and
 * the cost is the estimated CPU time spent spinning. The slot with the
 * lowest overhead (cost relative to gain, in percent) wins, subject to
 * the allowed-overhead limits and CPU-usage-based caps below.
 *
 * @param spin_stat  Statistics collected by the scheduler since the last
 *                   check (interval bounds and wait counts per slot).
 * @return The new spin time in microseconds (0 = no spinning), also
 *         stored in m_current_spintime.
 */
Uint32 Thrman::calc_new_spin(ndb_spin_stat *spin_stat)
{
#ifdef DEBUG_SPIN
  Uint64 calc_spin_cost[NUM_SPIN_INTERVALS - 1];
  Uint64 calc_spin_overhead[NUM_SPIN_INTERVALS - 1];
  memset(calc_spin_cost, 0, sizeof(calc_spin_cost));
  memset(calc_spin_overhead, 0, sizeof(calc_spin_overhead));
#endif
  Uint32 num_events = spin_stat->m_num_waits;
  Uint32 remaining_events = num_events;

  /* Index of the best slot found so far and its overhead in percent. */
  Uint32 found = 0;
  Uint64 min_overhead = UINT64_MAX;
  for (Uint32 i = 0; i < (NUM_SPIN_INTERVALS - 1); i++)
  {
    Uint32 events_in_this_slot = spin_stat->m_micros_sleep_times[i];
    if (events_in_this_slot == 0 ||
        spin_stat->m_spin_interval[i] == 0 ||
        spin_stat->m_spin_interval[i] > m_configured_spintime)
    {
      /**
       * Ignore empty slots, they will not be chosen for sure.
       * Also ignore slots where we measure 0 spin time.
       * Also ignore slots with higher spintime than what is
       * configured as maximum spintime.
       */
      continue;
    }
    /**
     * Calculate each slot as if it will become new spintime.
     */
    remaining_events -= events_in_this_slot;
    Uint32 num_gained_spins = num_events - remaining_events;

    Uint32 this_spin_cost = spin_stat->m_spin_interval[i];
    Uint64 gained_time_in_us = Uint64(num_gained_spins) *
                               Uint64(m_gain_spintime_in_us);

    /**
     * Estimated spin cost: events in slot j are assumed to arrive at the
     * midpoint of interval j, and all remaining (not-gained) events cost
     * the full candidate spin time.
     * NOTE(review): avg_spin_cost * m_micros_sleep_times[j] is a 32-bit
     * multiply before the Uint64 cast — presumably safe for realistic
     * spin times and event counts; confirm ranges if intervals grow.
     */
    Uint64 spin_cost = 0;
    Uint32 avg_spin_cost = spin_stat->m_spin_interval[0] / 2;
    spin_cost += Uint64(avg_spin_cost * spin_stat->m_micros_sleep_times[0]);
    for (Uint32 j = 1; j <= i; j++)
    {
      Uint32 diff_time = spin_stat->m_spin_interval[j] -
                         spin_stat->m_spin_interval[j - 1];
      diff_time /= 2;
      avg_spin_cost = diff_time + spin_stat->m_spin_interval[j - 1];
      spin_cost += Uint64(avg_spin_cost * spin_stat->m_micros_sleep_times[j]);
    }

    spin_cost += Uint64(this_spin_cost * remaining_events);
    ndbrequire(gained_time_in_us);
    /* Overhead in percent: computed in per-mille, rounded to percent. */
    Uint64 spin_overhead = Uint64(1000) * spin_cost / gained_time_in_us;
    spin_overhead += 5;
    spin_overhead /= 10;

    if (spin_overhead <= min_overhead ||
        spin_overhead < Uint64(100) ||
        (spin_overhead < Uint64(130) &&
         events_in_this_slot > 1))
    {
      /**
       * This was the lowest overhead so far. Will be picked unless overhead
       * is too high. Will always be picked for i == 0.
       *
       * If there is a sufficient amount of events in this slot and we keep
       * the cost below 130, we will always pick this one.
       */
      min_overhead = spin_overhead;
      found = i;
    }
    else if (spin_overhead < Uint64(m_allowed_spin_overhead) &&
             events_in_this_slot > 1)
    {
      /**
       * This wasn't the lowest overhead so far. We will evaluate the
       * conditional probability of it paying off to continue from here since
       * we are still in the allowed range for allowed spin overhead.
       * Conditioned on the fact that we have to wait for at least the
       * already waited overhead.
       *
       * This means that we calculate the time estimated to continue spinning
       * before an event occurs based on that we know that we spent already
       * this time spinning (m_spin_interval[i - 1]). We know that i >= 1 here.
       *
       * The extra gain we get by continuing to spin until m_spin_interval[i] is
       * sum of gains from found + 1 to i. The extra cost is the added extra
       * spin time imposed on all remaining_events. The added cost is
       * m_spin_interval[i] - m_spin_interval[found].
       *
       * We will ignore this check if there is only a single event in this
       * slot. This represents a too high risk of spinning for too long if the
       * circumstances changes only slightly.
       */
      ndbrequire(i > 0);
      Uint64 extra_gain = 0;
      Uint64 extra_cost = 0;
      for (Uint32 j = found + 1; j <= i; j++)
      {
        Uint64 events_in_slot = Uint64(spin_stat->m_micros_sleep_times[j]);
        extra_gain += events_in_slot;
        Uint32 diff_time = spin_stat->m_spin_interval[j] -
                           spin_stat->m_spin_interval[j - 1];
        diff_time /= 2;
        Uint64 avg_spin_cost = Uint64(diff_time) +
                               Uint64(spin_stat->m_spin_interval[j - 1] -
                                      spin_stat->m_spin_interval[found]);
        extra_cost += Uint64(avg_spin_cost *
                             spin_stat->m_micros_sleep_times[j]);
      }
      /* Discount the extra gain by the allowed overhead percentage. */
      extra_gain *= Uint64(m_gain_spintime_in_us);
      extra_gain *= Uint64(m_allowed_spin_overhead);
      extra_gain /= Uint64(100);
      extra_cost += Uint64(remaining_events) *
                    Uint64(this_spin_cost -
                           spin_stat->m_spin_interval[found]);
      if (extra_gain > extra_cost)
      {
        found = i;
        min_overhead = spin_overhead;
      }
    }
#ifdef DEBUG_SPIN
    calc_spin_cost[i] = spin_cost;
    calc_spin_overhead[i] = spin_overhead;
#endif
  }
  /**
   * When we are already spinning, we allow for a bit more overhead to avoid
   * jumping in and out of spinning too often. We need at least 4 observations
   * to make any judgement, only 2 events in 10ms doesn't seem to imply any
   * need of spinning.
   */
#define EXTRA_OVERHEAD_ALLOWED_WHEN_ALREADY_SPINNING 20
#define MIN_EVENTS_TO_BE_NOT_IDLE 20

  Uint32 midpoint = (NUM_SPIN_INTERVALS / 2) - 1;
  if (num_events <= 3 ||
      (min_overhead > Uint64(m_allowed_spin_overhead) &&
       (m_current_spintime == 0 ||
        min_overhead >
          Uint64(m_allowed_spin_overhead +
                 EXTRA_OVERHEAD_ALLOWED_WHEN_ALREADY_SPINNING))))
  {
    /* Quickly shut down spin environment when no longer beneficial. */
    if (m_current_spintime != 0)
    {
      DEB_SPIN(("(%u)New spintime = 0", instance()));
    }
    m_current_spintime = 0;
  }
  else if (m_current_spintime == 0 ||
           m_current_spintime !=
             spin_stat->m_spin_interval[midpoint])
  {
    /**
     * Immediately adjust to new spin environment when activity starts up
     * from a more idle state. We also arrive here the next timeout
     * after a quick activation of spintime. In this case we have set
     * the spintime, but still haven't changed the spin intervals, so
     * set it directly to the found spintime.
     */
    m_current_spintime = spin_stat->m_spin_interval[found];
    DEB_SPIN(("(%u)New spintime = %u", instance(), m_current_spintime));
  }
  else
  {
    /**
     * When we are already spinning AND we want to continue spinning,
     * adjust change to not change the spin behaviour too fast. In this
     * case we are likely to be a in a more stable environment, so no
     * need of the very fast adaption to the environment.
     * Move at most one interval step away from the midpoint per check.
     */
    if (found < midpoint)
    {
      m_current_spintime = spin_stat->m_spin_interval[midpoint - 1];
    }
    else if (found > midpoint)
    {
      m_current_spintime = spin_stat->m_spin_interval[midpoint + 1];
    }
    DEB_SPIN(("(%u)2:New spintime = %u", instance(), m_current_spintime));
  }
  /* Idle state gates whether remote (non-local) spintime checks act. */
  if (num_events > MIN_EVENTS_TO_BE_NOT_IDLE)
  {
    jam();
    m_is_idle = false;
  }
  else
  {
    jam();
    m_is_idle = true;
  }
  /* Never select a spintime less than 2 microseconds. */
  if (m_current_spintime != 0 && m_current_spintime < 2)
  {
    m_current_spintime = 2;
  }
  /**
   * Never go beyond the configured spin time. The adaptive part can only
   * decrease the spinning, not increase it.
   * Additionally cap the spin time when this thread's CPU usage is high,
   * since spinning at high load only wastes CPU.
   */
  Uint32 max_spintime = m_configured_spintime;
  if (m_current_cpu_usage > 90)
  {
    jam();
    max_spintime /= 4; // 25%
  }
  else if (m_current_cpu_usage > 80)
  {
    jam();
    max_spintime /= 3; // 33%
  }
  else if (m_current_cpu_usage > 70)
  {
    jam();
    max_spintime *= 45;
    max_spintime /= 100; // 45%
  }
  else if (m_current_cpu_usage > 60)
  {
    jam();
    max_spintime *= 60;
    max_spintime /= 100; // 60%
  }
  else if (m_current_cpu_usage > 50)
  {
    jam();
    max_spintime *= 75;
    max_spintime /= 100; // 75%
  }
  else if (m_current_cpu_usage > 40)
  {
    jam();
    max_spintime *= 90;
    max_spintime /= 100; // 90%
  }

  if (m_current_spintime > max_spintime)
  {
    m_current_spintime = max_spintime;
  }
  if (num_events >= 3)
  {
    DEB_SPIN(("(%u)SPIN events: %u, spintime selected: %u "
              ":ovh[0]=%llu,cost[0]=%llu"
              ":ovh[1]=%llu,cost[1]=%llu"
              ":ovh[2]=%llu,cost[2]=%llu"
              ":ovh[3]=%llu,cost[3]=%llu"
              ":ovh[4]=%llu,cost[4]=%llu"
              ":ovh[5]=%llu,cost[5]=%llu"
              ":ovh[6]=%llu,cost[6]=%llu"
              ":ovh[7]=%llu,cost[7]=%llu"
              ":ovh[8]=%llu,cost[8]=%llu"
              ":ovh[9]=%llu,cost[9]=%llu"
              ":ovh[10]=%llu,cost[10]=%llu"
              ":ovh[11]=%llu,cost[11]=%llu"
              ":ovh[12]=%llu,cost[12]=%llu"
              ":ovh[13]=%llu,cost[13]=%llu"
              ":ovh[14]=%llu,cost[14]=%llu",
              instance(),
              num_events,
              m_current_spintime,
              calc_spin_overhead[0],
              calc_spin_cost[0],
              calc_spin_overhead[1],
              calc_spin_cost[1],
              calc_spin_overhead[2],
              calc_spin_cost[2],
              calc_spin_overhead[3],
              calc_spin_cost[3],
              calc_spin_overhead[4],
              calc_spin_cost[4],
              calc_spin_overhead[5],
              calc_spin_cost[5],
              calc_spin_overhead[6],
              calc_spin_cost[6],
              calc_spin_overhead[7],
              calc_spin_cost[7],
              calc_spin_overhead[8],
              calc_spin_cost[8],
              calc_spin_overhead[9],
              calc_spin_cost[9],
              calc_spin_overhead[10],
              calc_spin_cost[10],
              calc_spin_overhead[11],
              calc_spin_cost[11],
              calc_spin_overhead[12],
              calc_spin_cost[12],
              calc_spin_overhead[13],
              calc_spin_cost[13],
              calc_spin_overhead[14],
              calc_spin_cost[14]));
  }
  return m_current_spintime;
}
1417
/**
 * Periodic check of this thread's spin behaviour. Fetches the spin
 * statistics from the scheduler, computes a new spin time through
 * calc_new_spin and installs it via set_spin_stat.
 *
 * @param local_call  True when invoked from this thread's own timeout
 *                    handling; false for externally-triggered checks,
 *                    which only act while the thread is idle.
 */
void
Thrman::check_spintime(bool local_call)
{
  /* Spin tuning only runs once start phase 2 is completed. */
  if (!m_phase2_done)
  {
    jam();
    return;
  }
  /* Remote calls only adjust spin while this thread is idle. */
  if (!local_call && !m_is_idle)
  {
    jam();
    return;
  }
  if (!m_enable_adaptive_spinning)
  {
    jam();
    return;
  }
  if (m_configured_spintime == 0)
  {
    /* No configured spinning on the thread, so ignore check of spin time. */
    jam();
    return;
  }
  struct ndb_spin_stat spin_stat;
  mt_get_spin_stat(this, &spin_stat);

  if (spin_stat.m_num_waits >= 3)
  {
    DEB_SPIN(("(%u)m_sleep_longer_spin_time: %u, "
              "m_sleep_shorter_spin_time: %u"
              ", local_call: %s",
              instance(),
              spin_stat.m_sleep_longer_spin_time,
              spin_stat.m_sleep_shorter_spin_time,
              local_call ? "true" : "false"));
  }

  if (m_shared_environment)
  {
    /**
     * We never spin in a shared environment, this would cause even more
     * overload on the CPUs to happen.
     */
    set_spin_stat(0, local_call);
    return;
  }
  /* Dump the per-interval wait histogram when there is enough data. */
  if (spin_stat.m_num_waits >= 3)
  {
    DEB_SPIN(("(%u): <= %u: %u, <= %u: %u, <= %u: %u, <= %u: %u,"
              " <= %u: %u, <= %u: %u, <= %u: %u, <= %u: %u"
              " <= %u: %u, <= %u: %u, <= %u: %u, <= %u: %u"
              " <= %u: %u, <= %u: %u, <= %u: %u, <= MAX: %u",
              instance(),
              spin_stat.m_spin_interval[0],
              spin_stat.m_micros_sleep_times[0],
              spin_stat.m_spin_interval[1],
              spin_stat.m_micros_sleep_times[1],
              spin_stat.m_spin_interval[2],
              spin_stat.m_micros_sleep_times[2],
              spin_stat.m_spin_interval[3],
              spin_stat.m_micros_sleep_times[3],
              spin_stat.m_spin_interval[4],
              spin_stat.m_micros_sleep_times[4],
              spin_stat.m_spin_interval[5],
              spin_stat.m_micros_sleep_times[5],
              spin_stat.m_spin_interval[6],
              spin_stat.m_micros_sleep_times[6],
              spin_stat.m_spin_interval[7],
              spin_stat.m_micros_sleep_times[7],
              spin_stat.m_spin_interval[8],
              spin_stat.m_micros_sleep_times[8],
              spin_stat.m_spin_interval[9],
              spin_stat.m_micros_sleep_times[9],
              spin_stat.m_spin_interval[10],
              spin_stat.m_micros_sleep_times[10],
              spin_stat.m_spin_interval[11],
              spin_stat.m_micros_sleep_times[11],
              spin_stat.m_spin_interval[12],
              spin_stat.m_micros_sleep_times[12],
              spin_stat.m_spin_interval[13],
              spin_stat.m_micros_sleep_times[13],
              spin_stat.m_spin_interval[14],
              spin_stat.m_micros_sleep_times[14],
              spin_stat.m_micros_sleep_times[15]));
  }
  Uint32 spin_time = calc_new_spin(&spin_stat);
  set_spin_stat(spin_time, local_call);
  return;
}
1508
1509 /**
1510 * We call this function every 50 milliseconds.
1511 *
1512 * Load Information Gathering in THRMAN
1513 * ------------------------------------
1514 * We gather information from the operating system on how user time and
1515 * system time the thread has spent. We also get information from the
1516 * scheduler about how much time the thread has spent in sleep mode,
1517 * how much time spent sending and how much time spent doing the work
1518 * the thread is assigned (for most block threads this is executing
1519 * signals, for receive threads it is receiving and for send threads
 * it is sending).
1521 *
1522 * ndbinfo tables based on this gathered information
1523 * -------------------------------------------------
1524 * We collect this data such that we can report the last 1 second
1525 * information about status per 50 milliseconds.
1526 * We also collect information about reports for 20 seconds with
1527 * 1 second per collection point.
1528 * We also collect information about reports for 400 seconds with
1529 * 20 second per collection point.
1530 *
1531 * This data is reported in 3 different ndbinfo tables where each
1532 * thread reports its own data. Thus twenty rows per thread per node
1533 * in each of those tables. These tables represent similar information
1534 * as we can get from top, but here it is reported per ndbmtd
1535 * block thread and also ndbmtd send thread. Currently we don't
1536 * cover NDBFS threads and transporter connection threads.
1537 *
1538 * We also have a smaller table that reports one row per thread per
1539 * node and this row represents the load information for the last
1540 * second.
1541 *
1542 * Use of the data for adaptive load regulation of LCPs
1543 * ----------------------------------------------------
1544 * This data is also used in adaptive load regulation algorithms in
1545 * MySQL Cluster data nodes. The intention is to increase this usage
1546 * with time. The first use case was in 7.4 for adaptive speed of
1547 * LCPs.
1548 *
1549 * Use of data for adaptive send assistance of block threads
1550 * ---------------------------------------------------------
1551 * The next use case is to control the sending from various threads.
1552 * Using send threads we are able to send from any block thread, the
1553 * receive threads and finally also the send threads.
1554 *
1555 * We want to know the load status of each thread to decide how active
1556 * each thread should be in assisting send threads in sending. The send
1557 * threads can always send at highest speed.
1558 *
1559 * Description of overload states
1560 * ------------------------------
1561 * The idea is that when a thread reaches a level where it needs more
1562 * than 75% of the time to execute then it should offload all send
1563 * activities to all other threads. However even before we reach this
1564 * critical level we should adjust our assistance to send threads.
1565 *
1566 * As with any adaptive algorithm it is good to have a certain level of
1567 * hysteresis in the changes. So we should not adjust the levels too
1568 * fast. One reason for this is that as we increase our level of send
1569 * assistance we will obviously become more loaded, we want to keep
1570 * this extra load on a level such that the block thread still can
 * deliver responses to its main activities within reasonable limits.
1572 *
1573 * So we will have at least 3 different levels of load for a thread.
1574 * STATE: Overload
1575 * ---------------
1576 * It can be overloaded when it has passed 75% usage for normal thread
1577 * activity without send activities.
1578 *
1579 * STATE: Medium
1580 * -------------
1581 * It can be at medium load when it has reached 30% normal thread activity.
1582 * In this case we should still handle a bit of send assistance, but also
1583 * offload a part to the send threads.
1584 *
1585 * STATE: Light
1586 * ------------
1587 * The final level is light load where we are below 30% time spent for normal
1588 * thread activities. In this case we will for the most part handle our own
1589 * sending and also assist others in sending.
1590 *
1591 * A more detailed description of the send algorithms and how they interact
1592 * is found in mt.cpp around the method do_send.
1593 *
1594 * Global node state for send assistance
1595 * -------------------------------------
1596 * One more thing is that we also need global information about the node
1597 * state. This is provided by the THRMAN with instance number 1 which is
1598 * non-proxy block executing in the main thread. The scheduler needs to
1599 * know if any thread is currently in overload mode. If one thread is
1600 * is in overload mode we should change the sleep interval in all threads.
1601 * So when there are overloaded threads in the node then we should ensure
1602 * that all threads wakeup more often to assist in sending. So we change
1603 * the sleep interval for all threads to 1 milliseconds when we are in
1604 * this state.
1605 *
1606 * The information gathered in instance 1 about send threads is reported to
1607 * all threads to ensure that all threads can use the mean percentage of
1608 * usage for send threads in the algorithm to decide when to change overload
1609 * level. The aim is that overload is defined as 85% instead of 75% when
1610 * send threads are at more than 75% load level.
1611 *
 * THRMAN with instance number 1 has one more responsibility, this is to
1613 * gather the statistics from the send threads.
1614 *
1615 * So each thread is responsible to gather information and decide which
1616 * level of overload it currently is at. It will however report to
1617 * THRMAN instance 1 about any decision to change of its overload state.
 * So this THRMAN instance has the global node state and has a bit
1619 * more information about the global state. Based on this information
1620 * it could potentially make decisions to change the overload state
1621 * for a certain thread.
1622 *
1623 * Reasons for global node state for send assistance
1624 * -------------------------------------------------
1625 * One reason to change the state is if we are in a state where we are in
1626 * a global overload state, this means that the local decisions are not
1627 * sufficient since the send threads are not capable to keep up with the
1628 * load even with the assistance they get.
1629 *
1630 * The algorithms in THRMAN are to a great extent designed to protect the
1631 * LDM threads from overload, but at the same time it is possible that
1632 * the thread configuration is setup such that we have either constant or
1633 * temporary overload on other threads. Even more in a cloud environment
1634 * we could easily be affected by other activities in other cloud apps
1635 * and we thus need to have a bit of flexibility in moving load to other
1636 * threads currently not so overloaded and thus ensure that we make best
1637 * use of all CPU resources in the machine assigned to us.
1638 *
1639 * Potential future usage of this load information
1640 * -----------------------------------------------
1641 * We can provide load control to ensure that the cluster continues to
1642 * deliver the basic services and in this case we might decrease certain
1643 * types of query types. We could introduce different priority levels for
1644 * queries and use those to decide which transactions that are allowed to
1645 * continue in an overloaded state.
1646 *
1647 * The best place to stop any activities is when a transaction starts, so
1648 * either at normal transaction start in DBTC or DBSPJ or in schema
1649 * transaction start in DBDICT. Refusing to start a transaction has no
1650 * impact on already performed work, so this is the best manner to ensure
1651 * that we don't get into feedback problems where we have to redo the
1652 * work more than once which is likely to make the overload situation even
1653 * more severe.
1654 *
1655 * Another future development is that threads provide receive thread
1656 * assistance in the same manner so as to protect the receive threads
1657 * from overload. This will however require us to ensure that we don't
1658 * create signalling order issues since signals will be routed different
1659 * ways dependent on which block thread performs the receive operation.
1660 */
/**
 * Called every 50 milliseconds. Takes a new CPU usage measurement for
 * this thread (OS rusage plus scheduler performance timers), rotates it
 * into the 50ms/1sec/20sec measurement lists, and — in the main THRMAN
 * instance — also measures all send threads. Finally triggers the
 * overload status check.
 *
 * @param signal  Signal object, forwarded to the status/overload report
 *                sending below.
 */
void
Thrman::measure_cpu_usage(Signal *signal)
{
  struct ndb_rusage curr_rusage;

  /**
   * Start by making a new CPU usage measurement. After that we will
   * measure how much time has passed since last measurement and from
   * this we can calculate a percentage of CPU usage that this thread
   * has had for the last second or so.
   */

  MeasurementRecordPtr measurePtr;
  MeasurementRecordPtr measure_1sec_Ptr;
  MeasurementRecordPtr measure_20sec_Ptr;

  NDB_TICKS curr_time = NdbTick_getCurrentTicks();
  Uint64 elapsed_50ms = NdbTick_Elapsed(prev_50ms_tick, curr_time).microSec();
  Uint64 elapsed_1sec = NdbTick_Elapsed(prev_1sec_tick, curr_time).microSec();
  Uint64 elapsed_20sec = NdbTick_Elapsed(prev_20sec_tick, curr_time).microSec();
  MeasurementRecord loc_measure;

  /* Get performance timers from scheduler. */
  getPerformanceTimers(loc_measure.m_sleep_time_thread,
                       loc_measure.m_spin_time_thread,
                       loc_measure.m_buffer_full_time_thread,
                       loc_measure.m_send_time_thread);

  bool check_1sec = false;
  bool check_20sec = false;

  int res = Ndb_GetRUsage(&curr_rusage, false);
  if (res != 0)
  {
    jam();
#ifdef DEBUG_CPU_USAGE
    g_eventLogger->info("instance: %u failed Ndb_GetRUsage, res: %d",
                        instance(),
                        -res);
#endif
    /* Zeroed rusage makes calculate_measurement treat OS times as unknown. */
    memset(&curr_rusage, 0, sizeof(curr_rusage));
  }
  {
    jam();
    /**
     * 50ms measurement: reuse the oldest record in the list (first),
     * fill it with the new deltas, then move it to the back.
     */
    c_next_50ms_measure.first(measurePtr);
    calculate_measurement(measurePtr,
                          &curr_rusage,
                          &m_last_50ms_rusage,
                          &loc_measure,
                          &m_last_50ms_base_measure,
                          elapsed_50ms);
    /* Spin time is excluded: spinning is not counted as useful execution. */
    Uint64 exec_time = measurePtr.p->m_exec_time_thread -
                       measurePtr.p->m_spin_time_thread;
    Uint64 elapsed_time = measurePtr.p->m_elapsed_time;
    if (elapsed_time > 0)
    {
      /* Percentage with rounding: (exec * 1000 + 500) / (10 * elapsed). */
      Uint64 exec_perc = exec_time * 1000 + 500;
      exec_perc /= (10 * elapsed_time);
      if (exec_perc <= 100)
      {
        jam();
        m_current_cpu_usage = Uint32(exec_perc);
      }
      else
      {
        /**
         * Over 100% indicates an inconsistent measurement; discard it.
         * NOTE(review): this resets usage to 0 rather than clamping to
         * 100 — presumably intentional (distrust the sample); confirm.
         */
        jam();
        m_current_cpu_usage = 0;
      }
    }
    else
    {
      jam();
      m_current_cpu_usage = 0;
    }
    if (m_current_cpu_usage >= 40)
    {
      DEB_SPIN(("(%u)Current CPU usage is %u percent",
                instance(),
                m_current_cpu_usage));
    }
    c_next_50ms_measure.remove(measurePtr);
    c_next_50ms_measure.addLast(measurePtr);
    prev_50ms_tick = curr_time;
  }
  /* Roll up into the 1-second list once a full second has elapsed. */
  if (elapsed_1sec > Uint64(1000 * 1000))
  {
    jam();
    check_1sec = true;
    c_next_1sec_measure.first(measurePtr);
    calculate_measurement(measurePtr,
                          &curr_rusage,
                          &m_last_1sec_rusage,
                          &loc_measure,
                          &m_last_1sec_base_measure,
                          elapsed_1sec);
    c_next_1sec_measure.remove(measurePtr);
    c_next_1sec_measure.addLast(measurePtr);
    prev_1sec_tick = curr_time;
  }
  /* Roll up into the 20-second list once 20 seconds have elapsed. */
  if (elapsed_20sec > Uint64(20 * 1000 * 1000))
  {
    jam();
    check_20sec = true;
    c_next_20sec_measure.first(measurePtr);
    calculate_measurement(measurePtr,
                          &curr_rusage,
                          &m_last_20sec_rusage,
                          &loc_measure,
                          &m_last_20sec_base_measure,
                          elapsed_20sec);
    c_next_20sec_measure.remove(measurePtr);
    c_next_20sec_measure.addLast(measurePtr);
    prev_20sec_tick = curr_time;
  }
  /* Only the main THRMAN instance gathers send thread statistics. */
  if (instance() == MAIN_THRMAN_INSTANCE)
  {
    jam();
    for (Uint32 send_instance = 0;
         send_instance < m_num_send_threads;
         send_instance++)
    {
      jam();
      SendThreadPtr sendThreadPtr;
      SendThreadMeasurementPtr sendThreadMeasurementPtr;
      SendThreadMeasurement curr_send_thread_measure;

      getSendPerformanceTimers(send_instance,
                               curr_send_thread_measure.m_exec_time,
                               curr_send_thread_measure.m_sleep_time,
                               curr_send_thread_measure.m_spin_time,
                               curr_send_thread_measure.m_user_time_os,
                               curr_send_thread_measure.m_kernel_time_os,
                               curr_send_thread_measure.m_elapsed_time_os);

      c_sendThreadRecordPool.getPtr(sendThreadPtr, send_instance);
      {
        jam();
        /* Same rotate-oldest-record scheme as the thread lists above. */
        Local_SendThreadMeasurement_fifo list_50ms(c_sendThreadMeasurementPool,
                          sendThreadPtr.p->m_send_thread_50ms_measurements);
        list_50ms.first(sendThreadMeasurementPtr);
        calculate_send_measurement(sendThreadMeasurementPtr,
                          &curr_send_thread_measure,
                          &sendThreadPtr.p->m_last_50ms_send_thread_measure,
                          elapsed_50ms,
                          send_instance);
        list_50ms.remove(sendThreadMeasurementPtr);
        list_50ms.addLast(sendThreadMeasurementPtr);
      }
      if (elapsed_1sec > Uint64(1000 * 1000))
      {
        jam();
        Local_SendThreadMeasurement_fifo list_1sec(c_sendThreadMeasurementPool,
                          sendThreadPtr.p->m_send_thread_1sec_measurements);
        list_1sec.first(sendThreadMeasurementPtr);
        calculate_send_measurement(sendThreadMeasurementPtr,
                          &curr_send_thread_measure,
                          &sendThreadPtr.p->m_last_1sec_send_thread_measure,
                          elapsed_1sec,
                          send_instance);
        list_1sec.remove(sendThreadMeasurementPtr);
        list_1sec.addLast(sendThreadMeasurementPtr);
      }
      if (elapsed_20sec > Uint64(20 * 1000 * 1000))
      {
        jam();
        Local_SendThreadMeasurement_fifo list_20sec(c_sendThreadMeasurementPool,
                          sendThreadPtr.p->m_send_thread_20sec_measurements);
        list_20sec.first(sendThreadMeasurementPtr);
        calculate_send_measurement(sendThreadMeasurementPtr,
                          &curr_send_thread_measure,
                          &sendThreadPtr.p->m_last_20sec_send_thread_measure,
                          elapsed_20sec,
                          send_instance);
        list_20sec.remove(sendThreadMeasurementPtr);
        list_20sec.addLast(sendThreadMeasurementPtr);
      }
    }
    if (check_1sec)
    {
      /* Report the mean send thread load to all THRMAN instances. */
      Uint32 send_thread_percentage =
        calculate_mean_send_thread_load();
      sendSEND_THREAD_STATUS_REP(signal, send_thread_percentage);
    }
  }
  check_overload_status(signal, check_1sec, check_20sec);
}
1847
/**
 * Fill one MeasurementRecord with the deltas since the previous
 * measurement, then advance the base values so the next call measures
 * from this point.
 *
 * @param measurePtr      [out] Record to fill with the delta values.
 * @param curr_rusage     Current OS rusage (zeroed when unavailable).
 * @param base_rusage     [in/out] Previous OS rusage; updated to current.
 * @param curr_measure    Current cumulative scheduler timers.
 * @param base_measure    [in/out] Previous scheduler timers; updated.
 * @param elapsed_micros  Wall-clock microseconds since the previous call.
 */
void
Thrman::calculate_measurement(MeasurementRecordPtr measurePtr,
                              struct ndb_rusage *curr_rusage,
                              struct ndb_rusage *base_rusage,
                              MeasurementRecord *curr_measure,
                              MeasurementRecord *base_measure,
                              Uint64 elapsed_micros)
{
  Uint64 user_micros;
  Uint64 kernel_micros;
  Uint64 total_micros;


  measurePtr.p->m_first_measure_done = true;

  /* Scheduler timers are cumulative; store the per-period deltas. */
  measurePtr.p->m_send_time_thread = curr_measure->m_send_time_thread -
                                     base_measure->m_send_time_thread;

  measurePtr.p->m_sleep_time_thread = curr_measure->m_sleep_time_thread -
                                      base_measure->m_sleep_time_thread;

  measurePtr.p->m_spin_time_thread = curr_measure->m_spin_time_thread -
                                     base_measure->m_spin_time_thread;


  measurePtr.p->m_buffer_full_time_thread =
    curr_measure->m_buffer_full_time_thread -
    base_measure->m_buffer_full_time_thread;

  /* Execution time is defined as elapsed time minus sleep time. */
  measurePtr.p->m_exec_time_thread =
    elapsed_micros - measurePtr.p->m_sleep_time_thread;

  measurePtr.p->m_elapsed_time = elapsed_micros;

  /**
   * All-zero rusage (either side) means Ndb_GetRUsage failed for that
   * sample; report the OS times as unknown (zero) rather than bogus.
   */
  if ((curr_rusage->ru_utime == 0 &&
       curr_rusage->ru_stime == 0) ||
      (base_rusage->ru_utime == 0 &&
       base_rusage->ru_stime == 0))
  {
    jam();
    measurePtr.p->m_user_time_os = 0;
    measurePtr.p->m_kernel_time_os = 0;
    measurePtr.p->m_idle_time_os = 0;
  }
  else
  {
    jam();
    user_micros = curr_rusage->ru_utime - base_rusage->ru_utime;
    kernel_micros = curr_rusage->ru_stime - base_rusage->ru_stime;
    total_micros = user_micros + kernel_micros;

    measurePtr.p->m_user_time_os = user_micros;
    measurePtr.p->m_kernel_time_os = kernel_micros;
    if (elapsed_micros >= total_micros)
    {
      jam();
      measurePtr.p->m_idle_time_os = elapsed_micros - total_micros;
    }
    else
    {
      /* OS times exceeding wall time indicates clock skew; clamp to 0. */
      jam();
      measurePtr.p->m_idle_time_os = 0;
    }
  }

#ifdef DEBUG_CPU_USAGE
#ifndef HIGH_DEBUG_CPU_USAGE
  if (elapsed_micros > Uint64(1000 * 1000))
#endif
  g_eventLogger->info("(%u)name: %s, ut_os: %u, kt_os: %u,"
                      " idle_os: %u"
                      ", elapsed_time: %u, exec_time: %u,"
                      " sleep_time: %u, spin_time: %u, send_time: %u",
                      instance(),
                      m_thread_name,
                      Uint32(measurePtr.p->m_user_time_os),
                      Uint32(measurePtr.p->m_kernel_time_os),
                      Uint32(measurePtr.p->m_idle_time_os),
                      Uint32(measurePtr.p->m_elapsed_time),
                      Uint32(measurePtr.p->m_exec_time_thread),
                      Uint32(measurePtr.p->m_sleep_time_thread),
                      Uint32(measurePtr.p->m_spin_time_thread),
                      Uint32(measurePtr.p->m_send_time_thread));
#endif
  /* Advance the base values so the next call measures from here. */
  base_rusage->ru_utime = curr_rusage->ru_utime;
  base_rusage->ru_stime = curr_rusage->ru_stime;

  base_measure->m_send_time_thread = curr_measure->m_send_time_thread;
  base_measure->m_sleep_time_thread = curr_measure->m_sleep_time_thread;
  base_measure->m_spin_time_thread = curr_measure->m_spin_time_thread;
  base_measure->m_buffer_full_time_thread = curr_measure->m_buffer_full_time_thread;
}
1940
void
Thrman::calculate_send_measurement(
  SendThreadMeasurementPtr sendThreadMeasurementPtr,
  SendThreadMeasurement *curr_send_thread_measure,
  SendThreadMeasurement *last_send_thread_measure,
  Uint64 elapsed_time,
  Uint32 send_instance)
{
  /**
   * Fill in one send-thread measurement interval as the delta between
   * the current running counters and the counters recorded at the
   * previous sample point, then roll the "last" counters forward so
   * the next call measures the next interval.
   *
   * sendThreadMeasurementPtr  record receiving the delta values
   * curr_send_thread_measure  running counters sampled just now
   * last_send_thread_measure  counters at the previous sample point;
   *                           overwritten with curr at the end
   * elapsed_time              ignored on entry; recomputed below as
   *                           exec_time + sleep_time (see comment)
   * send_instance             send thread id, used only for debug log
   */
  (void)elapsed_time;
  sendThreadMeasurementPtr.p->m_first_measure_done = true;


  /* Thread-internal counters: delta of running totals. */
  sendThreadMeasurementPtr.p->m_exec_time =
    curr_send_thread_measure->m_exec_time -
    last_send_thread_measure->m_exec_time;

  sendThreadMeasurementPtr.p->m_sleep_time =
    curr_send_thread_measure->m_sleep_time -
    last_send_thread_measure->m_sleep_time;

  sendThreadMeasurementPtr.p->m_spin_time =
    curr_send_thread_measure->m_spin_time -
    last_send_thread_measure->m_spin_time;

  /**
   * Elapsed time on measurements done is exec_time + sleep_time
   * as exec_time is first measured as elapsed time and then the
   * sleep time is subtracted from elapsed time to get exec time.
   *
   * See run_send_thread main loop for details.
   */
  sendThreadMeasurementPtr.p->m_elapsed_time =
    sendThreadMeasurementPtr.p->m_exec_time +
    sendThreadMeasurementPtr.p->m_sleep_time;
  elapsed_time = sendThreadMeasurementPtr.p->m_elapsed_time;

  /**
   * If either endpoint has all-zero OS counters the OS statistics are
   * not available for this interval; report zeroes rather than a
   * meaningless delta.
   */
  if ((curr_send_thread_measure->m_user_time_os == 0 &&
       curr_send_thread_measure->m_kernel_time_os == 0 &&
       curr_send_thread_measure->m_elapsed_time_os == 0) ||
      (last_send_thread_measure->m_user_time_os == 0 &&
       last_send_thread_measure->m_kernel_time_os == 0 &&
       last_send_thread_measure->m_elapsed_time_os == 0))
  {
    jam();
    sendThreadMeasurementPtr.p->m_user_time_os = 0;
    sendThreadMeasurementPtr.p->m_kernel_time_os = 0;
    sendThreadMeasurementPtr.p->m_elapsed_time_os = 0;
    sendThreadMeasurementPtr.p->m_idle_time_os = 0;
  }
  else
  {
    jam();
    /* OS counters: delta of running totals; idle = elapsed - cpu. */
    sendThreadMeasurementPtr.p->m_user_time_os =
      curr_send_thread_measure->m_user_time_os -
      last_send_thread_measure->m_user_time_os;

    sendThreadMeasurementPtr.p->m_kernel_time_os =
      curr_send_thread_measure->m_kernel_time_os -
      last_send_thread_measure->m_kernel_time_os;

    sendThreadMeasurementPtr.p->m_elapsed_time_os =
      curr_send_thread_measure->m_elapsed_time_os -
      last_send_thread_measure->m_elapsed_time_os;
    sendThreadMeasurementPtr.p->m_idle_time_os =
      sendThreadMeasurementPtr.p->m_elapsed_time_os -
      (sendThreadMeasurementPtr.p->m_user_time_os +
       sendThreadMeasurementPtr.p->m_kernel_time_os);
  }
#ifdef DEBUG_CPU_USAGE
#ifndef HIGH_DEBUG_CPU_USAGE
  /* Only log intervals longer than one second unless high debug. */
  if (elapsed_time > Uint64(1000 * 1000))
  {
#endif
  Uint32 sleep = sendThreadMeasurementPtr.p->m_sleep_time;
  Uint32 exec = sendThreadMeasurementPtr.p->m_exec_time;
  int diff = elapsed_time - (sleep + exec);
  g_eventLogger->info("send_instance: %u, exec_time: %u, sleep_time: %u,"
                      " spin_tim: %u, elapsed_time: %u, diff: %d"
                      ", user_time_os: %u, kernel_time_os: %u,"
                      " elapsed_time_os: %u",
                      send_instance,
                      (Uint32)sendThreadMeasurementPtr.p->m_exec_time,
                      (Uint32)sendThreadMeasurementPtr.p->m_sleep_time,
                      (Uint32)sendThreadMeasurementPtr.p->m_spin_time,
                      (Uint32)sendThreadMeasurementPtr.p->m_elapsed_time,
                      diff,
                      (Uint32)sendThreadMeasurementPtr.p->m_user_time_os,
                      (Uint32)sendThreadMeasurementPtr.p->m_kernel_time_os,
                      (Uint32)sendThreadMeasurementPtr.p->m_elapsed_time_os);
#ifndef HIGH_DEBUG_CPU_USAGE
  }
#endif
#else
  (void)send_instance;
#endif

  /* Roll the baseline forward for the next interval. */
  last_send_thread_measure->m_exec_time =
    curr_send_thread_measure->m_exec_time;

  last_send_thread_measure->m_sleep_time =
    curr_send_thread_measure->m_sleep_time;

  last_send_thread_measure->m_spin_time =
    curr_send_thread_measure->m_spin_time;

  last_send_thread_measure->m_user_time_os =
    curr_send_thread_measure->m_user_time_os;

  last_send_thread_measure->m_kernel_time_os =
    curr_send_thread_measure->m_kernel_time_os;

  last_send_thread_measure->m_elapsed_time_os =
    curr_send_thread_measure->m_elapsed_time_os;
}
2055
2056 void
sum_measures(MeasurementRecord * dest,MeasurementRecord * source)2057 Thrman::sum_measures(MeasurementRecord *dest,
2058 MeasurementRecord *source)
2059 {
2060 dest->m_user_time_os += source->m_user_time_os;
2061 dest->m_kernel_time_os += source->m_kernel_time_os;
2062 dest->m_idle_time_os += source->m_idle_time_os;
2063 dest->m_exec_time_thread += source->m_exec_time_thread;
2064 dest->m_sleep_time_thread += source->m_sleep_time_thread;
2065 dest->m_spin_time_thread += source->m_spin_time_thread;
2066 dest->m_send_time_thread += source->m_send_time_thread;
2067 dest->m_buffer_full_time_thread += source->m_buffer_full_time_thread;
2068 dest->m_elapsed_time += source->m_elapsed_time;
2069 }
2070
2071 bool
calculate_cpu_load_last_second(MeasurementRecord * measure)2072 Thrman::calculate_cpu_load_last_second(MeasurementRecord *measure)
2073 {
2074 MeasurementRecordPtr measurePtr;
2075
2076 memset(measure, 0, sizeof(MeasurementRecord));
2077
2078 c_next_50ms_measure.first(measurePtr);
2079 if (measurePtr.p->m_first_measure_done)
2080 {
2081 do
2082 {
2083 jam();
2084 sum_measures(measure, measurePtr.p);
2085 c_next_50ms_measure.next(measurePtr);
2086 } while (measurePtr.i != RNIL &&
2087 measure->m_elapsed_time <
2088 Uint64(NUM_MEASUREMENTS * 50 * 1000));
2089 STATIC_ASSERT(NUM_MEASUREMENTS * 50 * 1000 == 1000 * 1000);
2090 return true;
2091 }
2092 jam();
2093 return false;
2094 }
2095
2096 bool
calculate_cpu_load_last_20seconds(MeasurementRecord * measure)2097 Thrman::calculate_cpu_load_last_20seconds(MeasurementRecord *measure)
2098 {
2099 MeasurementRecordPtr measurePtr;
2100
2101 memset(measure, 0, sizeof(MeasurementRecord));
2102
2103 c_next_1sec_measure.first(measurePtr);
2104 if (measurePtr.p->m_first_measure_done)
2105 {
2106 do
2107 {
2108 jam();
2109 sum_measures(measure, measurePtr.p);
2110 c_next_1sec_measure.next(measurePtr);
2111 } while (measurePtr.i != RNIL &&
2112 measure->m_elapsed_time <
2113 Uint64(NUM_MEASUREMENTS * NUM_MEASUREMENTS * 50 * 1000));
2114 STATIC_ASSERT(NUM_MEASUREMENTS *
2115 NUM_MEASUREMENTS *
2116 50 * 1000 == 20 * 1000 * 1000);
2117 return true;
2118 }
2119 jam();
2120 return false;
2121 }
2122
2123 bool
calculate_cpu_load_last_400seconds(MeasurementRecord * measure)2124 Thrman::calculate_cpu_load_last_400seconds(MeasurementRecord *measure)
2125 {
2126 MeasurementRecordPtr measurePtr;
2127
2128 memset(measure, 0, sizeof(MeasurementRecord));
2129
2130 c_next_20sec_measure.first(measurePtr);
2131 if (measurePtr.p->m_first_measure_done)
2132 {
2133 do
2134 {
2135 jam();
2136 sum_measures(measure, measurePtr.p);
2137 c_next_20sec_measure.next(measurePtr);
2138 } while (measurePtr.i != RNIL &&
2139 measure->m_elapsed_time <
2140 Uint64(NUM_MEASUREMENTS *
2141 NUM_MEASUREMENTS *
2142 NUM_MEASUREMENTS * 50 * 1000));
2143 STATIC_ASSERT(NUM_MEASUREMENTS *
2144 NUM_MEASUREMENTS *
2145 NUM_MEASUREMENTS *
2146 50 * 1000 == 400 * 1000 * 1000);
2147 return true;
2148 }
2149 jam();
2150 return false;
2151 }
2152
2153 void
init_stats(MeasureStats * stats)2154 Thrman::init_stats(MeasureStats *stats)
2155 {
2156 stats->min_os_percentage = 100;
2157 stats->min_next_os_percentage = 100;
2158
2159 stats->max_os_percentage = 0;
2160 stats->max_next_os_percentage = 0;
2161
2162 stats->avg_os_percentage = 0;
2163
2164 stats->min_thread_percentage = 100;
2165 stats->min_next_thread_percentage = 100;
2166
2167 stats->max_thread_percentage = 0;
2168 stats->max_next_thread_percentage = 0;
2169 stats->avg_thread_percentage = 0;
2170
2171 stats->avg_send_percentage = 0;
2172 }
2173
void
Thrman::calc_stats(MeasureStats *stats,
                   MeasurementRecord *measure)
{
  /**
   * Fold one measurement interval into the statistics object: update
   * the min/next-min/max/next-max trackers and the avg_* sums (the
   * sums are later divided through calc_avgs()).
   *
   * All percentages are first computed in tenths of a percent
   * (factor 1000), then rounded to whole percent by adding 5 and
   * dividing by 10.
   */
  Uint64 thread_percentage = 0;
  {
    if (measure->m_elapsed_time > 0)
    {
      /**
       * Spin time and time blocked on full buffers are contained in
       * m_exec_time_thread but are not useful work; subtract them
       * before computing the thread load percentage.
       */
      Uint64 not_used_exec_time =
        measure->m_buffer_full_time_thread +
        measure->m_spin_time_thread;
      Uint64 used_exec_time = 0;
      if (measure->m_exec_time_thread > not_used_exec_time)
      {
        used_exec_time = measure->m_exec_time_thread - not_used_exec_time;
      }
      thread_percentage = Uint64(1000) * used_exec_time /
        measure->m_elapsed_time;
    }
    /* Round to whole percent and clamp to 100. */
    thread_percentage += 5;
    thread_percentage /= 10;
    if (thread_percentage > 100)
    {
      thread_percentage = 100;
    }

    /* else-if chain: one sample updates at most one tracker. */
    if (thread_percentage < stats->min_thread_percentage)
    {
      jam();
      stats->min_next_thread_percentage = stats->min_thread_percentage;
      stats->min_thread_percentage = thread_percentage;
    }
    else if (thread_percentage < stats->min_next_thread_percentage)
    {
      jam();
      stats->min_next_thread_percentage = thread_percentage;
    }
    else if (thread_percentage > stats->max_thread_percentage)
    {
      jam();
      stats->max_next_thread_percentage = stats->max_thread_percentage;
      stats->max_thread_percentage = thread_percentage;
    }
    else if (thread_percentage > stats->max_next_thread_percentage)
    {
      jam();
      stats->max_next_thread_percentage = thread_percentage;
    }
    stats->avg_thread_percentage += thread_percentage;
  }

  Uint64 divider = 1;
  Uint64 multiplier = 1;
  Uint64 spin_percentage = 0;
  if (measure->m_elapsed_time > 0)
  {
    spin_percentage = (Uint64(1000) * measure->m_spin_time_thread) /
      measure->m_elapsed_time;
    spin_percentage += 5;
    spin_percentage /= 10;
  }
  if (spin_percentage > 1)
  {
    jam();
    /**
     * We take spin time into account for OS time when it is at least
     * spinning 2% of the time. Otherwise we will ignore it. What we
     * do is that we assume that the time spent in OS time is equally
     * divided as the measured time, so e.g. if we spent 60% of the
     * time in exec and 30% spinning, then we will multiply os
     * percentage by 2/3 since we assume that a third of the time
     * in the OS time was spent spinning and we don't want spin time
     * to be counted as execution time, it is a form of busy sleep
     * time.
     */
    multiplier = thread_percentage;
    divider = (spin_percentage + thread_percentage);
  }

  {
    Uint64 os_percentage = 0;
    if (measure->m_elapsed_time > 0)
    {
      os_percentage = Uint64(1000) *
        (measure->m_user_time_os + measure->m_kernel_time_os) /
        measure->m_elapsed_time;
    }
    /* Take spin time into account */
    os_percentage *= multiplier;
    os_percentage /= divider;

    /**
     * We calculated percentage * 10, so by adding 5 we ensure that
     * rounding is ok. Integer division always round 99.9 to 99, so
     * we need to add 0.5% to get proper rounding.
     */
    os_percentage += 5;
    os_percentage /= 10;

    /* Same at-most-one-tracker update scheme as for thread load. */
    if (os_percentage < stats->min_os_percentage)
    {
      jam();
      stats->min_next_os_percentage = stats->min_os_percentage;
      stats->min_os_percentage = os_percentage;
    }
    else if (os_percentage < stats->min_next_os_percentage)
    {
      jam();
      stats->min_next_os_percentage = os_percentage;
    }
    else if (os_percentage > stats->max_os_percentage)
    {
      jam();
      stats->max_next_os_percentage = stats->max_os_percentage;
      stats->max_os_percentage = os_percentage;
    }
    else if (os_percentage > stats->max_next_os_percentage)
    {
      jam();
      stats->max_next_os_percentage = os_percentage;
    }
    stats->avg_os_percentage += os_percentage;
  }
  /* Send assistance time only contributes to a plain average. */
  Uint64 send_percentage = 0;
  if (measure->m_elapsed_time > 0)
  {
    send_percentage = (Uint64(1000) *
      measure->m_send_time_thread) / measure->m_elapsed_time;
  }
  send_percentage += 5;
  send_percentage /= 10;
  stats->avg_send_percentage += send_percentage;
}
2307
2308 void
calc_avgs(MeasureStats * stats,Uint32 num_stats)2309 Thrman::calc_avgs(MeasureStats *stats, Uint32 num_stats)
2310 {
2311 stats->avg_os_percentage /= num_stats;
2312 stats->avg_thread_percentage /= num_stats;
2313 stats->avg_send_percentage /= num_stats;
2314 }
2315
2316 bool
calculate_stats_last_100ms(MeasureStats * stats)2317 Thrman::calculate_stats_last_100ms(MeasureStats *stats)
2318 {
2319 MeasurementRecordPtr measurePtr;
2320 Uint32 num_stats = 0;
2321 Uint64 elapsed_time = 0;
2322
2323 init_stats(stats);
2324 c_next_50ms_measure.first(measurePtr);
2325 if (!measurePtr.p->m_first_measure_done)
2326 {
2327 jam();
2328 return false;
2329 }
2330 do
2331 {
2332 jam();
2333 calc_stats(stats, measurePtr.p);
2334 num_stats++;
2335 elapsed_time += measurePtr.p->m_elapsed_time;
2336 c_next_50ms_measure.next(measurePtr);
2337 } while (measurePtr.i != RNIL &&
2338 elapsed_time < Uint64(100 * 1000));
2339 calc_avgs(stats, num_stats);
2340 return true;
2341 }
2342
2343 bool
calculate_stats_last_second(MeasureStats * stats)2344 Thrman::calculate_stats_last_second(MeasureStats *stats)
2345 {
2346 MeasurementRecordPtr measurePtr;
2347 Uint32 num_stats = 0;
2348 Uint64 elapsed_time = 0;
2349
2350 init_stats(stats);
2351 c_next_50ms_measure.first(measurePtr);
2352 if (!measurePtr.p->m_first_measure_done)
2353 {
2354 jam();
2355 return false;
2356 }
2357 do
2358 {
2359 jam();
2360 calc_stats(stats, measurePtr.p);
2361 num_stats++;
2362 elapsed_time += measurePtr.p->m_elapsed_time;
2363 c_next_50ms_measure.next(measurePtr);
2364 } while (measurePtr.i != RNIL &&
2365 elapsed_time < Uint64(NUM_MEASUREMENTS * 50 * 1000));
2366 STATIC_ASSERT(NUM_MEASUREMENTS * 50 * 1000 == 1000 * 1000);
2367 calc_avgs(stats, num_stats);
2368 return true;
2369 }
2370
2371 bool
calculate_stats_last_20seconds(MeasureStats * stats)2372 Thrman::calculate_stats_last_20seconds(MeasureStats *stats)
2373 {
2374 MeasurementRecordPtr measurePtr;
2375 Uint32 num_stats = 0;
2376 Uint64 elapsed_time = 0;
2377
2378 init_stats(stats);
2379 c_next_1sec_measure.first(measurePtr);
2380 if (!measurePtr.p->m_first_measure_done)
2381 {
2382 jam();
2383 return false;
2384 }
2385 do
2386 {
2387 jam();
2388 calc_stats(stats, measurePtr.p);
2389 num_stats++;
2390 elapsed_time += measurePtr.p->m_elapsed_time;
2391 c_next_1sec_measure.next(measurePtr);
2392 } while (measurePtr.i != RNIL &&
2393 elapsed_time <
2394 Uint64(NUM_MEASUREMENTS * NUM_MEASUREMENTS * 50 * 1000));
2395 STATIC_ASSERT(NUM_MEASUREMENTS *
2396 NUM_MEASUREMENTS *
2397 50 * 1000 == 20 * 1000 * 1000);
2398 calc_avgs(stats, num_stats);
2399 return true;
2400 }
2401
2402 bool
calculate_stats_last_400seconds(MeasureStats * stats)2403 Thrman::calculate_stats_last_400seconds(MeasureStats *stats)
2404 {
2405 MeasurementRecordPtr measurePtr;
2406 Uint32 num_stats = 0;
2407 Uint64 elapsed_time = 0;
2408
2409 init_stats(stats);
2410 c_next_20sec_measure.first(measurePtr);
2411 if (!measurePtr.p->m_first_measure_done)
2412 {
2413 jam();
2414 return false;
2415 }
2416 do
2417 {
2418 jam();
2419 calc_stats(stats, measurePtr.p);
2420 num_stats++;
2421 elapsed_time += measurePtr.p->m_elapsed_time;
2422 c_next_20sec_measure.next(measurePtr);
2423 } while (measurePtr.i != RNIL &&
2424 elapsed_time <
2425 Uint64(NUM_MEASUREMENTS *
2426 NUM_MEASUREMENTS *
2427 NUM_MEASUREMENTS * 50 * 1000));
2428 STATIC_ASSERT(NUM_MEASUREMENTS *
2429 NUM_MEASUREMENTS *
2430 NUM_MEASUREMENTS *
2431 50 * 1000 == 400 * 1000 * 1000);
2432 calc_avgs(stats, num_stats);
2433 return true;
2434 }
2435
2436 bool
calculate_send_thread_load_last_second(Uint32 send_instance,SendThreadMeasurement * measure)2437 Thrman::calculate_send_thread_load_last_second(Uint32 send_instance,
2438 SendThreadMeasurement *measure)
2439 {
2440 SendThreadPtr sendThreadPtr;
2441 SendThreadMeasurementPtr sendThreadMeasurementPtr;
2442
2443 memset(measure, 0, sizeof(SendThreadMeasurement));
2444
2445 c_sendThreadRecordPool.getPtr(sendThreadPtr, send_instance);
2446
2447 Local_SendThreadMeasurement_fifo list_50ms(c_sendThreadMeasurementPool,
2448 sendThreadPtr.p->m_send_thread_50ms_measurements);
2449 list_50ms.first(sendThreadMeasurementPtr);
2450
2451 if (sendThreadMeasurementPtr.p->m_first_measure_done)
2452 {
2453 do
2454 {
2455 jam();
2456 measure->m_exec_time += sendThreadMeasurementPtr.p->m_exec_time;
2457 measure->m_sleep_time += sendThreadMeasurementPtr.p->m_sleep_time;
2458 measure->m_spin_time += sendThreadMeasurementPtr.p->m_spin_time;
2459 measure->m_elapsed_time += (sendThreadMeasurementPtr.p->m_exec_time +
2460 sendThreadMeasurementPtr.p->m_sleep_time);
2461 measure->m_user_time_os += sendThreadMeasurementPtr.p->m_user_time_os;
2462 measure->m_kernel_time_os += sendThreadMeasurementPtr.p->m_kernel_time_os;
2463 measure->m_elapsed_time_os += sendThreadMeasurementPtr.p->m_elapsed_time_os;
2464 measure->m_idle_time_os += sendThreadMeasurementPtr.p->m_idle_time_os;
2465 list_50ms.next(sendThreadMeasurementPtr);
2466 } while (sendThreadMeasurementPtr.i != RNIL &&
2467 measure->m_elapsed_time < Uint64(1000 * 1000));
2468 return true;
2469 }
2470 jam();
2471 return false;
2472 }
2473
Uint32
Thrman::calculate_mean_send_thread_load()
{
  /**
   * Compute the mean load percentage over all send threads for the
   * last second. Returns 0 when there are no send threads, or when
   * any send thread does not yet have a completed measurement.
   */
  SendThreadMeasurement measure;
  Uint32 tot_percentage = 0;
  if (m_num_send_threads == 0)
  {
    return 0;
  }
  for (Uint32 i = 0; i < m_num_send_threads; i++)
  {
    jam();
    bool succ = calculate_send_thread_load_last_second(i, &measure);
    if (!succ)
    {
      jam();
      return 0;
    }

    /**
     * Thread-measured load excluding spin time, computed in tenths
     * of a percent and then rounded to whole percent.
     */
    Uint64 send_thread_percentage = 0;
    if (measure.m_elapsed_time)
    {
      send_thread_percentage = Uint64(1000) *
        (measure.m_exec_time - measure.m_spin_time) /
        measure.m_elapsed_time;
    }
    send_thread_percentage += 5;
    send_thread_percentage /= 10;

    Uint64 send_spin_percentage = 0;
    Uint64 multiplier = 1;
    Uint64 divider = 1;
    if (measure.m_elapsed_time)
    {
      send_spin_percentage =
        (Uint64(1000) * measure.m_spin_time) / measure.m_elapsed_time;
      send_spin_percentage += 5;
      send_spin_percentage /= 10;
    }

    /**
     * When spinning at least 2% of the time, scale the OS-reported
     * load by exec/(exec+spin) so spin time is not counted as real
     * execution time (same reasoning as in calc_stats()).
     */
    if (send_spin_percentage > 1)
    {
      jam();
      multiplier = send_thread_percentage;
      divider = (send_thread_percentage + send_spin_percentage);
    }

    Uint64 send_os_percentage = 0;
    if (measure.m_elapsed_time_os)
    {
      send_os_percentage =
        (Uint64(1000) * (measure.m_user_time_os + measure.m_kernel_time_os) /
          measure.m_elapsed_time_os);
    }
    send_os_percentage *= multiplier;
    send_os_percentage /= divider;

    /* Round from tenths of a percent to whole percent. */
    send_os_percentage += 5;
    send_os_percentage /= 10;

    /* Trust the higher of OS-reported and thread-measured load. */
    if (send_os_percentage > send_thread_percentage)
    {
      jam();
      send_thread_percentage = send_os_percentage;
    }
    tot_percentage += Uint32(send_thread_percentage);
  }
  tot_percentage /= m_num_send_threads;
  return tot_percentage;
}
2544
2545 void
execGET_CPU_USAGE_REQ(Signal * signal)2546 Thrman::execGET_CPU_USAGE_REQ(Signal *signal)
2547 {
2548 MeasurementRecord curr_measure;
2549 if (calculate_cpu_load_last_second(&curr_measure))
2550 {
2551 jam();
2552 Uint64 percentage = (Uint64(100) *
2553 curr_measure.m_exec_time_thread) /
2554 curr_measure.m_elapsed_time;
2555 signal->theData[0] = Uint32(percentage);
2556 }
2557 else
2558 {
2559 jam();
2560 signal->theData[0] = default_cpu_load;
2561 }
2562 }
2563
2564 void
handle_decisions()2565 Thrman::handle_decisions()
2566 {
2567 MeasureStats *stats = m_current_decision_stats;
2568
2569 if (stats->avg_thread_percentage > (stats->avg_os_percentage + 25))
2570 {
2571 jam();
2572 if (!m_shared_environment)
2573 {
2574 jam();
2575 g_eventLogger->info("Setting ourselves in shared environment,"
2576 " instance: %u, thread pct: %u"
2577 ", os_pct: %u, intervals os: [%u, %u] thread: [%u, %u]",
2578 instance(),
2579 Uint32(stats->avg_thread_percentage),
2580 Uint32(stats->avg_os_percentage),
2581 Uint32(stats->min_next_os_percentage),
2582 Uint32(stats->max_next_os_percentage),
2583 Uint32(stats->min_next_thread_percentage),
2584 Uint32(stats->max_next_thread_percentage));
2585 }
2586 m_shared_environment = true;
2587 m_max_warning_level = 200;
2588 }
2589 else if (stats->avg_thread_percentage < (stats->avg_os_percentage + 15))
2590 {
2591 /**
2592 * We use a hysteresis to avoid swapping between shared environment and
2593 * exclusive environment to quick when conditions quickly change.
2594 */
2595 jam();
2596 if (m_shared_environment)
2597 {
2598 jam();
2599 g_eventLogger->info("Setting ourselves in exclusive environment,"
2600 " instance: %u, thread pct: %u"
2601 ", os_pct: %u, intervals os: [%u, %u] thread: [%u, %u]",
2602 instance(),
2603 Uint32(stats->avg_thread_percentage),
2604 Uint32(stats->avg_os_percentage),
2605 Uint32(stats->min_next_os_percentage),
2606 Uint32(stats->max_next_os_percentage),
2607 Uint32(stats->min_next_thread_percentage),
2608 Uint32(stats->max_next_thread_percentage));
2609 }
2610 m_shared_environment = false;
2611 m_max_warning_level = 20;
2612 }
2613 }
2614
2615 Uint32
calculate_load(MeasureStats & stats,Uint32 & burstiness)2616 Thrman::calculate_load(MeasureStats & stats, Uint32 & burstiness)
2617 {
2618 if (stats.avg_os_percentage >= stats.avg_thread_percentage)
2619 {
2620 burstiness = 0;
2621 jam();
2622 /* Always pick OS reported average unless thread reports higher. */
2623 return Uint32(stats.avg_os_percentage);
2624 }
2625 jam();
2626 burstiness = Uint32(stats.avg_thread_percentage - stats.avg_os_percentage);
2627 return Uint32(stats.avg_thread_percentage);
2628 }
2629
2630 #define LIGHT_LOAD_LEVEL 30
2631 #define MEDIUM_LOAD_LEVEL 75
2632 #define CRITICAL_SEND_LEVEL 75
2633 #define CRITICAL_OVERLOAD_LEVEL 85
2634
Int32
Thrman::get_load_status(Uint32 load, Uint32 send_load)
{
  /**
   * Map a measured load percentage onto an overload state constant.
   *
   * load      total load percentage of this thread
   * send_load percentage of the time spent assisting send; it is
   *           subtracted from load to form the "base" (non-send)
   *           load used against the LIGHT/MEDIUM thresholds
   */
  Uint32 base_load = 0;
  if (load > send_load)
  {
    jam();
    base_load = load - send_load;
  }

  if (base_load < LIGHT_LOAD_LEVEL &&
      load < CRITICAL_OVERLOAD_LEVEL)
  {
    jam();
    return (OverloadStatus)LIGHT_LOAD_CONST;
  }
  else if (base_load < MEDIUM_LOAD_LEVEL)
  {
    jam();
    return (OverloadStatus)MEDIUM_LOAD_CONST;
  }
  else if (base_load < CRITICAL_OVERLOAD_LEVEL)
  {
    /**
     * NOTE(review): with base load in [MEDIUM, CRITICAL), heavily
     * loaded send threads (>= CRITICAL_SEND_LEVEL) keep the state at
     * MEDIUM instead of OVERLOAD — presumably so load caused by
     * offloaded send work does not escalate the state; confirm
     * intent against the overload design notes.
     */
    if (m_send_thread_percentage >= CRITICAL_SEND_LEVEL)
    {
      jam();
      return (OverloadStatus)MEDIUM_LOAD_CONST;
    }
    else
    {
      jam();
      return (OverloadStatus)OVERLOAD_CONST;
    }
  }
  else
  {
    jam();
    return (OverloadStatus)OVERLOAD_CONST;
  }
}
2675
2676 void
change_warning_level(Int32 diff_status,Uint32 factor)2677 Thrman::change_warning_level(Int32 diff_status, Uint32 factor)
2678 {
2679 switch (diff_status)
2680 {
2681 case Int32(-2):
2682 jam();
2683 inc_warning(3 * factor);
2684 break;
2685 case Int32(-1):
2686 jam();
2687 inc_warning(factor);
2688 break;
2689 case Int32(0):
2690 jam();
2691 down_warning(factor);
2692 break;
2693 case Int32(1):
2694 jam();
2695 dec_warning(factor);
2696 break;
2697 case Int32(2):
2698 jam();
2699 dec_warning(3 * factor);
2700 break;
2701 default:
2702 ndbabort();
2703 }
2704 }
2705
2706 void
handle_overload_stats_1sec()2707 Thrman::handle_overload_stats_1sec()
2708 {
2709 Uint32 burstiness;
2710 bool decision_stats = m_current_decision_stats == &c_1sec_stats;
2711
2712 if (decision_stats)
2713 {
2714 jam();
2715 handle_decisions();
2716 }
2717 Uint32 load = calculate_load(c_1sec_stats, burstiness);
2718 m_burstiness += burstiness;
2719
2720 Int32 load_status = get_load_status(load,
2721 c_1sec_stats.avg_send_percentage);
2722 Int32 diff_status = Int32(m_current_overload_status) - load_status;
2723 Uint32 factor = 10;
2724 change_warning_level(diff_status, factor);
2725 }
2726
2727
2728 void
handle_overload_stats_20sec()2729 Thrman::handle_overload_stats_20sec()
2730 {
2731 Uint32 burstiness;
2732 bool decision_stats = m_current_decision_stats == &c_20sec_stats;
2733
2734 if (decision_stats)
2735 {
2736 jam();
2737 handle_decisions();
2738 }
2739 /* Burstiness only incremented for 1 second stats */
2740 Uint32 load = calculate_load(c_20sec_stats, burstiness);
2741 check_burstiness();
2742
2743 Int32 load_status = get_load_status(load,
2744 c_20sec_stats.avg_send_percentage);
2745 Int32 diff_status = Int32(m_current_overload_status) - load_status;
2746 Uint32 factor = 3;
2747 change_warning_level(diff_status, factor);
2748 }
2749
2750 void
handle_overload_stats_400sec()2751 Thrman::handle_overload_stats_400sec()
2752 {
2753 /**
2754 * We only use 400 second stats for long-term decisions, not to affect
2755 * the ongoing decisions.
2756 */
2757 handle_decisions();
2758 }
2759
2760 /**
2761 * Sum burstiness for 20 seconds and if burstiness is at very high levels
2762 * we report it to the user in the node log. It is rather unlikely that
2763 * a reliable service can be delivered in very bursty environments.
2764 */
2765 void
check_burstiness()2766 Thrman::check_burstiness()
2767 {
2768 if (m_burstiness > NUM_MEASUREMENTS * 25)
2769 {
2770 jam();
2771 g_eventLogger->info("Bursty environment, mean burstiness of %u pct"
2772 ", some risk of congestion issues",
2773 m_burstiness / NUM_MEASUREMENTS);
2774 }
2775 else if (m_burstiness > NUM_MEASUREMENTS * 50)
2776 {
2777 jam();
2778 g_eventLogger->info("Very bursty environment, mean burstiness of %u pct"
2779 ", risk for congestion issues",
2780 m_burstiness / NUM_MEASUREMENTS);
2781 }
2782 else if (m_burstiness > NUM_MEASUREMENTS * 75)
2783 {
2784 jam();
2785 g_eventLogger->info("Extremely bursty environment, mean burstiness of %u pct"
2786 ", very high risk for congestion issues",
2787 m_burstiness / NUM_MEASUREMENTS);
2788 }
2789 m_burstiness = 0;
2790 }
2791
2792 /**
2793 * This function is used to indicate that we're moving towards higher overload
2794 * states, so we will unconditionally move the warning level up.
2795 */
2796 void
inc_warning(Uint32 inc_factor)2797 Thrman::inc_warning(Uint32 inc_factor)
2798 {
2799 m_warning_level += inc_factor;
2800 }
2801
2802 /**
2803 * This function is used to indicate that we're moving towards lower overload
2804 * states, so we will unconditionally move the warning level down.
2805 */
2806 void
dec_warning(Uint32 dec_factor)2807 Thrman::dec_warning(Uint32 dec_factor)
2808 {
2809 m_warning_level -= dec_factor;
2810 }
2811
2812 /**
2813 * This function is used to indicate that we're at the correct overload state.
2814 * We will therefore decrease warning levels towards zero independent of whether
2815 * we are at high warning levels or low levels.
2816 */
2817 void
down_warning(Uint32 down_factor)2818 Thrman::down_warning(Uint32 down_factor)
2819 {
2820 if (m_warning_level > Int32(down_factor))
2821 {
2822 jam();
2823 m_warning_level -= down_factor;
2824 }
2825 else if (m_warning_level < (-Int32(down_factor)))
2826 {
2827 jam();
2828 m_warning_level += down_factor;
2829 }
2830 else
2831 {
2832 jam();
2833 m_warning_level = 0;
2834 }
2835 }
2836
2837 void
sendOVERLOAD_STATUS_REP(Signal * signal)2838 Thrman::sendOVERLOAD_STATUS_REP(Signal *signal)
2839 {
2840 signal->theData[0] = instance();
2841 signal->theData[1] = m_current_overload_status;
2842 BlockReference ref = numberToRef(THRMAN,
2843 MAIN_THRMAN_INSTANCE,
2844 getOwnNodeId());
2845 sendSignal(ref, GSN_OVERLOAD_STATUS_REP, signal, 2, JBB);
2846 }
2847
2848 void
sendSEND_THREAD_STATUS_REP(Signal * signal,Uint32 percentage)2849 Thrman::sendSEND_THREAD_STATUS_REP(Signal *signal, Uint32 percentage)
2850 {
2851 signal->theData[0] = percentage;
2852 for (Uint32 instance_no = 1; instance_no <= m_num_threads; instance_no++)
2853 {
2854 BlockReference ref = numberToRef(THRMAN,
2855 instance_no,
2856 getOwnNodeId());
2857 sendSignal(ref, GSN_SEND_THREAD_STATUS_REP, signal, 1, JBB);
2858 }
2859 }
2860
void
Thrman::handle_state_change(Signal *signal)
{
  /**
   * Move the overload state one step up or down when the accumulated
   * warning level crosses +/- m_max_warning_level, reset the warning
   * level, publish the new state through setOverloadStatus() and
   * report it via OVERLOAD_STATUS_REP. Within bounds nothing changes.
   *
   * Valid transitions form a ladder LIGHT <-> MEDIUM <-> OVERLOAD;
   * any other combination is a programming error (ndbabort).
   */
  if (m_warning_level > Int32(m_max_warning_level))
  {
    /**
     * Warning has reached a threshold and we need to increase the overload
     * status.
     */
    if (m_current_overload_status == (OverloadStatus)LIGHT_LOAD_CONST)
    {
      jam();
      m_current_overload_status = (OverloadStatus)MEDIUM_LOAD_CONST;
    }
    else if (m_current_overload_status == (OverloadStatus)MEDIUM_LOAD_CONST)
    {
      jam();
      m_current_overload_status = (OverloadStatus)OVERLOAD_CONST;
    }
    else
    {
      /* Cannot go above OVERLOAD. */
      ndbabort();
    }
    jam();
#ifdef DEBUG_CPU_USAGE
    g_eventLogger->info("instance: %u change to new state: %u, warning: %d",
                        instance(),
                        m_current_overload_status,
                        m_warning_level);
#endif
    setOverloadStatus(m_current_overload_status);
    m_warning_level = 0;
    sendOVERLOAD_STATUS_REP(signal);
    return;
  }
  else if (m_warning_level < (-Int32(m_max_warning_level)))
  {
    /**
     * Warning has reached a threshold and we need to decrease the overload
     * status.
     */
    if (m_current_overload_status == (OverloadStatus)LIGHT_LOAD_CONST)
    {
      /* Cannot go below LIGHT. */
      ndbabort();
    }
    else if (m_current_overload_status == (OverloadStatus)MEDIUM_LOAD_CONST)
    {
      jam();
      m_current_overload_status = (OverloadStatus)LIGHT_LOAD_CONST;
    }
    else if (m_current_overload_status == (OverloadStatus)OVERLOAD_CONST)
    {
      jam();
      m_current_overload_status = (OverloadStatus)MEDIUM_LOAD_CONST;
    }
    else
    {
      ndbabort();
    }
    jam();
#ifdef DEBUG_CPU_USAGE
    g_eventLogger->info("instance: %u change to new state: %u, warning: %d",
                        instance(),
                        m_current_overload_status,
                        m_warning_level);
#endif
    setOverloadStatus(m_current_overload_status);
    m_warning_level = 0;
    sendOVERLOAD_STATUS_REP(signal);
    return;
  }
  jam();
#ifdef HIGH_DEBUG_CPU_USAGE
  g_eventLogger->info("instance: %u stay at state: %u, warning: %d",
                      instance(),
                      m_current_overload_status,
                      m_warning_level);
#endif
  /* Warning level is within bounds, no need to change anything. */
  return;
}
2942
2943 void
check_overload_status(Signal * signal,bool check_1sec,bool check_20sec)2944 Thrman::check_overload_status(Signal *signal,
2945 bool check_1sec,
2946 bool check_20sec)
2947 {
2948 /**
2949 * This function checks the current overload status and makes a decision if
2950 * the status should change or if it is to remain at the current status.
2951 *
2952 * We have two measurements that we use to decide on overload status.
2953 * The first is the measurement based on the actual data reported by the OS.
2954 * This data is considered as correct when it comes to how much CPU time our
2955 * thread has used. However it will not say anything about the environment
2956 * we are executing in.
2957 *
2958 * So in order to get a feel for this environment we estimate also the time
2959 * we are spending in execution mode, how much time we are spending in
2960 * sleep mode. We also take into account if the thread has been spinning,
   * this time is added to the sleep time and subtracted from the exec time
2962 * of a thread.
2963 *
2964 * We can calculate idle time in two ways.
2965 * 1) m_elapsed_time - (m_user_time_os + m_kernel_time_os)
2966 * This is the actual idle time for the thread. We can only really use
2967 * this measurement in the absence of spin time, spinning time will be
2968 * added to OS time, but isn't really execution time.
2969 * 2) m_sleep_time_thread + m_spin_time_thread
2970 * This is the time that we actually decided to be idle because we had
2971 * no work to do. There are two possible reasons why these could differ.
2972 * One is if we have much mutex contentions that makes the OS put us into
2973 * idle mode since we need the mutex to proceed. The second is when we go
2974 * to sleep based on that we cannot proceed because we're out of buffers
2975 * somewhere. This is actually tracked by m_buffer_full_time_thread, so
2976 * we can add m_sleep_time_thread and m_buffer_full_time_thread to see
2977 * the total time we have decided to go to sleep.
2978 *
2979 * Finally we can also be descheduled by the OS by other threads that
2980 * compete for CPU resources. This kind of environment is much harder
2981 * to control since the variance of the load can be significant.
2982 *
2983 * So we want to measure this background load to see how much CPU resources
2984 * we actually have access to. If we operate in this type of environment we
2985 * need to change the overload status in a much slower speed. If we operate
2986 * in an environment where we get all the resources we need and more or less
2987 * always have access to a CPU when we need to, in this case we can react
2988 * much faster to changes. Still we don't want to react too fast since the
2989 * application behaviour can be a bit bursty as well, and we don't want to
2990 * go back to default behaviour too quick in these cases.
2991 *
2992 * We decide which environment we are processing in once every 20 seconds.
2993 * If we decide that we are in an environment where we don't have access
2994 * to dedicated CPU resources we will set the change speed to 10 seconds.
2995 * This means that the warning level need to reach 200 before we actually
2996 * change to a new overload level.
2997 *
2998 * If we operate in a nice environment where we have very little problems
2999 * with competition for CPU resources we will set the warning level to 20
3000 * before we change the overload level.
3001 *
3002 * So every 20 seconds we will calculate the following parameters for our
3003 * thread.
3004 *
3005 * 1) Mean CPU percentage as defined by (m_user_time_os + m_kernel_time_os)/
3006 * m_elapsed_time_os.
3007 * 2) 95% confidence interval for this measurement (thus given that it is
3008 * calculated by 20 estimates we drop the highest and the lowest
3009 * percentage numbers. We will store the smallest percentage and the
3010 * highest percentage of this interval.
3011 * 3) We calculate the same 3 values based on (m_exec_time_thread -
3012 * (m_buffer_full_time_thread + m_spin_time_thread)) / m_elapsed_time.
3013 * 4) In addition we also calculate the mean value of
3014 * m_send_time_thread / m_elapsed_time.
3015 *
3016 * Finally we take the mean numbers calculated in 1) and 3) and compare
3017 * them. If 3) is more than 10% higher than 1) than we consider ourselves
3018 * to be in a "shared" environment. Otherwise we decide that we are in an
3019 * "exclusive" environment.
3020 *
3021 * If we haven't got 400 seconds of statistics we will make a first estimate
3022 * based on 1 second of data and then again after 20 seconds of execution.
3023 * So the first 20 seconds we will check once per second the above. Then
3024 * we will check once per 20 seconds but only check the last 20 seconds of
3025 * data. After 400 seconds we will go over to checking all statistics back
3026 * 400 seconds.
3027 *
3028 * We will track the overload level by using warning level which is an
3029 * integer. So when it reaches either -20 or +20 we will decide to decrease/
3030 * increase the overload level in an exclusive environment.
3031 * In addition once every 1 seconds we will calculate the average over the
3032 * period and once every 20 seconds we will calculate the average over this
3033 * period.
3034 *
3035 * In general the overload levels are aimed at the following:
3036 * LIGHT_LOAD:
3037 * Light load is defined as using less than 30% of the capacity.
3038 *
3039 * MEDIUM_LOAD:
3040 * Medium load is defined as using less than 75% of the capacity, but
3041 * more than or equal to 30% of the capacity.
3042 *
3043 * OVERLOAD:
3044 * Overload is defined as when one is using more than 75% of the capacity.
3045 *
3046 * The capacity is the CPU resources we have access to, they can differ
3047 * based on which environment we are in.
3048 *
3049 * We define OVERLOAD_STATUS as being at more than 75% load level. At this level
3050 * we want to avoid sending anything from our node. We will definitely stay at
3051 * this level if we can show that any of the following is true for the last
3052 * 50 milliseconds:
3053 * 1) m_user_time_os + m_kernel_time_os is at least 75% of m_elapsed_time
3054 * OR
3055 * 2) m_exec_time_thread is at least 75% of m_elapsed_time
3056 *
3057 * At this level the influence of doing sends should not matter since we
3058 * are not performing any sends at this overload level.
3059 *
3060 * If performance drops down into the 30-75% range for any of this values
3061 * then we will increment a warning counter. This warning counter will be
3062 * decreased by reaching above 75%. If the warning counter reaches 20 we
3063 * will go down to MEDIUM overload level. In shared environment with bursty
3064 * behaviour we will wait until the warning level reaches 200.
3065 */
3066 if (check_1sec)
3067 {
3068 jam();
3069 if (calculate_stats_last_second(&c_1sec_stats))
3070 {
3071 jam();
3072 m_overload_handling_activated = true;
3073 handle_overload_stats_1sec();
3074 }
3075 }
3076 if (check_20sec)
3077 {
3078 jam();
3079 if (calculate_stats_last_400seconds(&c_400sec_stats))
3080 {
3081 jam();
3082 m_overload_handling_activated = true;
3083 m_current_decision_stats = &c_400sec_stats;
3084 handle_overload_stats_400sec();
3085 ndbrequire(calculate_stats_last_20seconds(&c_20sec_stats));
3086 }
3087 else if (calculate_stats_last_20seconds(&c_20sec_stats))
3088 {
3089 jam();
3090 if (m_current_decision_stats != &c_400sec_stats)
3091 {
3092 jam();
3093 m_current_decision_stats = &c_20sec_stats;
3094 }
3095 m_overload_handling_activated = true;
3096 handle_overload_stats_20sec();
3097 }
3098 }
3099 if (!m_overload_handling_activated)
3100 {
3101 jam();
3102 return;
3103 }
3104
3105 MeasureStats stats;
3106 Uint32 burstiness;
3107 calculate_stats_last_100ms(&stats);
3108 Uint32 load = calculate_load(stats, burstiness);
3109
3110 Int32 load_status = get_load_status(load,
3111 stats.avg_send_percentage);
3112 Int32 diff_status = Int32(m_current_overload_status) - load_status;
3113 Uint32 factor = 1;
3114 change_warning_level(diff_status, factor);
3115
3116 handle_state_change(signal);
3117 }
3118
/**
 * Serve ndbinfo scans for the tables owned by THRMAN:
 * threads, threadblocks, threadstat, cpustat_50ms/1sec/20sec and cpustat.
 *
 * The scan is resumable: cursor->data[0] carries the position between
 * rate-limited batches. For the cpustat* tables the position packs three
 * fields (see the in-code comment below): measurement index (bits 0-7),
 * thread id (bits 8-15, 0 = own thread, >0 = send thread instance) and
 * the i-value of the next measurement record (bits 16-31).
 *
 * Send-thread rows are only produced by the main thread instance
 * (MAIN_THRMAN_INSTANCE), which is the instance that tracks send threads.
 */
void
Thrman::execDBINFO_SCANREQ(Signal* signal)
{
  jamEntry();

  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
  const Ndbinfo::ScanCursor* cursor =
    CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
  Ndbinfo::Ratelimit rl;

  switch(req.tableId) {
  case Ndbinfo::THREADS_TABLEID: {
    /* One row for our own thread; the main instance adds send threads. */
    Uint32 pos = cursor->data[0];
    for (;;)
    {
      if (pos == 0)
      {
        jam();
        Ndbinfo::Row row(signal, req);
        row.write_uint32(getOwnNodeId());
        row.write_uint32(getThreadId());             // thr_no
        row.write_string(m_thread_name);
        row.write_string(m_thread_description);
        ndbinfo_send_row(signal, req, row, rl);
      }
      if (instance() != MAIN_THRMAN_INSTANCE)
      {
        jam();
        break;
      }
      pos++;
      if (pos > m_num_send_threads)
      {
        jam();
        break;
      }
      {
        jam();
        /* Send threads are numbered after the block threads. */
        Ndbinfo::Row row(signal, req);
        row.write_uint32(getOwnNodeId());
        row.write_uint32(m_num_threads + (pos - 1)); // thr_no
        row.write_string(m_send_thread_name);
        row.write_string(m_send_thread_description);
        ndbinfo_send_row(signal, req, row, rl);
      }

      if (pos >= m_num_send_threads)
      {
        jam();
        break;
      }

      if (rl.need_break(req))
      {
        jam();
        ndbinfo_send_scan_break(signal, req, rl, pos);
        return;
      }
    }
    break;
  }
  case Ndbinfo::THREADBLOCKS_TABLEID: {
    /* One row per block instance executing in this thread. */
    Uint32 arr[NO_OF_BLOCKS];
    Uint32 len = mt_get_blocklist(this, arr, NDB_ARRAY_SIZE(arr));
    Uint32 pos = cursor->data[0];
    for (; ; )
    {
      Ndbinfo::Row row(signal, req);
      row.write_uint32(getOwnNodeId());
      row.write_uint32(getThreadId());             // thr_no
      row.write_uint32(blockToMain(arr[pos]));     // block_number
      row.write_uint32(blockToInstance(arr[pos])); // block_instance
      ndbinfo_send_row(signal, req, row, rl);

      pos++;
      if (pos == len)
      {
        jam();
        break;
      }
      else if (rl.need_break(req))
      {
        jam();
        ndbinfo_send_scan_break(signal, req, rl, pos);
        return;
      }
    }
    break;
  }
  case Ndbinfo::THREADSTAT_TABLEID:{
    /* Single row: scheduler counters plus OS rusage for this thread. */
    ndb_thr_stat stat;
    mt_get_thr_stat(this, &stat);
    Ndbinfo::Row row(signal, req);
    row.write_uint32(getOwnNodeId());
    row.write_uint32(getThreadId());  // thr_no
    row.write_string(stat.name);
    row.write_uint64(stat.loop_cnt);
    row.write_uint64(stat.exec_cnt);
    row.write_uint64(stat.wait_cnt);
    row.write_uint64(stat.local_sent_prioa);
    row.write_uint64(stat.local_sent_priob);
    row.write_uint64(stat.remote_sent_prioa);
    row.write_uint64(stat.remote_sent_priob);

    row.write_uint64(stat.os_tid);
    row.write_uint64(NdbTick_CurrentMillisecond());

    struct ndb_rusage os_rusage;
    Ndb_GetRUsage(&os_rusage, false);
    row.write_uint64(os_rusage.ru_utime);
    row.write_uint64(os_rusage.ru_stime);
    row.write_uint64(os_rusage.ru_minflt);
    row.write_uint64(os_rusage.ru_majflt);
    row.write_uint64(os_rusage.ru_nvcsw);
    row.write_uint64(os_rusage.ru_nivcsw);
    ndbinfo_send_row(signal, req, row, rl);
    break;
  }
  case Ndbinfo::CPUSTAT_50MS_TABLEID:
  case Ndbinfo::CPUSTAT_1SEC_TABLEID:
  case Ndbinfo::CPUSTAT_20SEC_TABLEID:
  {

    Uint32 pos = cursor->data[0];

    SendThreadMeasurementPtr sendThreadMeasurementPtr;
    MeasurementRecordPtr measurePtr;

    for ( ; ; )
    {
      jam();
      /* Decode the packed cursor position (see comment below). */
      Uint32 pos_thread_id = ((pos >> 8) & 255);
      Uint32 pos_index = (pos & 255);
      Uint32 pos_ptrI = (pos >> 16);
      sendThreadMeasurementPtr.i = RNIL;
      sendThreadMeasurementPtr.p = NULL;
      measurePtr.i = RNIL;
      measurePtr.p = NULL;
      if (pos_index >= NUM_MEASUREMENTS)
      {
        jam();
        /* Corrupt cursor; log and terminate the scan gracefully. */
        ndbassert(false);
        g_eventLogger->info("pos_index out of range in ndbinfo table %u",
                            req.tableId);
        ndbinfo_send_scan_conf(signal, req, rl);
        return;
      }

      if (pos == 0)
      {
        /**
         * This is the first row to start. We start with the rows from our
         * own thread. The pos variable is divided in 3 fields.
         * Bit 0-7 contains index number from 0 up to 19.
         * Bit 8-15 contains thread number
         * Bit 16-31 is a pointer to the next SendThreadMeasurement record.
         *
         * Thread number 0 is our own thread always. Thread 1 is send thread
         * instance 0 and thread 2 send thread instance 1 and so forth. We
         * will only worry about send thread data in the main thread where
         * we keep track of this information.
         *
         * The latest measurement is at the end of the linked list and so we
         * proceed backwards in the list.
         */
        if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
        {
          jam();
          c_next_50ms_measure.last(measurePtr);
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
        {
          jam();
          c_next_1sec_measure.last(measurePtr);
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
        {
          jam();
          c_next_20sec_measure.last(measurePtr);
        }
        else
        {
          ndbabort();
          return;
        }
        /* Start at index 0, thread 0, measurePtr.i */
        pos = measurePtr.i << 16;
      }
      else if (pos_thread_id != 0)
      {
        /**
         * We are working on the send thread measurement as we are the
         * main thread.
         */
        jam();
        if (instance() != MAIN_THRMAN_INSTANCE)
        {
          g_eventLogger->info("pos_thread_id = %u in non-main thread",
                              pos_thread_id);
          ndbassert(false);
          ndbinfo_send_scan_conf(signal, req, rl);
          return;
        }
        c_sendThreadMeasurementPool.getPtr(sendThreadMeasurementPtr, pos_ptrI);
      }
      else
      {
        jam();
        c_measurementRecordPool.getPtr(measurePtr, pos_ptrI);
      }

      Ndbinfo::Row row(signal, req);
      if (pos_thread_id == 0 && measurePtr.p->m_first_measure_done)
      {
        jam();
        /**
         * We report buffer_full_time, spin_time and exec_time as
         * separate times. So exec time does not include buffer_full_time
         * when we report it to the user and it also does not include
         * spin time when we report it to the user and finally it does
         * also not include send time of the thread. So essentially
         * the sum of exec_time, sleep_time, spin_time, send_time and
         * buffer_full_time should be very close to the elapsed time.
         */
        Uint32 exec_time = measurePtr.p->m_exec_time_thread;
        Uint32 spin_time = measurePtr.p->m_spin_time_thread;
        Uint32 buffer_full_time = measurePtr.p->m_buffer_full_time_thread;
        Uint32 send_time = measurePtr.p->m_send_time_thread;

        /* Clamp at 0 instead of underflowing the unsigned subtraction. */
        if (exec_time < (buffer_full_time + send_time + spin_time))
        {
          exec_time = 0;
        }
        else
        {
          exec_time -= buffer_full_time;
          exec_time -= spin_time;
          exec_time -= send_time;
        }
        row.write_uint32(getOwnNodeId());
        row.write_uint32 (getThreadId());
        row.write_uint32(Uint32(measurePtr.p->m_user_time_os));
        row.write_uint32(Uint32(measurePtr.p->m_kernel_time_os));
        row.write_uint32(Uint32(measurePtr.p->m_idle_time_os));
        row.write_uint32(Uint32(exec_time));
        row.write_uint32(Uint32(measurePtr.p->m_sleep_time_thread));
        row.write_uint32(Uint32(measurePtr.p->m_spin_time_thread));
        row.write_uint32(Uint32(measurePtr.p->m_send_time_thread));
        row.write_uint32(Uint32(measurePtr.p->m_buffer_full_time_thread));
        row.write_uint32(Uint32(measurePtr.p->m_elapsed_time));
        ndbinfo_send_row(signal, req, row, rl);
      }
      else if (pos_thread_id != 0 && sendThreadMeasurementPtr.p->m_first_measure_done)
      {
        jam();
        row.write_uint32(getOwnNodeId());
        row.write_uint32 (m_num_threads + (pos_thread_id - 1));

        Uint32 exec_time = sendThreadMeasurementPtr.p->m_exec_time;
        Uint32 sleep_time = sendThreadMeasurementPtr.p->m_sleep_time;

        row.write_uint32(Uint32(sendThreadMeasurementPtr.p->m_user_time_os));
        row.write_uint32(Uint32(sendThreadMeasurementPtr.p->m_kernel_time_os));
        row.write_uint32(Uint32(sendThreadMeasurementPtr.p->m_idle_time_os));
        /*
         * Send threads have no spin/buffer-full accounting; exec time is
         * reported both as exec_time and send_time columns, the other
         * columns are zero.
         */
        row.write_uint32(exec_time);
        row.write_uint32(sleep_time);
        row.write_uint32(0);
        row.write_uint32(exec_time);
        row.write_uint32(Uint32(0));
        Uint32 elapsed_time =
          sendThreadMeasurementPtr.p->m_exec_time +
          sendThreadMeasurementPtr.p->m_sleep_time;
        row.write_uint32(elapsed_time);
        ndbinfo_send_row(signal, req, row, rl);
      }
      else
      {
        // Proceed to next thread at first undone measurement
        pos_index = NUM_MEASUREMENTS - 1;
      }

      if ((pos_index + 1) == NUM_MEASUREMENTS)
      {
        /**
         * We are done with this thread, we need to either move on to next
         * send thread or stop.
         */
        if (instance() != MAIN_THRMAN_INSTANCE)
        {
          jam();
          break;
        }
        /* This check will also ensure that we break without send threads */
        if (pos_thread_id == m_num_send_threads)
        {
          jam();
          break;
        }
        jam();
        pos_thread_id++;
        SendThreadPtr sendThreadPtr;
        c_sendThreadRecordPool.getPtr(sendThreadPtr, pos_thread_id - 1);

        /* Position at the newest measurement of the next send thread. */
        if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
        {
          jam();
          Local_SendThreadMeasurement_fifo list_50ms(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_50ms_measurements);
          list_50ms.last(sendThreadMeasurementPtr);
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
        {
          jam();
          Local_SendThreadMeasurement_fifo list_1sec(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_1sec_measurements);
          list_1sec.last(sendThreadMeasurementPtr);
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
        {
          jam();
          Local_SendThreadMeasurement_fifo list_20sec(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_20sec_measurements);
          list_20sec.last(sendThreadMeasurementPtr);
        }
        else
        {
          ndbabort();
          return;
        }

        pos = (sendThreadMeasurementPtr.i << 16) +
              (pos_thread_id << 8) +
              0;
      }
      else if (pos_thread_id == 0)
      {
        /* Step backwards through our own measurements; wrap to first. */
        if (measurePtr.i == RNIL)
        {
          jam();
          g_eventLogger->info("measurePtr.i = RNIL");
          ndbassert(false);
          ndbinfo_send_scan_conf(signal, req, rl);
          return;
        }
        if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
        {
          jam();
          c_next_50ms_measure.prev(measurePtr);
          if (measurePtr.i == RNIL)
          {
            jam();
            c_next_50ms_measure.first(measurePtr);
          }
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
        {
          jam();
          c_next_1sec_measure.prev(measurePtr);
          if (measurePtr.i == RNIL)
          {
            jam();
            c_next_1sec_measure.first(measurePtr);
          }
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
        {
          jam();
          c_next_20sec_measure.prev(measurePtr);
          if (measurePtr.i == RNIL)
          {
            jam();
            c_next_20sec_measure.first(measurePtr);
          }
        }
        else
        {
          ndbabort();
          return;
        }
        pos = (measurePtr.i << 16) +
              (0 << 8) +
              pos_index + 1;
      }
      else
      {
        /* Step backwards through the current send thread's measurements. */
        SendThreadPtr sendThreadPtr;
        c_sendThreadRecordPool.getPtr(sendThreadPtr, pos_thread_id - 1);

        ndbrequire(sendThreadMeasurementPtr.i != RNIL);
        if (req.tableId == Ndbinfo::CPUSTAT_50MS_TABLEID)
        {
          Local_SendThreadMeasurement_fifo list_50ms(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_50ms_measurements);
          list_50ms.prev(sendThreadMeasurementPtr);
          if (sendThreadMeasurementPtr.i == RNIL)
          {
            jam();
            list_50ms.first(sendThreadMeasurementPtr);
          }
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_1SEC_TABLEID)
        {
          Local_SendThreadMeasurement_fifo list_1sec(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_1sec_measurements);
          list_1sec.prev(sendThreadMeasurementPtr);
          if (sendThreadMeasurementPtr.i == RNIL)
          {
            jam();
            list_1sec.first(sendThreadMeasurementPtr);
          }
        }
        else if (req.tableId == Ndbinfo::CPUSTAT_20SEC_TABLEID)
        {
          Local_SendThreadMeasurement_fifo list_20sec(
            c_sendThreadMeasurementPool,
            sendThreadPtr.p->m_send_thread_20sec_measurements);
          list_20sec.prev(sendThreadMeasurementPtr);
          if (sendThreadMeasurementPtr.i == RNIL)
          {
            jam();
            list_20sec.first(sendThreadMeasurementPtr);
          }
        }
        else
        {
          ndbabort();
          return;
        }
        pos = (sendThreadMeasurementPtr.i << 16) +
              (pos_thread_id << 8) +
              pos_index + 1;
      }

      if (rl.need_break(req))
      {
        jam();
        ndbinfo_send_scan_break(signal, req, rl, pos);
        return;
      }
    }
    break;
  }
  case Ndbinfo::CPUSTAT_TABLEID:
  {

    /* pos 0 = own thread; pos >= 1 = send thread instance (pos - 1). */
    Uint32 pos = cursor->data[0];

    SendThreadMeasurementPtr sendThreadMeasurementPtr;
    MeasurementRecordPtr measurePtr;

    for ( ; ; )
    {
      if (pos == 0)
      {
        jam();
        MeasurementRecord measure;
        bool success = calculate_cpu_load_last_second(&measure);
        ndbrequire(success);
        Ndbinfo::Row row(signal, req);
        row.write_uint32(getOwnNodeId());
        row.write_uint32 (getThreadId());

        if (measure.m_elapsed_time)
        {
          jam();
          /*
           * NOTE(review): the +500000 addend rounds to the nearest
           * percent assuming the times are in microseconds over a
           * nominal one-second window — confirm the unit.
           */
          Uint64 user_os_percentage =
            ((Uint64(100) *
             measure.m_user_time_os) +
             Uint64(500 * 1000)) /
             measure.m_elapsed_time;

          Uint64 kernel_percentage =
            ((Uint64(100) *
             measure.m_kernel_time_os) +
             Uint64(500 * 1000)) /
             measure.m_elapsed_time;

          /* Ensure that total percentage reported is always 100% */
          if (user_os_percentage + kernel_percentage > Uint64(100))
          {
            kernel_percentage = Uint64(100) - user_os_percentage;
          }
          Uint64 idle_os_percentage =
            Uint64(100) - (user_os_percentage + kernel_percentage);
          row.write_uint32(Uint32(user_os_percentage));
          row.write_uint32(Uint32(kernel_percentage));
          row.write_uint32(Uint32(idle_os_percentage));

          Uint64 exec_time = measure.m_exec_time_thread;
          Uint64 spin_time = measure.m_spin_time_thread;
          Uint64 buffer_full_time = measure.m_buffer_full_time_thread;
          Uint64 send_time = measure.m_send_time_thread;

          /* Report pure execution time, excluding the special times. */
          Uint64 non_exec_time = spin_time + send_time + buffer_full_time;
          if (unlikely(non_exec_time > exec_time))
          {
            exec_time = 0;
          }
          else
          {
            exec_time -= non_exec_time;
          }

          Uint64 exec_percentage =
            ((Uint64(100) * exec_time) +
             Uint64(500 * 1000)) /
             measure.m_elapsed_time;

          Uint64 spin_percentage =
            ((Uint64(100) * spin_time) +
             Uint64(500 * 1000)) /
             measure.m_elapsed_time;

          Uint64 send_percentage =
            ((Uint64(100) * send_time) +
             Uint64(500 * 1000)) /
             measure.m_elapsed_time;

          Uint64 buffer_full_percentage =
            ((Uint64(100) * buffer_full_time) +
             Uint64(500 * 1000)) /
             measure.m_elapsed_time;

          /* Ensure that total percentage reported is always 100% */
          Uint64 exec_full_percentage = exec_percentage +
                                        buffer_full_percentage;
          Uint64 exec_full_send_percentage = exec_percentage +
                                             buffer_full_percentage +
                                             send_percentage;
          Uint64 all_exec_percentage = exec_percentage +
                                       buffer_full_percentage +
                                       send_percentage +
                                       spin_percentage;
          Uint64 sleep_percentage = 0;
          /*
           * Normalise rounding overshoot: drop components in priority
           * order (buffer_full first, then exec, send, spin) so that
           * the five reported percentages always sum to exactly 100.
           */
          if (buffer_full_percentage > Uint64(100))
          {
            buffer_full_percentage = Uint64(100);
            exec_percentage = 0;
            send_percentage = 0;
            spin_percentage = 0;
          }
          else if (exec_full_percentage > Uint64(100))
          {
            exec_percentage = Uint64(100) - buffer_full_percentage;
            send_percentage = 0;
            spin_percentage = 0;
          }
          else if (exec_full_send_percentage > Uint64(100))
          {
            exec_percentage = Uint64(100) - exec_full_percentage;
            spin_percentage = 0;
          }
          else if (all_exec_percentage > Uint64(100))
          {
            exec_percentage = Uint64(100) - exec_full_send_percentage;
          }
          else
          {
            sleep_percentage = Uint64(100) - all_exec_percentage;
          }
          ndbrequire(exec_percentage +
                     buffer_full_percentage +
                     send_percentage +
                     spin_percentage +
                     sleep_percentage == Uint64(100));

          row.write_uint32(Uint32(exec_percentage));
          row.write_uint32(Uint32(sleep_percentage));
          row.write_uint32(Uint32(spin_percentage));
          row.write_uint32(Uint32(send_percentage));
          row.write_uint32(Uint32(buffer_full_percentage));

          row.write_uint32(Uint32(measure.m_elapsed_time));
        }
        else
        {
          jam();
          /* No elapsed time measured yet: emit an all-zero row. */
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
        }

        ndbinfo_send_row(signal, req, row, rl);
        if (instance() != MAIN_THRMAN_INSTANCE ||
            m_num_send_threads == 0)
        {
          jam();
          break;
        }
        pos++;
      }
      else
      {
        /* Send thread CPU load */
        jam();
        if ((pos - 1) >= m_num_send_threads)
        {
          jam();
          g_eventLogger->info("send instance out of range");
          ndbassert(false);
          ndbinfo_send_scan_conf(signal, req, rl);
          return;
        }
        SendThreadMeasurement measure;
        bool success = calculate_send_thread_load_last_second(pos - 1,
                                                              &measure);
        if (!success)
        {
          g_eventLogger->info("Failed calculate_send_thread_load_last_second");
          ndbassert(false);
          ndbinfo_send_scan_conf(signal, req, rl);
          return;
        }
        Ndbinfo::Row row(signal, req);
        row.write_uint32(getOwnNodeId());
        row.write_uint32 (m_num_threads + (pos - 1));

        if (measure.m_elapsed_time_os == 0)
        {
          jam();
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
        }
        else
        {
          Uint64 user_time_os_percentage = ((Uint64(100) *
                                     measure.m_user_time_os) +
                                     Uint64(500 * 1000)) /
                                     measure.m_elapsed_time_os;

          row.write_uint32(Uint32(user_time_os_percentage));

          Uint64 kernel_time_os_percentage = ((Uint64(100) *
                                     measure.m_kernel_time_os) +
                                     Uint64(500 * 1000)) /
                                     measure.m_elapsed_time_os;

          row.write_uint32(Uint32(kernel_time_os_percentage));

          Uint64 idle_time_os_percentage = ((Uint64(100) *
                                     measure.m_idle_time_os) +
                                     Uint64(500 * 1000)) /
                                     measure.m_elapsed_time_os;

          row.write_uint32(Uint32(idle_time_os_percentage));
        }

        if (measure.m_elapsed_time > 0)
        {
          Uint64 exec_time = measure.m_exec_time;
          Uint64 spin_time = measure.m_spin_time;
          Uint64 sleep_time = measure.m_sleep_time;

          /*
           * NOTE(review): no underflow guard here unlike the own-thread
           * case above — presumably spin time is always accounted as
           * part of exec time for send threads; verify.
           */
          exec_time -= spin_time;

          Uint64 exec_percentage = ((Uint64(100) * exec_time) +
                                    Uint64(500 * 1000)) /
                                    measure.m_elapsed_time;

          Uint64 sleep_percentage = ((Uint64(100) * sleep_time) +
                                    Uint64(500 * 1000)) /
                                    measure.m_elapsed_time;

          Uint64 spin_percentage = ((Uint64(100) * spin_time) +
                                    Uint64(500 * 1000)) /
                                    measure.m_elapsed_time;

          row.write_uint32(Uint32(exec_percentage));
          row.write_uint32(Uint32(sleep_percentage));
          row.write_uint32(Uint32(spin_percentage));
          row.write_uint32(Uint32(exec_percentage));
          row.write_uint32(Uint32(0));
          row.write_uint32(Uint32(measure.m_elapsed_time));
        }
        else
        {
          jam();
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
          row.write_uint32(0);
        }
        ndbinfo_send_row(signal, req, row, rl);

        if (pos == m_num_send_threads)
        {
          jam();
          break;
        }
        pos++;
      }
      if (rl.need_break(req))
      {
        jam();
        ndbinfo_send_scan_break(signal, req, rl, pos);
        return;
      }
    }
    break;
  }
  default:
    break;
  }

  ndbinfo_send_scan_conf(signal, req, rl);
}
3840
3841 static void
release_wait_freeze()3842 release_wait_freeze()
3843 {
3844 NdbMutex_Lock(g_freeze_mutex);
3845 g_freeze_waiters--;
3846 if (g_freeze_waiters == 0)
3847 {
3848 g_freeze_wakeup = false;
3849 }
3850 NdbMutex_Unlock(g_freeze_mutex);
3851 }
3852
3853 static Uint32
check_freeze_waiters()3854 check_freeze_waiters()
3855 {
3856 NdbMutex_Lock(g_freeze_mutex);
3857 Uint32 sync_waiters = g_freeze_waiters;
3858 NdbMutex_Unlock(g_freeze_mutex);
3859 return sync_waiters;
3860 }
3861
3862 void
execFREEZE_THREAD_REQ(Signal * signal)3863 Thrman::execFREEZE_THREAD_REQ(Signal *signal)
3864 {
3865 FreezeThreadReq* req = (FreezeThreadReq*)&signal->theData[0];
3866 m_freeze_req = *req;
3867 /**
3868 * We are requested to stop executing in this thread here. When all
3869 * threads have stopped here we are ready to perform the change
3870 * operation requested.
3871 *
3872 * The current change operations supported are:
3873 * Switch between inactive transporters and active transporters.
3874 * This is used when we increase the number of transporters on a link
3875 * from a single transporter to multiple transporters sharing the
3876 * load. It is important synchronize this to ensure that signals
3877 * continue to arrive to the destination threads in signal order.
3878 */
3879 if (instance() != 1)
3880 {
3881 flush_send_buffers();
3882 wait_freeze(false);
3883 return;
3884 }
3885 wait_freeze(true);
3886 wait_all_stop(signal);
3887 }
3888
3889 void
wait_freeze(bool ret)3890 Thrman::wait_freeze(bool ret)
3891 {
3892 NdbMutex_Lock(g_freeze_mutex);
3893 g_freeze_waiters++;
3894 if (ret)
3895 {
3896 NdbMutex_Unlock(g_freeze_mutex);
3897 jam();
3898 return;
3899 }
3900 while (true)
3901 {
3902 NdbCondition_WaitTimeout(g_freeze_condition,
3903 g_freeze_mutex,
3904 10);
3905 set_watchdog_counter();
3906 if (g_freeze_wakeup)
3907 {
3908 g_freeze_waiters--;
3909 if (g_freeze_waiters == 0)
3910 {
3911 g_freeze_wakeup = false;
3912 }
3913 NdbMutex_Unlock(g_freeze_mutex);
3914 jam();
3915 return;
3916 }
3917 }
3918 return;
3919 }
3920
3921 void
wait_all_stop(Signal * signal)3922 Thrman::wait_all_stop(Signal *signal)
3923 {
3924 if (check_freeze_waiters() == m_num_threads)
3925 {
3926 jam();
3927 FreezeActionReq* req = CAST_PTR(FreezeActionReq, signal->getDataPtrSend());
3928 BlockReference ref = m_freeze_req.senderRef;
3929 req->nodeId = m_freeze_req.nodeId;
3930 req->senderRef = reference();
3931 sendSignal(ref, GSN_FREEZE_ACTION_REQ, signal,
3932 FreezeActionReq::SignalLength, JBA);
3933 return;
3934 }
3935 signal->theData[0] = ZWAIT_ALL_STOP;
3936 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
3937 }
3938
void
Thrman::execFREEZE_ACTION_CONF(Signal *signal)
{
  /**
   * The action is performed, we have completed this action.
   * We can now release all threads and ensure that they are
   * woken up again. We wait for all threads to wakeup before
   * we proceed to ensure that the functionality is available
   * for a new synchronize action.
   */
  /* Set the flag and broadcast under the mutex so no waiter misses it. */
  NdbMutex_Lock(g_freeze_mutex);
  g_freeze_wakeup = true;
  NdbCondition_Broadcast(g_freeze_condition);
  NdbMutex_Unlock(g_freeze_mutex);
  /* Drop our own registration made by wait_freeze(true). */
  release_wait_freeze();
  /* Poll until all other threads have left the rendezvous. */
  wait_all_start(signal);
}
3956
wait_all_start(Signal * signal)3957 void Thrman::wait_all_start(Signal *signal)
3958 {
3959 if (check_freeze_waiters() == 0)
3960 {
3961 jam();
3962 FreezeThreadConf* conf = CAST_PTR(FreezeThreadConf, signal->getDataPtrSend());
3963 BlockReference ref = m_freeze_req.senderRef;
3964 conf->nodeId = m_freeze_req.nodeId;
3965 sendSignal(ref, GSN_FREEZE_THREAD_CONF, signal,
3966 FreezeThreadConf::SignalLength, JBA);
3967 return;
3968 }
3969 signal->theData[0] = ZWAIT_ALL_START;
3970 sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
3971 }
3972
3973 void
execDUMP_STATE_ORD(Signal * signal)3974 Thrman::execDUMP_STATE_ORD(Signal *signal)
3975 {
3976 DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
3977 Uint32 arg = dumpState->args[0];
3978 Uint32 val1 = dumpState->args[1];
3979 if (arg == DumpStateOrd::SetSchedulerSpinTimerAll)
3980 {
3981 if (signal->length() != 2)
3982 {
3983 if (instance() == MAIN_THRMAN_INSTANCE)
3984 {
3985 g_eventLogger->info("Use: DUMP 104000 spintime");
3986 }
3987 return;
3988 }
3989 set_configured_spintime(val1, false);
3990 }
3991 else if (arg == DumpStateOrd::SetSchedulerSpinTimerThread)
3992 {
3993 if (signal->length() != 3)
3994 {
3995 if (instance() == MAIN_THRMAN_INSTANCE)
3996 {
3997 g_eventLogger->info("Use: DUMP 104001 thr_no spintime");
3998 }
3999 return;
4000 }
4001 Uint32 val2 = dumpState->args[2];
4002 if (val1 + 1 == instance())
4003 {
4004 jam();
4005 set_configured_spintime(val2, true);
4006 }
4007 }
4008 else if (arg == DumpStateOrd::SetAllowedSpinOverhead)
4009 {
4010 if (signal->length() != 2)
4011 {
4012 if (instance() == MAIN_THRMAN_INSTANCE)
4013 {
4014 g_eventLogger->info("Use: DUMP 104002 AllowedSpinOverhead");
4015 }
4016 return;
4017 }
4018 set_allowed_spin_overhead(val1);
4019 }
4020 else if (arg == DumpStateOrd::SetSpintimePerCall)
4021 {
4022 if (signal->length() != 2)
4023 {
4024 if (instance() == MAIN_THRMAN_INSTANCE)
4025 {
4026 g_eventLogger->info("Use: DUMP 104003 SpintimePerCall");
4027 }
4028 return;
4029 }
4030 set_spintime_per_call(val1);
4031 }
4032 else if (arg == DumpStateOrd::EnableAdaptiveSpinning)
4033 {
4034 if (signal->length() != 2)
4035 {
4036 if (instance() == MAIN_THRMAN_INSTANCE)
4037 {
4038 g_eventLogger->info("Use: DUMP 104004 0/1"
4039 " (Enable/Disable Adaptive Spinning");
4040 }
4041 return;
4042 }
4043 set_enable_adaptive_spinning(val1 != 0);
4044 }
4045 return;
4046 }
4047
// Proxy for the THRMAN block; fans signals out to the per-thread
// THRMAN worker instances via the LocalProxy base class.
ThrmanProxy::ThrmanProxy(Block_context & ctx) :
  LocalProxy(THRMAN, ctx)
{
  // Register the one signal this proxy handles itself so that
  // FREEZE_THREAD_REQ can be forwarded to every worker instance.
  addRecSignal(GSN_FREEZE_THREAD_REQ, &ThrmanProxy::execFREEZE_THREAD_REQ);
}
4053
ThrmanProxy::~ThrmanProxy()
{
  // No proxy-specific resources to release.
}
4057
4058 SimulatedBlock*
newWorker(Uint32 instanceNo)4059 ThrmanProxy::newWorker(Uint32 instanceNo)
4060 {
4061 return new Thrman(m_ctx, instanceNo);
4062 }
4063
BLOCK_FUNCTIONS(ThrmanProxy)4064 BLOCK_FUNCTIONS(ThrmanProxy)
4065
4066 void
4067 ThrmanProxy::execFREEZE_THREAD_REQ(Signal* signal)
4068 {
4069 /**
4070 * This signal is always sent from the main thread. Thus we should not
4071 * send the signal to the first instance in THRMAN which is the main
4072 * thread since this would block the main thread from moving forward.
4073 *
4074 * The work to be done is done by the main thread, the other threads
4075 * only need to stop and wait to be woken up again to proceed with
4076 * normal processing.
4077 */
4078 for (Uint32 i = 0; i < c_workers; i++)
4079 {
4080 jam();
4081 Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
4082 sendSignal(ref, GSN_FREEZE_THREAD_REQ, signal, signal->getLength(), JBA);
4083 }
4084 }
4085