1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "FastScheduler.hpp"
26 #include "ThreadConfig.hpp"
27 #include "RefConvert.hpp"
28 
29 #include "Emulator.hpp"
30 #include "VMSignal.hpp"
31 
32 #include <SignalLoggerManager.hpp>
33 #include <BlockNumbers.h>
34 #include <GlobalSignalNumbers.h>
35 #include <signaldata/EventReport.hpp>
36 #include "LongSignal.hpp"
37 #include <NdbTick.h>
38 
/* Identifier of this source file; presumably consumed by the jam()
 * tracing macros (TODO confirm against the jam headers). */
#define JAM_FILE_ID 242


/*
 * Per-call signal budget of FastScheduler::doJob():
 *  - EXTRA_SIGNALS_PER_DO_JOB is added to the current B-level occupancy,
 *  - the result is never allowed below MIN_NUMBER_OF_SIG_PER_DO_JOB,
 *  - MAX_NUMBER_OF_SIG_PER_DO_JOB matches the 2048 that
 *    FastScheduler::activateSendPacked() installs in globalData.loopMax.
 */
#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
#define EXTRA_SIGNALS_PER_DO_JOB 32
45 
FastScheduler()46 FastScheduler::FastScheduler()
47 {
48    // These constants work for sun only, but they should be initated from
49    // Emulator.C as soon as VMTime has been initiated.
50    theJobBuffers[0].newBuffer(JBASIZE);
51    theJobBuffers[1].newBuffer(JBBSIZE);
52    theJobBuffers[2].newBuffer(JBCSIZE);
53    theJobBuffers[3].newBuffer(JBDSIZE);
54    clear();
55 }
56 
~FastScheduler()57 FastScheduler::~FastScheduler()
58 {
59 }
60 
61 void
clear()62 FastScheduler::clear()
63 {
64   int i;
65   // Make sure the restart signals are not sent too early
66   // the prio is set back in 'main' using the 'ready' method.
67   globalData.highestAvailablePrio = LEVEL_IDLE;
68   globalData.sendPackedActivated = 0;
69   globalData.activateSendPacked = 0;
70   for (i = 0; i < JB_LEVELS; i++){
71     theJobBuffers[i].clear();
72   }
73   globalData.JobCounter = 0;
74   globalData.JobLap = 0;
75   globalData.loopMax = 32;
76   globalData.VMSignals[0].header.theSignalId = 0;
77 
78   theDoJobTotalCounter = 0;
79   theDoJobCallCounter = 0;
80 }
81 
82 void
activateSendPacked()83 FastScheduler::activateSendPacked()
84 {
85   globalData.sendPackedActivated = 1;
86   globalData.activateSendPacked = 0;
87   globalData.loopMax = 2048;
88 }//FastScheduler::activateSendPacked()
89 
90 //------------------------------------------------------------------------
91 // sendPacked is executed at the end of the loop.
92 // To ensure that we don't send any messages before executing all local
93 // packed signals we do another turn in the loop (unless we have already
94 // executed too many signals in the loop).
95 //------------------------------------------------------------------------
96 Uint32
doJob(Uint32 loopStartCount)97 FastScheduler::doJob(Uint32 loopStartCount)
98 {
99   Uint32 loopCount = 0;
100   Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
101   Uint32 TloopMax = (Uint32)globalData.loopMax;
102   if (TminLoops < TloopMax) {
103     TloopMax = TminLoops;
104   }//if
105   if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
106     TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
107   }//if
108   register Signal* signal = getVMSignals();
109   register Uint32 tHighPrio= globalData.highestAvailablePrio;
110   do{
111     while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
112 #ifdef VM_TRACE
113       /* Find reading / propagation of junk */
114       signal->garbage_register();
115 #endif
116       if (unlikely(loopStartCount >
117           MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN))
118       {
119         /**
120          * This implements the bounded delay signal concept. This
121          * means that we will never execute more than 160 signals
122          * before getting the signals with delay 0 put into the
123          * A-level job buffer.
124          */
125         loopStartCount = 0;
126         globalEmulatorData.theThreadConfig->scanZeroTimeQueue();
127       }
128       // To ensure we find bugs quickly
129       register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
130       // also strip any instance bits since this is non-MT code
131       register BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
132       register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
133       globalData.incrementWatchDogCounter(1);
134       if (reg_bnr > 0) {
135         Uint32 tJobCounter = globalData.JobCounter;
136         Uint64 tJobLap = globalData.JobLap;
137         SimulatedBlock* b = globalData.getBlock(reg_bnr);
138         theJobPriority[tJobCounter] = (Uint8)tHighPrio;
139         globalData.JobCounter = (tJobCounter + 1) & 4095;
140         globalData.JobLap = tJobLap + 1;
141 
142 #ifdef VM_TRACE_TIME
143 	const NDB_TICKS t1 = NdbTick_getCurrentTicks();
144 	b->m_currentGsn = reg_gsn;
145 #endif
146 
147 #ifdef VM_TRACE
148         {
149           if (globalData.testOn) {
150 	    signal->header.theVerId_signalNumber = reg_gsn;
151 	    signal->header.theReceiversBlockNumber = reg_bnr;
152 
153             globalSignalLoggers.executeSignal(signal->header,
154 					      tHighPrio,
155 					      &signal->theData[0],
156 					      globalData.ownId);
157           }//if
158         }
159 #endif
160         b->jamBuffer()->markEndOfSigExec();
161         b->executeFunction_async(reg_gsn, signal);
162 #ifdef VM_TRACE_TIME
163 	const NDB_TICKS t2 = NdbTick_getCurrentTicks();
164         const Uint64 diff = NdbTick_Elapsed(t1,t2).microSec();
165 	b->addTime(reg_gsn, diff);
166 #endif
167         tHighPrio = globalData.highestAvailablePrio;
168       } else {
169         tHighPrio++;
170         globalData.highestAvailablePrio = tHighPrio;
171       }//if
172       loopCount++;
173       loopStartCount++;
174     }//while
175     sendPacked();
176     tHighPrio = globalData.highestAvailablePrio;
177     if(getBOccupancy() > MAX_OCCUPANCY)
178     {
179       if(loopCount != TloopMax)
180 	abort();
181       assert( loopCount == TloopMax );
182       TloopMax += 512;
183     }
184   } while ((getBOccupancy() > MAX_OCCUPANCY) ||
185            ((loopCount < TloopMax) &&
186             (tHighPrio < LEVEL_IDLE)));
187 
188   theDoJobCallCounter ++;
189   theDoJobTotalCounter += loopCount;
190   if (theDoJobCallCounter == 8192) {
191     reportDoJobStatistics(theDoJobTotalCounter >> 13);
192     theDoJobCallCounter = 0;
193     theDoJobTotalCounter = 0;
194   }//if
195   return loopStartCount;
196 }//FastScheduler::doJob()
197 
198 void
postPoll()199 FastScheduler::postPoll()
200 {
201   Signal * signal = getVMSignals();
202   SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
203   b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
204 }
205 
sendPacked()206 void FastScheduler::sendPacked()
207 {
208   if (globalData.sendPackedActivated == 1) {
209     SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
210     SimulatedBlock* b_tc = globalData.getBlock(DBTC);
211     SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
212     SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
213     Signal * signal = getVMSignals();
214     b_lqh->executeFunction_async(GSN_SEND_PACKED, signal);
215     b_tc->executeFunction_async(GSN_SEND_PACKED, signal);
216     b_tup->executeFunction_async(GSN_SEND_PACKED, signal);
217     b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
218     return;
219   } else if (globalData.activateSendPacked == 0) {
220     return;
221   } else {
222     activateSendPacked();
223   }//if
224   return;
225 }//FastScheduler::sendPacked()
226 
227 Uint32
retrieve(Signal * signal)228 APZJobBuffer::retrieve(Signal* signal)
229 {
230   Uint32 tOccupancy = theOccupancy;
231   Uint32 myRPtr = rPtr;
232   BufferEntry& buf = buffer[myRPtr];
233   Uint32 gsnbnr;
234   Uint32 cond =  (++myRPtr == bufSize) - 1;
235   Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;
236 
237   if (tOccupancy != 0) {
238     if (tRecBlockNo != 0) {
239       // Transform protocol to signal.
240       rPtr = myRPtr & cond;
241       theOccupancy = tOccupancy - 1;
242       gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;
243 
244       Uint32 tSignalId = globalData.theSignalId;
245       Uint32 tLength = buf.header.theLength;
246       Uint32 tFirstData = buf.theDataRegister[0];
247       signal->header = buf.header;
248 
249       // Recall our signal Id for restart purposes
250       buf.header.theSignalId = tSignalId;
251       globalData.theSignalId = tSignalId + 1;
252 
253       Uint32* tDataRegPtr = &buf.theDataRegister[0];
254       Uint32* tSigDataPtr = signal->getDataPtrSend();
255       *tSigDataPtr = tFirstData;
256       tDataRegPtr++;
257       tSigDataPtr++;
258       Uint32  tLengthCopied = 1;
259       while (tLengthCopied < tLength) {
260         Uint32 tData0 = tDataRegPtr[0];
261         Uint32 tData1 = tDataRegPtr[1];
262         Uint32 tData2 = tDataRegPtr[2];
263         Uint32 tData3 = tDataRegPtr[3];
264 
265         tDataRegPtr += 4;
266         tLengthCopied += 4;
267 
268         tSigDataPtr[0] = tData0;
269         tSigDataPtr[1] = tData1;
270         tSigDataPtr[2] = tData2;
271         tSigDataPtr[3] = tData3;
272         tSigDataPtr += 4;
273       }//while
274 
275       tSigDataPtr = signal->m_sectionPtrI;
276       tDataRegPtr = buf.theDataRegister + buf.header.theLength;
277       Uint32 ptr0 = * tDataRegPtr ++;
278       Uint32 ptr1 = * tDataRegPtr ++;
279       Uint32 ptr2 = * tDataRegPtr ++;
280       * tSigDataPtr ++ = ptr0;
281       * tSigDataPtr ++ = ptr1;
282       * tSigDataPtr ++ = ptr2;
283 
284       //---------------------------------------------------------
285       // Prefetch of buffer[rPtr] is done here. We prefetch for
286       // read both the first cache line and the next 64 byte
287       // entry
288       //---------------------------------------------------------
289       NDB_PREFETCH_READ((void*)&buffer[rPtr]);
290       NDB_PREFETCH_READ((void*)(((char*)&buffer[rPtr]) + 64));
291       return gsnbnr;
292     } else {
293       bnr_error();
294       return 0; // Will never come here, simply to keep GCC happy.
295     }//if
296   } else {
297     //------------------------------------------------------------
298     // The Job Buffer was empty, signal this by return zero.
299     //------------------------------------------------------------
300     return 0;
301   }//if
302 }//APZJobBuffer::retrieve()
303 
304 void
signal2buffer(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,BufferEntry & buf)305 APZJobBuffer::signal2buffer(Signal* signal,
306 			    BlockNumber bnr, GlobalSignalNumber gsn,
307 			    BufferEntry& buf)
308 {
309   Uint32 tSignalId = globalData.theSignalId;
310   Uint32 tFirstData = signal->theData[0];
311   Uint32 tLength = signal->header.theLength + signal->header.m_noOfSections;
312   Uint32 tSigId  = buf.header.theSignalId;
313 
314   buf.header = signal->header;
315   buf.header.theVerId_signalNumber = gsn;
316   buf.header.theReceiversBlockNumber = bnr;
317   buf.header.theSendersSignalId = tSignalId - 1;
318   buf.header.theSignalId = tSigId;
319   buf.theDataRegister[0] = tFirstData;
320 
321   Uint32 tLengthCopied = 1;
322   Uint32* tSigDataPtr = &signal->theData[1];
323   Uint32* tDataRegPtr = &buf.theDataRegister[1];
324   while (tLengthCopied < tLength) {
325     Uint32 tData0 = tSigDataPtr[0];
326     Uint32 tData1 = tSigDataPtr[1];
327     Uint32 tData2 = tSigDataPtr[2];
328     Uint32 tData3 = tSigDataPtr[3];
329 
330     tLengthCopied += 4;
331     tSigDataPtr += 4;
332 
333     tDataRegPtr[0] = tData0;
334     tDataRegPtr[1] = tData1;
335     tDataRegPtr[2] = tData2;
336     tDataRegPtr[3] = tData3;
337     tDataRegPtr += 4;
338   }//while
339 }//APZJobBuffer::signal2buffer()
340 
341 void
insert(const SignalHeader * const sh,const Uint32 * const theData,const Uint32 secPtrI[3])342 APZJobBuffer::insert(const SignalHeader * const sh,
343 		     const Uint32 * const theData, const Uint32 secPtrI[3]){
344   Uint32 tOccupancy = theOccupancy + 1;
345   Uint32 myWPtr = wPtr;
346   register BufferEntry& buf = buffer[myWPtr];
347 
348   if (tOccupancy < bufSize) {
349     Uint32 cond =  (++myWPtr == bufSize) - 1;
350     wPtr = myWPtr & cond;
351     theOccupancy = tOccupancy;
352 
353     buf.header = * sh;
354     const Uint32 len = buf.header.theLength;
355     memcpy(buf.theDataRegister, theData, 4 * len);
356     memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
357     //---------------------------------------------------------
358     // Prefetch of buffer[wPtr] is done here. We prefetch for
359     // write both the first cache line and the next 64 byte
360     // entry
361     //---------------------------------------------------------
362     NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
363     NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
364   } else {
365     jbuf_error();
366   }//if
367 }
APZJobBuffer()368 APZJobBuffer::APZJobBuffer()
369   : bufSize(0), buffer(NULL), memRef(NULL)
370 {
371   clear();
372 }
373 
~APZJobBuffer()374 APZJobBuffer::~APZJobBuffer()
375 {
376   delete [] buffer;
377 }
378 
379 void
newBuffer(int size)380 APZJobBuffer::newBuffer(int size)
381 {
382   buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
383   if(buffer){
384 #ifndef NDB_PURIFY
385     ::memset(buffer, 0, (size * sizeof(BufferEntry)));
386 #endif
387     bufSize = size;
388   } else
389     bufSize = 0;
390 }
391 
392 void
clear()393 APZJobBuffer::clear()
394 {
395   rPtr = 0;
396   wPtr = 0;
397   theOccupancy = 0;
398 }
399 
400 /**
401  * Function prototype for print_restart
402  *
403  *   Defined later in this file
404  */
405 void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
406 
dumpSignalMemory(Uint32 thr_no,FILE * output)407 void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
408 {
409   SignalT<25> signalT;
410   Signal * signal = new (&signalT) Signal(0);
411   Uint32 ReadPtr[5];
412   Uint32 tJob;
413   Uint32 tLastJob;
414 
415   /* Single threaded ndbd scheduler, no threads. */
416   assert(thr_no == 0);
417 
418   fprintf(output, "\n");
419 
420   if (globalData.JobLap > 4095) {
421     if (globalData.JobCounter != 0)
422       tJob = globalData.JobCounter - 1;
423     else
424       tJob = 4095;
425     tLastJob = globalData.JobCounter;
426   } else {
427     if (globalData.JobCounter == 0)
428       return; // No signals sent
429     else {
430       tJob = globalData.JobCounter - 1;
431       tLastJob = 4095;
432     }
433   }
434   ReadPtr[0] = theJobBuffers[0].getReadPtr();
435   ReadPtr[1] = theJobBuffers[1].getReadPtr();
436   ReadPtr[2] = theJobBuffers[2].getReadPtr();
437   ReadPtr[3] = theJobBuffers[3].getReadPtr();
438 
439   do {
440     unsigned char tLevel = theJobPriority[tJob];
441     globalData.incrementWatchDogCounter(4);
442     if (ReadPtr[tLevel] == 0)
443       ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
444     else
445       ReadPtr[tLevel]--;
446 
447     theJobBuffers[tLevel].retrieveDump(signal, ReadPtr[tLevel]);
448     // strip instance bits since this in non-MT code
449     signal->header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
450     print_restart(output, signal, tLevel);
451 
452     if (tJob == 0)
453       tJob = 4095;
454     else
455       tJob--;
456 
457   } while (tJob != tLastJob);
458   fflush(output);
459 }
460 
461 void
prio_level_error()462 FastScheduler::prio_level_error()
463 {
464   ERROR_SET(ecError, NDBD_EXIT_WRONG_PRIO_LEVEL,
465 	    "Wrong Priority Level", "FastScheduler.C");
466 }
467 
468 void
jbuf_error()469 jbuf_error()
470 {
471   ERROR_SET(ecError, NDBD_EXIT_BLOCK_JBUFCONGESTION,
472 	    "Job Buffer Full", "APZJobBuffer.C");
473 }
474 
475 void
bnr_error()476 bnr_error()
477 {
478   ERROR_SET(ecError, NDBD_EXIT_BLOCK_BNR_ZERO,
479 	    "Block Number Zero", "FastScheduler.C");
480 }
481 
482 void
print_restart(FILE * output,Signal * signal,Uint32 aLevel)483 print_restart(FILE * output, Signal* signal, Uint32 aLevel)
484 {
485   fprintf(output, "--------------- Signal ----------------\n");
486   SignalLoggerManager::printSignalHeader(output,
487 					 signal->header,
488 					 aLevel,
489 					 globalData.ownId,
490 					 true);
491   SignalLoggerManager::printSignalData  (output,
492 					 signal->header,
493 					 &signal->theData[0]);
494 }
495 
496 void
traceDumpPrepare(NdbShutdownType &)497 FastScheduler::traceDumpPrepare(NdbShutdownType&)
498 {
499   /* No-operation in single-threaded ndbd. */
500 }
501 
502 Uint32
traceDumpGetNumThreads()503 FastScheduler::traceDumpGetNumThreads()
504 {
505   return 1;                     // Single-threaded ndbd scheduler
506 }
507 
508 int
traceDumpGetCurrentThread()509 FastScheduler::traceDumpGetCurrentThread()
510 {
511   return -1;                     // Single-threaded ndbd scheduler
512 }
513 
514 bool
traceDumpGetJam(Uint32 thr_no,const JamEvent * & thrdTheEmulatedJam,Uint32 & thrdTheEmulatedJamIndex)515 FastScheduler::traceDumpGetJam(Uint32 thr_no,
516                                const JamEvent * & thrdTheEmulatedJam,
517                                Uint32 & thrdTheEmulatedJamIndex)
518 {
519   /* Single threaded ndbd scheduler, no threads. */
520   assert(thr_no == 0);
521 
522 #ifdef NO_EMULATED_JAM
523   thrdTheEmulatedJam = NULL;
524   thrdTheEmulatedJamIndex = 0;
525 #else
526   const EmulatedJamBuffer *jamBuffer =
527     (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
528   thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
529   thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
530 #endif
531   return true;
532 }
533 
534 
535 /**
536  * This method used to be a Cmvmi member function
537  * but is now a "ordinary" function"
538  *
539  * See TransporterCallback.cpp for explanation
540  */
541 void
reportDoJobStatistics(Uint32 tMeanLoopCount)542 FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
543   SignalT<2> signal;
544 
545   memset(&signal.header, 0, sizeof(signal.header));
546   signal.header.theLength = 2;
547   signal.header.theSendersSignalId = 0;
548   signal.header.theSendersBlockRef = numberToRef(0, 0);
549   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
550   signal.header.theReceiversBlockNumber = CMVMI;
551 
552   signal.theData[0] = NDB_LE_JobStatistic;
553   signal.theData[1] = tMeanLoopCount;
554 
555   Uint32 secPtr[3];
556   execute(&signal.header, JBA, signal.theData, secPtr);
557 }
558 
559 void
reportThreadConfigLoop(Uint32 expired_time,Uint32 extra_constant,Uint32 * no_exec_loops,Uint32 * tot_exec_time,Uint32 * no_extra_loops,Uint32 * tot_extra_time)560 FastScheduler::reportThreadConfigLoop(Uint32 expired_time,
561                                       Uint32 extra_constant,
562                                       Uint32 *no_exec_loops,
563                                       Uint32 *tot_exec_time,
564                                       Uint32 *no_extra_loops,
565                                       Uint32 *tot_extra_time)
566 {
567   SignalT<6> signal;
568 
569   memset(&signal.header, 0, sizeof(signal.header));
570   signal.header.theLength = 6;
571   signal.header.theSendersSignalId = 0;
572   signal.header.theSendersBlockRef = numberToRef(0, 0);
573   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
574   signal.header.theReceiversBlockNumber = CMVMI;
575 
576   signal.theData[0] = NDB_LE_ThreadConfigLoop;
577   signal.theData[1] = expired_time;
578   signal.theData[2] = extra_constant;
579   signal.theData[3] = (*tot_exec_time)/(*no_exec_loops);
580   signal.theData[4] = *no_extra_loops;
581   if (*no_extra_loops > 0)
582     signal.theData[5] = (*tot_extra_time)/(*no_extra_loops);
583   else
584     signal.theData[5] = 0;
585 
586   *no_exec_loops = 0;
587   *tot_exec_time = 0;
588   *no_extra_loops = 0;
589   *tot_extra_time = 0;
590 
591   Uint32 secPtr[3];
592   execute(&signal.header, JBA, signal.theData, secPtr);
593 }
594 
595 static NdbMutex g_mm_mutex;
596 
597 void
mt_mem_manager_init()598 mt_mem_manager_init()
599 {
600   NdbMutex_Init(&g_mm_mutex);
601 }
602 
603 void
mt_mem_manager_lock()604 mt_mem_manager_lock()
605 {
606   NdbMutex_Lock(&g_mm_mutex);
607 }
608 
609 void
mt_mem_manager_unlock()610 mt_mem_manager_unlock()
611 {
612   NdbMutex_Unlock(&g_mm_mutex);
613 }
614