1 /*
2 Copyright (c) 2003, 2018, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "FastScheduler.hpp"
26 #include "ThreadConfig.hpp"
27 #include "RefConvert.hpp"
28
29 #include "Emulator.hpp"
30 #include "VMSignal.hpp"
31
32 #include <SignalLoggerManager.hpp>
33 #include <BlockNumbers.h>
34 #include <GlobalSignalNumbers.h>
35 #include <signaldata/EventReport.hpp>
36 #include "LongSignal.hpp"
37 #include <NdbTick.h>
38
39 #define JAM_FILE_ID 242
40
41
// Floor on doJob()'s per-call signal budget: even if the budget derived from
// B-level occupancy (or globalData.loopMax) is smaller, at least this many
// loop iterations are allowed.
#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
// Cap on signals per doJob() call once packed sending is active:
// activateSendPacked() sets globalData.loopMax to 2048, the same value.
// doJob() may still extend its budget by 512 at a time while the B-level job
// buffer stays above MAX_OCCUPANCY.
#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
// Added to the current B-level job buffer occupancy to form doJob()'s
// per-call signal budget (before the loopMax cap and the MIN floor apply).
#define EXTRA_SIGNALS_PER_DO_JOB 32
45
FastScheduler()46 FastScheduler::FastScheduler()
47 {
48 // These constants work for sun only, but they should be initated from
49 // Emulator.C as soon as VMTime has been initiated.
50 theJobBuffers[0].newBuffer(JBASIZE);
51 theJobBuffers[1].newBuffer(JBBSIZE);
52 theJobBuffers[2].newBuffer(JBCSIZE);
53 theJobBuffers[3].newBuffer(JBDSIZE);
54 clear();
55 }
56
~FastScheduler()57 FastScheduler::~FastScheduler()
58 {
59 }
60
61 void
clear()62 FastScheduler::clear()
63 {
64 int i;
65 // Make sure the restart signals are not sent too early
66 // the prio is set back in 'main' using the 'ready' method.
67 globalData.highestAvailablePrio = LEVEL_IDLE;
68 globalData.sendPackedActivated = 0;
69 globalData.activateSendPacked = 0;
70 for (i = 0; i < JB_LEVELS; i++){
71 theJobBuffers[i].clear();
72 }
73 globalData.JobCounter = 0;
74 globalData.JobLap = 0;
75 globalData.loopMax = 32;
76 globalData.VMSignals[0].header.theSignalId = 0;
77
78 theDoJobTotalCounter = 0;
79 theDoJobCallCounter = 0;
80 }
81
82 void
activateSendPacked()83 FastScheduler::activateSendPacked()
84 {
85 globalData.sendPackedActivated = 1;
86 globalData.activateSendPacked = 0;
87 globalData.loopMax = 2048;
88 }//FastScheduler::activateSendPacked()
89
//------------------------------------------------------------------------
// doJob: run one batch of the scheduler's signal-execution loop.
//
// Repeatedly pops the next signal from the highest-priority non-empty job
// buffer and executes it in its receiving block.  The inner loop stops when
// no level below LEVEL_IDLE has work, or when TloopMax iterations have run.
//
// sendPacked is executed at the end of each pass of the outer loop.
// To ensure that we don't send any messages before executing all local
// packed signals we do another turn in the loop (unless we have already
// executed too many signals in the loop).  While the B-level buffer stays
// above MAX_OCCUPANCY the budget is extended and the loop keeps going.
//
// loopStartCount: number of loop iterations since the zero-time queue was
//   last scanned (presumably fed back in by the caller from the previous
//   return value).
// Returns the updated loopStartCount.
//------------------------------------------------------------------------
Uint32
FastScheduler::doJob(Uint32 loopStartCount)
{
  Uint32 loopCount = 0;
  // Budget = B-level occupancy + EXTRA, capped at globalData.loopMax and
  // then floored at MIN_NUMBER_OF_SIG_PER_DO_JOB.
  Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
  Uint32 TloopMax = (Uint32)globalData.loopMax;
  if (TminLoops < TloopMax) {
    TloopMax = TminLoops;
  }//if
  if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
    TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
  }//if
  Signal* signal = getVMSignals();
  Uint32 tHighPrio= globalData.highestAvailablePrio;
  do{
    while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
#ifdef VM_TRACE
      /* Find reading / propagation of junk */
      signal->garbage_register();
#endif
      if (unlikely(loopStartCount >
                   MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN))
      {
        /**
         * This implements the bounded delay signal concept: we never run
         * more than MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN loop
         * iterations (an older version of this comment said 160) before the
         * signals with delay 0 are moved into the A-level job buffer.
         */
        loopStartCount = 0;
        globalEmulatorData.theThreadConfig->scanZeroTimeQueue();
      }
      // Pop the oldest signal of the current level.  retrieve() returns
      // (gsn << 16) | block number, or 0 when this level's buffer is empty.
      Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
      // also strip any instance bits since this is non-MT code
      BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
      GlobalSignalNumber reg_gsn = gsnbnr >> 16;
      globalData.incrementWatchDogCounter(1);
      if (reg_bnr > 0) {
        // Record the priority level of this signal in the 4096-entry
        // circular history (index wraps via & 4095).  dumpSignalMemory()
        // walks this history backwards.
        Uint32 tJobCounter = globalData.JobCounter;
        Uint64 tJobLap = globalData.JobLap;
        SimulatedBlock* b = globalData.getBlock(reg_bnr);
        theJobPriority[tJobCounter] = (Uint8)tHighPrio;
        globalData.JobCounter = (tJobCounter + 1) & 4095;
        globalData.JobLap = tJobLap + 1;

#ifdef VM_TRACE_TIME
        const NDB_TICKS t1 = NdbTick_getCurrentTicks();
        b->m_currentGsn = reg_gsn;
#endif

#ifdef VM_TRACE
        {
          if (globalData.testOn) {
            signal->header.theVerId_signalNumber = reg_gsn;
            signal->header.theReceiversBlockNumber = reg_bnr;

            globalSignalLoggers.executeSignal(signal->header,
                                              tHighPrio,
                                              &signal->theData[0],
                                              globalData.ownId);
          }//if
        }
#endif
        b->jamBuffer()->markEndOfSigExec();
        b->executeFunction_async(reg_gsn, signal);
#ifdef VM_TRACE_TIME
        const NDB_TICKS t2 = NdbTick_getCurrentTicks();
        const Uint64 diff = NdbTick_Elapsed(t1,t2).microSec();
        b->addTime(reg_gsn, diff);
#endif
        // The executed signal may have queued higher-priority work, so
        // re-read the highest available level.
        tHighPrio = globalData.highestAvailablePrio;
      } else {
        // This level is empty: fall through to the next (numerically
        // higher) level and publish that.
        tHighPrio++;
        globalData.highestAvailablePrio = tHighPrio;
      }//if
      // Note: empty-level probes also count against the budget.
      loopCount++;
      loopStartCount++;
    }//while
    sendPacked();
    tHighPrio = globalData.highestAvailablePrio;
    if(getBOccupancy() > MAX_OCCUPANCY)
    {
      // The B buffer is still congested.  The inner loop is only expected
      // to have stopped because the budget ran out; anything else is
      // treated as fatal.  (The assert is redundant after abort().)
      // Otherwise grant another 512 iterations.
      if(loopCount != TloopMax)
        abort();
      assert( loopCount == TloopMax );
      TloopMax += 512;
    }
  } while ((getBOccupancy() > MAX_OCCUPANCY) ||
           ((loopCount < TloopMax) &&
            (tHighPrio < LEVEL_IDLE)));

  // Every 8192 calls, report the mean iterations per call (>> 13 == / 8192)
  // and restart the accumulation.
  theDoJobCallCounter ++;
  theDoJobTotalCounter += loopCount;
  if (theDoJobCallCounter == 8192) {
    reportDoJobStatistics(theDoJobTotalCounter >> 13);
    theDoJobCallCounter = 0;
    theDoJobTotalCounter = 0;
  }//if
  return loopStartCount;
}//FastScheduler::doJob()
197
198 void
postPoll()199 FastScheduler::postPoll()
200 {
201 Signal * signal = getVMSignals();
202 SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
203 b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
204 }
205
sendPacked()206 void FastScheduler::sendPacked()
207 {
208 if (globalData.sendPackedActivated == 1) {
209 SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
210 SimulatedBlock* b_tc = globalData.getBlock(DBTC);
211 SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
212 SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
213 Signal * signal = getVMSignals();
214 b_lqh->executeFunction_async(GSN_SEND_PACKED, signal);
215 b_tc->executeFunction_async(GSN_SEND_PACKED, signal);
216 b_tup->executeFunction_async(GSN_SEND_PACKED, signal);
217 b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
218 return;
219 } else if (globalData.activateSendPacked == 0) {
220 return;
221 } else {
222 activateSendPacked();
223 }//if
224 return;
225 }//FastScheduler::sendPacked()
226
//------------------------------------------------------------------------
// Pop the oldest entry of this job buffer into 'signal'.
//
// Returns (gsn << 16) | receiver block number of the popped entry, or 0 when
// the buffer is empty.  An occupied slot whose receiver block number is 0 is
// treated as corruption and reported through bnr_error().
//
// Side effects: advances rPtr (wrapping at bufSize), decrements
// theOccupancy, stamps the consumed slot's header with the current
// globalData.theSignalId and then increments that global counter.
//------------------------------------------------------------------------
Uint32
APZJobBuffer::retrieve(Signal* signal)
{
  Uint32 tOccupancy = theOccupancy;
  Uint32 myRPtr = rPtr;
  BufferEntry& buf = buffer[myRPtr];
  Uint32 gsnbnr;
  // Branch-free wrap: cond is 0 when the incremented index reaches bufSize
  // and all-ones otherwise, so (myRPtr & cond) is the next read index.
  Uint32 cond = (++myRPtr == bufSize) - 1;
  // Read speculatively (before the occupancy check); only used if occupied.
  Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;

  if (tOccupancy != 0) {
    if (tRecBlockNo != 0) {
      // Transform protocol to signal.
      rPtr = myRPtr & cond;
      theOccupancy = tOccupancy - 1;
      gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;

      Uint32 tSignalId = globalData.theSignalId;
      Uint32 tLength = buf.header.theLength;
      Uint32 tFirstData = buf.theDataRegister[0];
      // NOTE: the header is copied BEFORE the slot is stamped below, so
      // signal->header.theSignalId holds the slot's previous value.
      signal->header = buf.header;

      // Recall our signal Id for restart purposes: the slot keeps the id of
      // this execution (it is not cleared, so it can be re-read later).
      buf.header.theSignalId = tSignalId;
      globalData.theSignalId = tSignalId + 1;

      // Copy the data words: word 0 first, then blocks of 4.  The 4-word
      // loop can copy up to 3 words past tLength, so it assumes both arrays
      // have that much slack (TODO confirm against BufferEntry/Signal).
      Uint32* tDataRegPtr = &buf.theDataRegister[0];
      Uint32* tSigDataPtr = signal->getDataPtrSend();
      *tSigDataPtr = tFirstData;
      tDataRegPtr++;
      tSigDataPtr++;
      Uint32 tLengthCopied = 1;
      while (tLengthCopied < tLength) {
        Uint32 tData0 = tDataRegPtr[0];
        Uint32 tData1 = tDataRegPtr[1];
        Uint32 tData2 = tDataRegPtr[2];
        Uint32 tData3 = tDataRegPtr[3];

        tDataRegPtr += 4;
        tLengthCopied += 4;

        tSigDataPtr[0] = tData0;
        tSigDataPtr[1] = tData1;
        tSigDataPtr[2] = tData2;
        tSigDataPtr[3] = tData3;
        tSigDataPtr += 4;
      }//while

      // The three section i-values are stored right after the data words;
      // all three are copied regardless of m_noOfSections.
      tSigDataPtr = signal->m_sectionPtrI;
      tDataRegPtr = buf.theDataRegister + buf.header.theLength;
      Uint32 ptr0 = * tDataRegPtr ++;
      Uint32 ptr1 = * tDataRegPtr ++;
      Uint32 ptr2 = * tDataRegPtr ++;
      * tSigDataPtr ++ = ptr0;
      * tSigDataPtr ++ = ptr1;
      * tSigDataPtr ++ = ptr2;

      //---------------------------------------------------------
      // Prefetch of buffer[rPtr] (the next entry to be read) is done
      // here.  We prefetch for read both the first cache line and the
      // following 64 bytes of the entry.
      //---------------------------------------------------------
      NDB_PREFETCH_READ((void*)&buffer[rPtr]);
      NDB_PREFETCH_READ((void*)(((char*)&buffer[rPtr]) + 64));
      return gsnbnr;
    } else {
      bnr_error();
      return 0; // Will never come here, simply to keep GCC happy.
    }//if
  } else {
    //------------------------------------------------------------
    // The Job Buffer was empty, signal this by returning zero.
    //------------------------------------------------------------
    return 0;
  }//if
}//APZJobBuffer::retrieve()
303
304 void
signal2buffer(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,BufferEntry & buf)305 APZJobBuffer::signal2buffer(Signal* signal,
306 BlockNumber bnr, GlobalSignalNumber gsn,
307 BufferEntry& buf)
308 {
309 Uint32 tSignalId = globalData.theSignalId;
310 Uint32 tLength = signal->header.theLength + signal->header.m_noOfSections;
311 Uint32 tSigId = buf.header.theSignalId;
312
313 buf.header = signal->header;
314 buf.header.theVerId_signalNumber = gsn;
315 buf.header.theReceiversBlockNumber = bnr;
316 buf.header.theSendersSignalId = tSignalId - 1;
317 buf.header.theSignalId = tSigId;
318
319 Uint32* tSigDataPtr = &signal->theData[0];
320 Uint32* tDataRegPtr = &buf.theDataRegister[0];
321
322 // TODO hint that data is aligned(?) to -4 bytes per 16 bytes for performance
323 memcpy(tDataRegPtr, tSigDataPtr, tLength * sizeof(Uint32));
324 }//APZJobBuffer::signal2buffer()
325
326 void
insert(const SignalHeader * const sh,const Uint32 * const theData,const Uint32 secPtrI[3])327 APZJobBuffer::insert(const SignalHeader * const sh,
328 const Uint32 * const theData, const Uint32 secPtrI[3]){
329 Uint32 tOccupancy = theOccupancy + 1;
330 Uint32 myWPtr = wPtr;
331 BufferEntry& buf = buffer[myWPtr];
332
333 if (tOccupancy < bufSize) {
334 Uint32 cond = (++myWPtr == bufSize) - 1;
335 wPtr = myWPtr & cond;
336 theOccupancy = tOccupancy;
337
338 buf.header = * sh;
339 const Uint32 len = buf.header.theLength;
340 memcpy(buf.theDataRegister, theData, 4 * len);
341 memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
342 //---------------------------------------------------------
343 // Prefetch of buffer[wPtr] is done here. We prefetch for
344 // write both the first cache line and the next 64 byte
345 // entry
346 //---------------------------------------------------------
347 NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
348 NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
349 } else {
350 jbuf_error();
351 }//if
352 }
APZJobBuffer()353 APZJobBuffer::APZJobBuffer()
354 : bufSize(0), buffer(NULL), memRef(NULL)
355 {
356 clear();
357 }
358
~APZJobBuffer()359 APZJobBuffer::~APZJobBuffer()
360 {
361 delete [] buffer;
362 }
363
364 void
newBuffer(int size)365 APZJobBuffer::newBuffer(int size)
366 {
367 buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
368 if(buffer){
369 #ifndef NDB_PURIFY
370 ::memset(buffer, 0, (size * sizeof(BufferEntry)));
371 #endif
372 bufSize = size;
373 } else
374 bufSize = 0;
375 }
376
377 void
clear()378 APZJobBuffer::clear()
379 {
380 rPtr = 0;
381 wPtr = 0;
382 theOccupancy = 0;
383 }
384
/**
 * Function prototype for print_restart: prints one signal (header and data)
 * to 'output', tagged with the job-buffer priority level 'aLevel'.
 *
 * Defined later in this file; declared here so that dumpSignalMemory() can
 * use it.
 */
void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
391
/**
 * Print the most recently executed signals, newest first, to 'output'.
 *
 * theJobPriority[] is a 4096-entry circular history holding the priority
 * level of each signal executed by doJob(), indexed by globalData.JobCounter.
 * For each history entry, walking backwards, the matching job buffer's read
 * pointer is stepped back one slot (with wrap-around) and that slot is
 * re-read with retrieveDump().  retrieve() does not clear consumed slots.
 *
 * Once the history has wrapped (JobLap > 4095), 4095 entries are printed:
 * the slot at JobCounter, which is the oldest, is skipped.  Otherwise
 * JobCounter entries are printed, and nothing at all if none were executed.
 *
 * @param thr_no  must be 0 (single-threaded ndbd; asserted).
 * @param output  stream to print to; it is flushed at the end.
 */
void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
{
  SignalT<25> signalT;
  Signal * signal = new (&signalT) Signal(0);
  // Only ReadPtr[0..3] are initialised and used below; this assumes
  // theJobPriority values are 0..3 (TODO confirm).
  Uint32 ReadPtr[5];
  Uint32 tJob;
  Uint32 tLastJob;

  /* Single threaded ndbd scheduler, no threads. */
  assert(thr_no == 0);

  fprintf(output, "\n");

  // Choose the newest history entry (tJob) and the stop index (tLastJob).
  if (globalData.JobLap > 4095) {
    if (globalData.JobCounter != 0)
      tJob = globalData.JobCounter - 1;
    else
      tJob = 4095;
    tLastJob = globalData.JobCounter;
  } else {
    if (globalData.JobCounter == 0)
      return; // No signals sent
    else {
      tJob = globalData.JobCounter - 1;
      tLastJob = 4095;
    }
  }
  ReadPtr[0] = theJobBuffers[0].getReadPtr();
  ReadPtr[1] = theJobBuffers[1].getReadPtr();
  ReadPtr[2] = theJobBuffers[2].getReadPtr();
  ReadPtr[3] = theJobBuffers[3].getReadPtr();

  do {
    unsigned char tLevel = theJobPriority[tJob];
    // Presumably keeps the watchdog from firing during a long dump.
    globalData.incrementWatchDogCounter(4);
    // Step this level's read position back one slot, wrapping to the end.
    if (ReadPtr[tLevel] == 0)
      ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
    else
      ReadPtr[tLevel]--;

    theJobBuffers[tLevel].retrieveDump(signal, ReadPtr[tLevel]);
    // strip instance bits since this is non-MT code
    signal->header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
    print_restart(output, signal, tLevel);

    // Move to the previous history entry, wrapping 0 -> 4095.
    if (tJob == 0)
      tJob = 4095;
    else
      tJob--;

  } while (tJob != tLastJob);
  fflush(output);
}
445
446 void
prio_level_error()447 FastScheduler::prio_level_error()
448 {
449 ERROR_SET(ecError, NDBD_EXIT_WRONG_PRIO_LEVEL,
450 "Wrong Priority Level", "FastScheduler.C");
451 }
452
453 void
jbuf_error()454 jbuf_error()
455 {
456 ERROR_SET(ecError, NDBD_EXIT_BLOCK_JBUFCONGESTION,
457 "Job Buffer Full", "APZJobBuffer.C");
458 }
459
460 void
bnr_error()461 bnr_error()
462 {
463 ERROR_SET(ecError, NDBD_EXIT_BLOCK_BNR_ZERO,
464 "Block Number Zero", "FastScheduler.C");
465 }
466
467 void
print_restart(FILE * output,Signal * signal,Uint32 aLevel)468 print_restart(FILE * output, Signal* signal, Uint32 aLevel)
469 {
470 fprintf(output, "--------------- Signal ----------------\n");
471 SignalLoggerManager::printSignalHeader(output,
472 signal->header,
473 aLevel,
474 globalData.ownId,
475 true);
476 SignalLoggerManager::printSignalData (output,
477 signal->header,
478 &signal->theData[0]);
479 }
480
481 void
traceDumpPrepare(NdbShutdownType &)482 FastScheduler::traceDumpPrepare(NdbShutdownType&)
483 {
484 /* No-operation in single-threaded ndbd. */
485 }
486
487 Uint32
traceDumpGetNumThreads()488 FastScheduler::traceDumpGetNumThreads()
489 {
490 return 1; // Single-threaded ndbd scheduler
491 }
492
493 int
traceDumpGetCurrentThread()494 FastScheduler::traceDumpGetCurrentThread()
495 {
496 return -1; // Single-threaded ndbd scheduler
497 }
498
499 bool
traceDumpGetJam(Uint32 thr_no,const JamEvent * & thrdTheEmulatedJam,Uint32 & thrdTheEmulatedJamIndex)500 FastScheduler::traceDumpGetJam(Uint32 thr_no,
501 const JamEvent * & thrdTheEmulatedJam,
502 Uint32 & thrdTheEmulatedJamIndex)
503 {
504 /* Single threaded ndbd scheduler, no threads. */
505 assert(thr_no == 0);
506
507 #ifdef NO_EMULATED_JAM
508 thrdTheEmulatedJam = NULL;
509 thrdTheEmulatedJamIndex = 0;
510 #else
511 const EmulatedJamBuffer *jamBuffer = NDB_THREAD_TLS_JAM;
512 thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
513 thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
514 #endif
515 return true;
516 }
517
518
519 /**
520 * This method used to be a Cmvmi member function
521 * but is now a "ordinary" function"
522 *
523 * See TransporterCallback.cpp for explanation
524 */
525 void
reportDoJobStatistics(Uint32 tMeanLoopCount)526 FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
527 SignalT<2> signal;
528
529 memset(&signal.header, 0, sizeof(signal.header));
530 signal.header.theLength = 2;
531 signal.header.theSendersSignalId = 0;
532 signal.header.theSendersBlockRef = numberToRef(0, 0);
533 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
534 signal.header.theReceiversBlockNumber = CMVMI;
535
536 signal.theData[0] = NDB_LE_JobStatistic;
537 signal.theData[1] = tMeanLoopCount;
538
539 Uint32 secPtr[3];
540 execute(&signal.header, JBA, signal.theData, secPtr);
541 }
542
543 void
reportThreadConfigLoop(Uint32 expired_time,Uint32 extra_constant,Uint32 * no_exec_loops,Uint32 * tot_exec_time,Uint32 * no_extra_loops,Uint32 * tot_extra_time)544 FastScheduler::reportThreadConfigLoop(Uint32 expired_time,
545 Uint32 extra_constant,
546 Uint32 *no_exec_loops,
547 Uint32 *tot_exec_time,
548 Uint32 *no_extra_loops,
549 Uint32 *tot_extra_time)
550 {
551 SignalT<6> signal;
552
553 memset(&signal.header, 0, sizeof(signal.header));
554 signal.header.theLength = 6;
555 signal.header.theSendersSignalId = 0;
556 signal.header.theSendersBlockRef = numberToRef(0, 0);
557 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
558 signal.header.theReceiversBlockNumber = CMVMI;
559
560 signal.theData[0] = NDB_LE_ThreadConfigLoop;
561 signal.theData[1] = expired_time;
562 signal.theData[2] = extra_constant;
563 signal.theData[3] = (*tot_exec_time)/(*no_exec_loops);
564 signal.theData[4] = *no_extra_loops;
565 if (*no_extra_loops > 0)
566 signal.theData[5] = (*tot_extra_time)/(*no_extra_loops);
567 else
568 signal.theData[5] = 0;
569
570 *no_exec_loops = 0;
571 *tot_exec_time = 0;
572 *no_extra_loops = 0;
573 *tot_extra_time = 0;
574
575 Uint32 secPtr[3];
576 execute(&signal.header, JBA, signal.theData, secPtr);
577 }
578
// File-local mutex, initialised by mt_mem_manager_init() and taken/released
// by mt_mem_manager_lock()/mt_mem_manager_unlock().
static NdbMutex g_mm_mutex;
580
581 void
mt_mem_manager_init()582 mt_mem_manager_init()
583 {
584 NdbMutex_Init(&g_mm_mutex);
585 }
586
587 void
mt_mem_manager_lock()588 mt_mem_manager_lock()
589 {
590 NdbMutex_Lock(&g_mm_mutex);
591 }
592
593 void
mt_mem_manager_unlock()594 mt_mem_manager_unlock()
595 {
596 NdbMutex_Unlock(&g_mm_mutex);
597 }
598