1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "FastScheduler.hpp"
26 #include "ThreadConfig.hpp"
27 #include "RefConvert.hpp"
28
29 #include "Emulator.hpp"
30 #include "VMSignal.hpp"
31
32 #include <SignalLoggerManager.hpp>
33 #include <BlockNumbers.h>
34 #include <GlobalSignalNumbers.h>
35 #include <signaldata/EventReport.hpp>
36 #include "LongSignal.hpp"
37 #include <NdbTick.h>
38
39 #define JAM_FILE_ID 242
40
41
// Lower bound on doJob()'s per-call signal budget.
#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
// Same value as the globalData.loopMax set by activateSendPacked();
// doJob() caps its per-call budget by globalData.loopMax.
#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
// Margin added to the B-level occupancy when doJob() computes its budget.
#define EXTRA_SIGNALS_PER_DO_JOB 32
45
FastScheduler()46 FastScheduler::FastScheduler()
47 {
48 // These constants work for sun only, but they should be initated from
49 // Emulator.C as soon as VMTime has been initiated.
50 theJobBuffers[0].newBuffer(JBASIZE);
51 theJobBuffers[1].newBuffer(JBBSIZE);
52 theJobBuffers[2].newBuffer(JBCSIZE);
53 theJobBuffers[3].newBuffer(JBDSIZE);
54 clear();
55 }
56
~FastScheduler()57 FastScheduler::~FastScheduler()
58 {
59 }
60
61 void
clear()62 FastScheduler::clear()
63 {
64 int i;
65 // Make sure the restart signals are not sent too early
66 // the prio is set back in 'main' using the 'ready' method.
67 globalData.highestAvailablePrio = LEVEL_IDLE;
68 globalData.sendPackedActivated = 0;
69 globalData.activateSendPacked = 0;
70 for (i = 0; i < JB_LEVELS; i++){
71 theJobBuffers[i].clear();
72 }
73 globalData.JobCounter = 0;
74 globalData.JobLap = 0;
75 globalData.loopMax = 32;
76 globalData.VMSignals[0].header.theSignalId = 0;
77
78 theDoJobTotalCounter = 0;
79 theDoJobCallCounter = 0;
80 }
81
82 void
activateSendPacked()83 FastScheduler::activateSendPacked()
84 {
85 globalData.sendPackedActivated = 1;
86 globalData.activateSendPacked = 0;
87 globalData.loopMax = 2048;
88 }//FastScheduler::activateSendPacked()
89
90 //------------------------------------------------------------------------
91 // sendPacked is executed at the end of the loop.
92 // To ensure that we don't send any messages before executing all local
93 // packed signals we do another turn in the loop (unless we have already
94 // executed too many signals in the loop).
95 //------------------------------------------------------------------------
96 Uint32
doJob(Uint32 loopStartCount)97 FastScheduler::doJob(Uint32 loopStartCount)
98 {
99 Uint32 loopCount = 0;
100 Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
101 Uint32 TloopMax = (Uint32)globalData.loopMax;
102 if (TminLoops < TloopMax) {
103 TloopMax = TminLoops;
104 }//if
105 if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
106 TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
107 }//if
108 register Signal* signal = getVMSignals();
109 register Uint32 tHighPrio= globalData.highestAvailablePrio;
110 do{
111 while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
112 #ifdef VM_TRACE
113 /* Find reading / propagation of junk */
114 signal->garbage_register();
115 #endif
116 if (unlikely(loopStartCount >
117 MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN))
118 {
119 /**
120 * This implements the bounded delay signal concept. This
121 * means that we will never execute more than 160 signals
122 * before getting the signals with delay 0 put into the
123 * A-level job buffer.
124 */
125 loopStartCount = 0;
126 globalEmulatorData.theThreadConfig->scanZeroTimeQueue();
127 }
128 // To ensure we find bugs quickly
129 register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
130 // also strip any instance bits since this is non-MT code
131 register BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
132 register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
133 globalData.incrementWatchDogCounter(1);
134 if (reg_bnr > 0) {
135 Uint32 tJobCounter = globalData.JobCounter;
136 Uint64 tJobLap = globalData.JobLap;
137 SimulatedBlock* b = globalData.getBlock(reg_bnr);
138 theJobPriority[tJobCounter] = (Uint8)tHighPrio;
139 globalData.JobCounter = (tJobCounter + 1) & 4095;
140 globalData.JobLap = tJobLap + 1;
141
142 #ifdef VM_TRACE_TIME
143 const NDB_TICKS t1 = NdbTick_getCurrentTicks();
144 b->m_currentGsn = reg_gsn;
145 #endif
146
147 #ifdef VM_TRACE
148 {
149 if (globalData.testOn) {
150 signal->header.theVerId_signalNumber = reg_gsn;
151 signal->header.theReceiversBlockNumber = reg_bnr;
152
153 globalSignalLoggers.executeSignal(signal->header,
154 tHighPrio,
155 &signal->theData[0],
156 globalData.ownId);
157 }//if
158 }
159 #endif
160 b->jamBuffer()->markEndOfSigExec();
161 b->executeFunction_async(reg_gsn, signal);
162 #ifdef VM_TRACE_TIME
163 const NDB_TICKS t2 = NdbTick_getCurrentTicks();
164 const Uint64 diff = NdbTick_Elapsed(t1,t2).microSec();
165 b->addTime(reg_gsn, diff);
166 #endif
167 tHighPrio = globalData.highestAvailablePrio;
168 } else {
169 tHighPrio++;
170 globalData.highestAvailablePrio = tHighPrio;
171 }//if
172 loopCount++;
173 loopStartCount++;
174 }//while
175 sendPacked();
176 tHighPrio = globalData.highestAvailablePrio;
177 if(getBOccupancy() > MAX_OCCUPANCY)
178 {
179 if(loopCount != TloopMax)
180 abort();
181 assert( loopCount == TloopMax );
182 TloopMax += 512;
183 }
184 } while ((getBOccupancy() > MAX_OCCUPANCY) ||
185 ((loopCount < TloopMax) &&
186 (tHighPrio < LEVEL_IDLE)));
187
188 theDoJobCallCounter ++;
189 theDoJobTotalCounter += loopCount;
190 if (theDoJobCallCounter == 8192) {
191 reportDoJobStatistics(theDoJobTotalCounter >> 13);
192 theDoJobCallCounter = 0;
193 theDoJobTotalCounter = 0;
194 }//if
195 return loopStartCount;
196 }//FastScheduler::doJob()
197
198 void
postPoll()199 FastScheduler::postPoll()
200 {
201 Signal * signal = getVMSignals();
202 SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
203 b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
204 }
205
sendPacked()206 void FastScheduler::sendPacked()
207 {
208 if (globalData.sendPackedActivated == 1) {
209 SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
210 SimulatedBlock* b_tc = globalData.getBlock(DBTC);
211 SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
212 SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
213 Signal * signal = getVMSignals();
214 b_lqh->executeFunction_async(GSN_SEND_PACKED, signal);
215 b_tc->executeFunction_async(GSN_SEND_PACKED, signal);
216 b_tup->executeFunction_async(GSN_SEND_PACKED, signal);
217 b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
218 return;
219 } else if (globalData.activateSendPacked == 0) {
220 return;
221 } else {
222 activateSendPacked();
223 }//if
224 return;
225 }//FastScheduler::sendPacked()
226
/**
 * Pop the oldest entry of this job buffer into @p signal.
 *
 * On a non-empty buffer this:
 *  - advances rPtr (wrapping at bufSize) and decrements theOccupancy;
 *  - copies the entry header into signal->header;
 *  - stores the current globalData.theSignalId back into the consumed
 *    entry (original note: "for restart purposes") and increments it;
 *  - copies the data words and the three section i-values, which are
 *    stored right after the data words.
 *
 * @param signal  Destination signal.
 * @return (gsn << 16) | receiver block number on success; 0 when the
 *         buffer is empty. A non-empty entry whose receiver block is 0 is
 *         reported through bnr_error().
 */
Uint32
APZJobBuffer::retrieve(Signal* signal)
{
  Uint32 tOccupancy = theOccupancy;
  Uint32 myRPtr = rPtr;
  BufferEntry& buf = buffer[myRPtr];
  Uint32 gsnbnr;
  // Wrap mask: 0 when the incremented index hits bufSize, else all ones.
  Uint32 cond = (++myRPtr == bufSize) - 1;
  Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;

  if (tOccupancy != 0) {
    if (tRecBlockNo != 0) {
      // Transform protocol to signal.
      rPtr = myRPtr & cond;
      theOccupancy = tOccupancy - 1;
      gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;

      Uint32 tSignalId = globalData.theSignalId;
      Uint32 tLength = buf.header.theLength;
      Uint32 tFirstData = buf.theDataRegister[0];
      // Note: copied before the entry's theSignalId is overwritten below.
      signal->header = buf.header;

      // Recall our signal Id for restart purposes
      buf.header.theSignalId = tSignalId;
      globalData.theSignalId = tSignalId + 1;

      Uint32* tDataRegPtr = &buf.theDataRegister[0];
      Uint32* tSigDataPtr = signal->getDataPtrSend();
      *tSigDataPtr = tFirstData;
      tDataRegPtr++;
      tSigDataPtr++;
      Uint32 tLengthCopied = 1;
      // Copies in groups of four, so up to three words past tLength may be
      // copied. NOTE(review): relies on both arrays having that slack.
      while (tLengthCopied < tLength) {
        Uint32 tData0 = tDataRegPtr[0];
        Uint32 tData1 = tDataRegPtr[1];
        Uint32 tData2 = tDataRegPtr[2];
        Uint32 tData3 = tDataRegPtr[3];

        tDataRegPtr += 4;
        tLengthCopied += 4;

        tSigDataPtr[0] = tData0;
        tSigDataPtr[1] = tData1;
        tSigDataPtr[2] = tData2;
        tSigDataPtr[3] = tData3;
        tSigDataPtr += 4;
      }//while

      // The three section i-values follow the data words in the entry.
      tSigDataPtr = signal->m_sectionPtrI;
      tDataRegPtr = buf.theDataRegister + buf.header.theLength;
      Uint32 ptr0 = * tDataRegPtr ++;
      Uint32 ptr1 = * tDataRegPtr ++;
      Uint32 ptr2 = * tDataRegPtr ++;
      * tSigDataPtr ++ = ptr0;
      * tSigDataPtr ++ = ptr1;
      * tSigDataPtr ++ = ptr2;

      //---------------------------------------------------------
      // Prefetch of buffer[rPtr] is done here. We prefetch for
      // read both the first cache line and the next 64 byte
      // entry
      //---------------------------------------------------------
      NDB_PREFETCH_READ((void*)&buffer[rPtr]);
      NDB_PREFETCH_READ((void*)(((char*)&buffer[rPtr]) + 64));
      return gsnbnr;
    } else {
      bnr_error();
      return 0; // Will never come here, simply to keep GCC happy.
    }//if
  } else {
    //------------------------------------------------------------
    // The Job Buffer was empty, signal this by return zero.
    //------------------------------------------------------------
    return 0;
  }//if
}//APZJobBuffer::retrieve()
303
304 void
signal2buffer(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,BufferEntry & buf)305 APZJobBuffer::signal2buffer(Signal* signal,
306 BlockNumber bnr, GlobalSignalNumber gsn,
307 BufferEntry& buf)
308 {
309 Uint32 tSignalId = globalData.theSignalId;
310 Uint32 tFirstData = signal->theData[0];
311 Uint32 tLength = signal->header.theLength + signal->header.m_noOfSections;
312 Uint32 tSigId = buf.header.theSignalId;
313
314 buf.header = signal->header;
315 buf.header.theVerId_signalNumber = gsn;
316 buf.header.theReceiversBlockNumber = bnr;
317 buf.header.theSendersSignalId = tSignalId - 1;
318 buf.header.theSignalId = tSigId;
319 buf.theDataRegister[0] = tFirstData;
320
321 Uint32 tLengthCopied = 1;
322 Uint32* tSigDataPtr = &signal->theData[1];
323 Uint32* tDataRegPtr = &buf.theDataRegister[1];
324 while (tLengthCopied < tLength) {
325 Uint32 tData0 = tSigDataPtr[0];
326 Uint32 tData1 = tSigDataPtr[1];
327 Uint32 tData2 = tSigDataPtr[2];
328 Uint32 tData3 = tSigDataPtr[3];
329
330 tLengthCopied += 4;
331 tSigDataPtr += 4;
332
333 tDataRegPtr[0] = tData0;
334 tDataRegPtr[1] = tData1;
335 tDataRegPtr[2] = tData2;
336 tDataRegPtr[3] = tData3;
337 tDataRegPtr += 4;
338 }//while
339 }//APZJobBuffer::signal2buffer()
340
341 void
insert(const SignalHeader * const sh,const Uint32 * const theData,const Uint32 secPtrI[3])342 APZJobBuffer::insert(const SignalHeader * const sh,
343 const Uint32 * const theData, const Uint32 secPtrI[3]){
344 Uint32 tOccupancy = theOccupancy + 1;
345 Uint32 myWPtr = wPtr;
346 register BufferEntry& buf = buffer[myWPtr];
347
348 if (tOccupancy < bufSize) {
349 Uint32 cond = (++myWPtr == bufSize) - 1;
350 wPtr = myWPtr & cond;
351 theOccupancy = tOccupancy;
352
353 buf.header = * sh;
354 const Uint32 len = buf.header.theLength;
355 memcpy(buf.theDataRegister, theData, 4 * len);
356 memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
357 //---------------------------------------------------------
358 // Prefetch of buffer[wPtr] is done here. We prefetch for
359 // write both the first cache line and the next 64 byte
360 // entry
361 //---------------------------------------------------------
362 NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
363 NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
364 } else {
365 jbuf_error();
366 }//if
367 }
APZJobBuffer()368 APZJobBuffer::APZJobBuffer()
369 : bufSize(0), buffer(NULL), memRef(NULL)
370 {
371 clear();
372 }
373
~APZJobBuffer()374 APZJobBuffer::~APZJobBuffer()
375 {
376 delete [] buffer;
377 }
378
379 void
newBuffer(int size)380 APZJobBuffer::newBuffer(int size)
381 {
382 buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
383 if(buffer){
384 #ifndef NDB_PURIFY
385 ::memset(buffer, 0, (size * sizeof(BufferEntry)));
386 #endif
387 bufSize = size;
388 } else
389 bufSize = 0;
390 }
391
392 void
clear()393 APZJobBuffer::clear()
394 {
395 rPtr = 0;
396 wPtr = 0;
397 theOccupancy = 0;
398 }
399
/**
 * Forward declaration of print_restart(), which is defined later in this
 * file. dumpSignalMemory() uses it to print each signal it recovers.
 */
void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
406
dumpSignalMemory(Uint32 thr_no,FILE * output)407 void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
408 {
409 SignalT<25> signalT;
410 Signal * signal = new (&signalT) Signal(0);
411 Uint32 ReadPtr[5];
412 Uint32 tJob;
413 Uint32 tLastJob;
414
415 /* Single threaded ndbd scheduler, no threads. */
416 assert(thr_no == 0);
417
418 fprintf(output, "\n");
419
420 if (globalData.JobLap > 4095) {
421 if (globalData.JobCounter != 0)
422 tJob = globalData.JobCounter - 1;
423 else
424 tJob = 4095;
425 tLastJob = globalData.JobCounter;
426 } else {
427 if (globalData.JobCounter == 0)
428 return; // No signals sent
429 else {
430 tJob = globalData.JobCounter - 1;
431 tLastJob = 4095;
432 }
433 }
434 ReadPtr[0] = theJobBuffers[0].getReadPtr();
435 ReadPtr[1] = theJobBuffers[1].getReadPtr();
436 ReadPtr[2] = theJobBuffers[2].getReadPtr();
437 ReadPtr[3] = theJobBuffers[3].getReadPtr();
438
439 do {
440 unsigned char tLevel = theJobPriority[tJob];
441 globalData.incrementWatchDogCounter(4);
442 if (ReadPtr[tLevel] == 0)
443 ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
444 else
445 ReadPtr[tLevel]--;
446
447 theJobBuffers[tLevel].retrieveDump(signal, ReadPtr[tLevel]);
448 // strip instance bits since this in non-MT code
449 signal->header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
450 print_restart(output, signal, tLevel);
451
452 if (tJob == 0)
453 tJob = 4095;
454 else
455 tJob--;
456
457 } while (tJob != tLastJob);
458 fflush(output);
459 }
460
461 void
prio_level_error()462 FastScheduler::prio_level_error()
463 {
464 ERROR_SET(ecError, NDBD_EXIT_WRONG_PRIO_LEVEL,
465 "Wrong Priority Level", "FastScheduler.C");
466 }
467
468 void
jbuf_error()469 jbuf_error()
470 {
471 ERROR_SET(ecError, NDBD_EXIT_BLOCK_JBUFCONGESTION,
472 "Job Buffer Full", "APZJobBuffer.C");
473 }
474
475 void
bnr_error()476 bnr_error()
477 {
478 ERROR_SET(ecError, NDBD_EXIT_BLOCK_BNR_ZERO,
479 "Block Number Zero", "FastScheduler.C");
480 }
481
482 void
print_restart(FILE * output,Signal * signal,Uint32 aLevel)483 print_restart(FILE * output, Signal* signal, Uint32 aLevel)
484 {
485 fprintf(output, "--------------- Signal ----------------\n");
486 SignalLoggerManager::printSignalHeader(output,
487 signal->header,
488 aLevel,
489 globalData.ownId,
490 true);
491 SignalLoggerManager::printSignalData (output,
492 signal->header,
493 &signal->theData[0]);
494 }
495
496 void
traceDumpPrepare(NdbShutdownType &)497 FastScheduler::traceDumpPrepare(NdbShutdownType&)
498 {
499 /* No-operation in single-threaded ndbd. */
500 }
501
502 Uint32
traceDumpGetNumThreads()503 FastScheduler::traceDumpGetNumThreads()
504 {
505 return 1; // Single-threaded ndbd scheduler
506 }
507
508 int
traceDumpGetCurrentThread()509 FastScheduler::traceDumpGetCurrentThread()
510 {
511 return -1; // Single-threaded ndbd scheduler
512 }
513
514 bool
traceDumpGetJam(Uint32 thr_no,const JamEvent * & thrdTheEmulatedJam,Uint32 & thrdTheEmulatedJamIndex)515 FastScheduler::traceDumpGetJam(Uint32 thr_no,
516 const JamEvent * & thrdTheEmulatedJam,
517 Uint32 & thrdTheEmulatedJamIndex)
518 {
519 /* Single threaded ndbd scheduler, no threads. */
520 assert(thr_no == 0);
521
522 #ifdef NO_EMULATED_JAM
523 thrdTheEmulatedJam = NULL;
524 thrdTheEmulatedJamIndex = 0;
525 #else
526 const EmulatedJamBuffer *jamBuffer =
527 (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
528 thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
529 thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
530 #endif
531 return true;
532 }
533
534
535 /**
536 * This method used to be a Cmvmi member function
537 * but is now a "ordinary" function"
538 *
539 * See TransporterCallback.cpp for explanation
540 */
541 void
reportDoJobStatistics(Uint32 tMeanLoopCount)542 FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
543 SignalT<2> signal;
544
545 memset(&signal.header, 0, sizeof(signal.header));
546 signal.header.theLength = 2;
547 signal.header.theSendersSignalId = 0;
548 signal.header.theSendersBlockRef = numberToRef(0, 0);
549 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
550 signal.header.theReceiversBlockNumber = CMVMI;
551
552 signal.theData[0] = NDB_LE_JobStatistic;
553 signal.theData[1] = tMeanLoopCount;
554
555 Uint32 secPtr[3];
556 execute(&signal.header, JBA, signal.theData, secPtr);
557 }
558
559 void
reportThreadConfigLoop(Uint32 expired_time,Uint32 extra_constant,Uint32 * no_exec_loops,Uint32 * tot_exec_time,Uint32 * no_extra_loops,Uint32 * tot_extra_time)560 FastScheduler::reportThreadConfigLoop(Uint32 expired_time,
561 Uint32 extra_constant,
562 Uint32 *no_exec_loops,
563 Uint32 *tot_exec_time,
564 Uint32 *no_extra_loops,
565 Uint32 *tot_extra_time)
566 {
567 SignalT<6> signal;
568
569 memset(&signal.header, 0, sizeof(signal.header));
570 signal.header.theLength = 6;
571 signal.header.theSendersSignalId = 0;
572 signal.header.theSendersBlockRef = numberToRef(0, 0);
573 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
574 signal.header.theReceiversBlockNumber = CMVMI;
575
576 signal.theData[0] = NDB_LE_ThreadConfigLoop;
577 signal.theData[1] = expired_time;
578 signal.theData[2] = extra_constant;
579 signal.theData[3] = (*tot_exec_time)/(*no_exec_loops);
580 signal.theData[4] = *no_extra_loops;
581 if (*no_extra_loops > 0)
582 signal.theData[5] = (*tot_extra_time)/(*no_extra_loops);
583 else
584 signal.theData[5] = 0;
585
586 *no_exec_loops = 0;
587 *tot_exec_time = 0;
588 *no_extra_loops = 0;
589 *tot_extra_time = 0;
590
591 Uint32 secPtr[3];
592 execute(&signal.header, JBA, signal.theData, secPtr);
593 }
594
595 static NdbMutex g_mm_mutex;
596
597 void
mt_mem_manager_init()598 mt_mem_manager_init()
599 {
600 NdbMutex_Init(&g_mm_mutex);
601 }
602
603 void
mt_mem_manager_lock()604 mt_mem_manager_lock()
605 {
606 NdbMutex_Lock(&g_mm_mutex);
607 }
608
609 void
mt_mem_manager_unlock()610 mt_mem_manager_unlock()
611 {
612 NdbMutex_Unlock(&g_mm_mutex);
613 }
614