1 /*
2    Copyright (c) 2003, 2018, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "FastScheduler.hpp"
26 #include "ThreadConfig.hpp"
27 #include "RefConvert.hpp"
28 
29 #include "Emulator.hpp"
30 #include "VMSignal.hpp"
31 
32 #include <SignalLoggerManager.hpp>
33 #include <BlockNumbers.h>
34 #include <GlobalSignalNumbers.h>
35 #include <signaldata/EventReport.hpp>
36 #include "LongSignal.hpp"
37 #include <NdbTick.h>
38 
#define JAM_FILE_ID 242


// Smallest per-call signal budget doJob() will use, regardless of load.
#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
// Largest per-call signal budget; equals the 2048 that activateSendPacked()
// assigns to globalData.loopMax.
#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
// Slack added on top of getBOccupancy() when doJob() sizes its budget.
#define EXTRA_SIGNALS_PER_DO_JOB 32
45 
FastScheduler()46 FastScheduler::FastScheduler()
47 {
48    // These constants work for sun only, but they should be initated from
49    // Emulator.C as soon as VMTime has been initiated.
50    theJobBuffers[0].newBuffer(JBASIZE);
51    theJobBuffers[1].newBuffer(JBBSIZE);
52    theJobBuffers[2].newBuffer(JBCSIZE);
53    theJobBuffers[3].newBuffer(JBDSIZE);
54    clear();
55 }
56 
~FastScheduler()57 FastScheduler::~FastScheduler()
58 {
59 }
60 
61 void
clear()62 FastScheduler::clear()
63 {
64   int i;
65   // Make sure the restart signals are not sent too early
66   // the prio is set back in 'main' using the 'ready' method.
67   globalData.highestAvailablePrio = LEVEL_IDLE;
68   globalData.sendPackedActivated = 0;
69   globalData.activateSendPacked = 0;
70   for (i = 0; i < JB_LEVELS; i++){
71     theJobBuffers[i].clear();
72   }
73   globalData.JobCounter = 0;
74   globalData.JobLap = 0;
75   globalData.loopMax = 32;
76   globalData.VMSignals[0].header.theSignalId = 0;
77 
78   theDoJobTotalCounter = 0;
79   theDoJobCallCounter = 0;
80 }
81 
82 void
activateSendPacked()83 FastScheduler::activateSendPacked()
84 {
85   globalData.sendPackedActivated = 1;
86   globalData.activateSendPacked = 0;
87   globalData.loopMax = 2048;
88 }//FastScheduler::activateSendPacked()
89 
//------------------------------------------------------------------------
// Execute queued signals, highest priority level (lowest index) first.
//
// sendPacked is executed at the end of each pass over the job buffers.
// To ensure that we don't send any messages before executing all local
// packed signals, we do another pass (unless we have already executed too
// many signals in this call).
//
// loopStartCount: signals executed since the zero-delay time queue was
//   last scanned. It is carried across calls by the caller.
// Returns the updated loopStartCount, for the caller to pass back in.
//------------------------------------------------------------------------
Uint32
FastScheduler::doJob(Uint32 loopStartCount)
{
  Uint32 loopCount = 0;
  // Per-call budget: getBOccupancy() plus some slack, capped at
  // globalData.loopMax and never below MIN_NUMBER_OF_SIG_PER_DO_JOB.
  Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
  Uint32 TloopMax = (Uint32)globalData.loopMax;
  if (TminLoops < TloopMax) {
    TloopMax = TminLoops;
  }//if
  if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
    TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
  }//if
  Signal* signal = getVMSignals();
  Uint32 tHighPrio= globalData.highestAvailablePrio;
  do{
    while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
#ifdef VM_TRACE
      /* Find reading / propagation of junk */
      signal->garbage_register();
#endif
      if (unlikely(loopStartCount >
          MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN))
      {
        /**
         * This implements the bounded delay signal concept. We never
         * execute more than about MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN
         * signals before the signals with delay 0 are put into the
         * A-level job buffer.
         */
        loopStartCount = 0;
        globalEmulatorData.theThreadConfig->scanZeroTimeQueue();
      }
      // Pop the next signal at the current level. retrieve() returns
      // (gsn << 16) | block number, or 0 when the level is empty.
      Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
      // also strip any instance bits since this is non-MT code
      BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
      GlobalSignalNumber reg_gsn = gsnbnr >> 16;
      globalData.incrementWatchDogCounter(1);
      if (reg_bnr > 0) {
        Uint32 tJobCounter = globalData.JobCounter;
        Uint64 tJobLap = globalData.JobLap;
        SimulatedBlock* b = globalData.getBlock(reg_bnr);
        // Record the level of every executed job in a 4096-entry ring.
        // dumpSignalMemory() walks this ring backwards.
        theJobPriority[tJobCounter] = (Uint8)tHighPrio;
        globalData.JobCounter = (tJobCounter + 1) & 4095;
        globalData.JobLap = tJobLap + 1;

#ifdef VM_TRACE_TIME
        const NDB_TICKS t1 = NdbTick_getCurrentTicks();
        b->m_currentGsn = reg_gsn;
#endif

#ifdef VM_TRACE
        {
          if (globalData.testOn) {
            signal->header.theVerId_signalNumber = reg_gsn;
            signal->header.theReceiversBlockNumber = reg_bnr;

            globalSignalLoggers.executeSignal(signal->header,
                                              tHighPrio,
                                              &signal->theData[0],
                                              globalData.ownId);
          }//if
        }
#endif
        b->jamBuffer()->markEndOfSigExec();
        b->executeFunction_async(reg_gsn, signal);
#ifdef VM_TRACE_TIME
        const NDB_TICKS t2 = NdbTick_getCurrentTicks();
        const Uint64 diff = NdbTick_Elapsed(t1,t2).microSec();
        b->addTime(reg_gsn, diff);
#endif
        // The executed signal may have queued work at a higher level.
        tHighPrio = globalData.highestAvailablePrio;
      } else {
        // Current level is empty: move on to the next level.
        tHighPrio++;
        globalData.highestAvailablePrio = tHighPrio;
      }//if
      loopCount++;
      loopStartCount++;
    }//while
    sendPacked();
    tHighPrio = globalData.highestAvailablePrio;
    if(getBOccupancy() > MAX_OCCUPANCY)
    {
      // B-level backlog is too large. This is only expected after the
      // budget was exhausted (anything else aborts), so extend the budget
      // and keep executing.
      if(loopCount != TloopMax)
        abort();
      assert( loopCount == TloopMax );
      TloopMax += 512;
    }
  } while ((getBOccupancy() > MAX_OCCUPANCY) ||
           ((loopCount < TloopMax) &&
            (tHighPrio < LEVEL_IDLE)));

  // Every 8192 calls, report the mean number of signals executed per call
  // (>> 13 divides by 8192).
  theDoJobCallCounter ++;
  theDoJobTotalCounter += loopCount;
  if (theDoJobCallCounter == 8192) {
    reportDoJobStatistics(theDoJobTotalCounter >> 13);
    theDoJobCallCounter = 0;
    theDoJobTotalCounter = 0;
  }//if
  return loopStartCount;
}//FastScheduler::doJob()
197 
198 void
postPoll()199 FastScheduler::postPoll()
200 {
201   Signal * signal = getVMSignals();
202   SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
203   b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
204 }
205 
sendPacked()206 void FastScheduler::sendPacked()
207 {
208   if (globalData.sendPackedActivated == 1) {
209     SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
210     SimulatedBlock* b_tc = globalData.getBlock(DBTC);
211     SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
212     SimulatedBlock* b_fs = globalData.getBlock(NDBFS);
213     Signal * signal = getVMSignals();
214     b_lqh->executeFunction_async(GSN_SEND_PACKED, signal);
215     b_tc->executeFunction_async(GSN_SEND_PACKED, signal);
216     b_tup->executeFunction_async(GSN_SEND_PACKED, signal);
217     b_fs->executeFunction_async(GSN_SEND_PACKED, signal);
218     return;
219   } else if (globalData.activateSendPacked == 0) {
220     return;
221   } else {
222     activateSendPacked();
223   }//if
224   return;
225 }//FastScheduler::sendPacked()
226 
//------------------------------------------------------------------------
// Pop the oldest entry of this job buffer into *signal.
//
// On success it copies the header, the data words and the three section
// i-values, advances the read pointer, and returns
// (gsn << 16) | receiver block number.
// Returns 0 when the buffer is empty.
// A non-empty buffer whose next entry has receiver block 0 is treated as
// an error and reported through bnr_error().
//------------------------------------------------------------------------
Uint32
APZJobBuffer::retrieve(Signal* signal)
{
  Uint32 tOccupancy = theOccupancy;
  Uint32 myRPtr = rPtr;
  BufferEntry& buf = buffer[myRPtr];
  Uint32 gsnbnr;
  // Branch-free ring wrap: cond is 0 when the incremented pointer reaches
  // bufSize, otherwise all ones, so (myRPtr & cond) wraps to 0.
  Uint32 cond =  (++myRPtr == bufSize) - 1;
  Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;

  if (tOccupancy != 0) {
    if (tRecBlockNo != 0) {
      // Transform protocol to signal.
      rPtr = myRPtr & cond;
      theOccupancy = tOccupancy - 1;
      gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;

      Uint32 tSignalId = globalData.theSignalId;
      Uint32 tLength = buf.header.theLength;
      Uint32 tFirstData = buf.theDataRegister[0];
      signal->header = buf.header;

      // Recall our signal Id for restart purposes.
      // NOTE: signal->header was copied above, so the signal being executed
      // keeps the id previously stored in this entry.
      buf.header.theSignalId = tSignalId;
      globalData.theSignalId = tSignalId + 1;

      // The first word is copied unconditionally. The rest are copied four
      // at a time, which can copy up to three words beyond tLength
      // (newBuffer() allocates one extra entry to support such "overrun").
      Uint32* tDataRegPtr = &buf.theDataRegister[0];
      Uint32* tSigDataPtr = signal->getDataPtrSend();
      *tSigDataPtr = tFirstData;
      tDataRegPtr++;
      tSigDataPtr++;
      Uint32  tLengthCopied = 1;
      while (tLengthCopied < tLength) {
        Uint32 tData0 = tDataRegPtr[0];
        Uint32 tData1 = tDataRegPtr[1];
        Uint32 tData2 = tDataRegPtr[2];
        Uint32 tData3 = tDataRegPtr[3];

        tDataRegPtr += 4;
        tLengthCopied += 4;

        tSigDataPtr[0] = tData0;
        tSigDataPtr[1] = tData1;
        tSigDataPtr[2] = tData2;
        tSigDataPtr[3] = tData3;
        tSigDataPtr += 4;
      }//while

      // The three section i-values are stored immediately after the data
      // words (see insert()).
      tSigDataPtr = signal->m_sectionPtrI;
      tDataRegPtr = buf.theDataRegister + buf.header.theLength;
      Uint32 ptr0 = * tDataRegPtr ++;
      Uint32 ptr1 = * tDataRegPtr ++;
      Uint32 ptr2 = * tDataRegPtr ++;
      * tSigDataPtr ++ = ptr0;
      * tSigDataPtr ++ = ptr1;
      * tSigDataPtr ++ = ptr2;

      //---------------------------------------------------------
      // Prefetch of buffer[rPtr] is done here. We prefetch for
      // read both the first cache line and the next 64 byte
      // entry
      //---------------------------------------------------------
      NDB_PREFETCH_READ((void*)&buffer[rPtr]);
      NDB_PREFETCH_READ((void*)(((char*)&buffer[rPtr]) + 64));
      return gsnbnr;
    } else {
      bnr_error();
      return 0; // Will never come here, simply to keep GCC happy.
    }//if
  } else {
    //------------------------------------------------------------
    // The Job Buffer was empty, signal this by return zero.
    //------------------------------------------------------------
    return 0;
  }//if
}//APZJobBuffer::retrieve()
303 
// Copy *signal into job buffer entry 'buf', addressed to block 'bnr' with
// signal number 'gsn'.
// The entry keeps its previously stored theSignalId. theSendersSignalId is
// set to globalData.theSignalId - 1.
// NOTE(review): this copies theLength + m_noOfSections words starting at
// signal->theData, i.e. it assumes any section i-values directly follow the
// data words in theData. Confirm this against the Signal layout and the
// callers (insert() copies the section i-values separately).
void
APZJobBuffer::signal2buffer(Signal* signal,
                            BlockNumber bnr, GlobalSignalNumber gsn,
                            BufferEntry& buf)
{
  Uint32 tSignalId = globalData.theSignalId;
  Uint32 tLength = signal->header.theLength + signal->header.m_noOfSections;
  Uint32 tSigId  = buf.header.theSignalId;

  buf.header = signal->header;
  buf.header.theVerId_signalNumber = gsn;
  buf.header.theReceiversBlockNumber = bnr;
  buf.header.theSendersSignalId = tSignalId - 1;
  buf.header.theSignalId = tSigId;

  Uint32* tSigDataPtr = &signal->theData[0];
  Uint32* tDataRegPtr = &buf.theDataRegister[0];

  // TODO hint that data is aligned(?) to -4 bytes per 16 bytes for performance
  memcpy(tDataRegPtr, tSigDataPtr, tLength * sizeof(Uint32));
}//APZJobBuffer::signal2buffer()
325 
// Append one signal at the write position. The entry holds the header,
// theLength data words, and then the three section i-values.
// The ring holds at most bufSize - 1 entries. Inserting into a full buffer
// calls jbuf_error() instead of overwriting.
void
APZJobBuffer::insert(const SignalHeader * const sh,
                     const Uint32 * const theData, const Uint32 secPtrI[3]){
  Uint32 tOccupancy = theOccupancy + 1;
  Uint32 myWPtr = wPtr;
  BufferEntry& buf = buffer[myWPtr];

  if (tOccupancy < bufSize) {
    // Branch-free ring wrap (see retrieve()).
    Uint32 cond =  (++myWPtr == bufSize) - 1;
    wPtr = myWPtr & cond;
    theOccupancy = tOccupancy;

    buf.header = * sh;
    const Uint32 len = buf.header.theLength;
    memcpy(buf.theDataRegister, theData, 4 * len);
    memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
    //---------------------------------------------------------
    // Prefetch of buffer[wPtr] is done here. We prefetch for
    // write both the first cache line and the next 64 byte
    // entry
    //---------------------------------------------------------
    NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
    NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
  } else {
    jbuf_error();
  }//if
}
APZJobBuffer()353 APZJobBuffer::APZJobBuffer()
354   : bufSize(0), buffer(NULL), memRef(NULL)
355 {
356   clear();
357 }
358 
~APZJobBuffer()359 APZJobBuffer::~APZJobBuffer()
360 {
361   delete [] buffer;
362 }
363 
364 void
newBuffer(int size)365 APZJobBuffer::newBuffer(int size)
366 {
367   buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
368   if(buffer){
369 #ifndef NDB_PURIFY
370     ::memset(buffer, 0, (size * sizeof(BufferEntry)));
371 #endif
372     bufSize = size;
373   } else
374     bufSize = 0;
375 }
376 
377 void
clear()378 APZJobBuffer::clear()
379 {
380   rPtr = 0;
381   wPtr = 0;
382   theOccupancy = 0;
383 }
384 
385 /**
386  * Function prototype for print_restart
387  *
388  *   Defined later in this file
389  */
390 void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
391 
/**
 * Dump recently executed signals to 'output', newest first.
 *
 * doJob() records the priority level of every executed job in
 * theJobPriority[], a 4096-entry ring indexed by globalData.JobCounter.
 * This function walks that ring backwards. For each job it steps the
 * matching level's read pointer back one slot and re-reads the entry with
 * retrieveDump().
 *
 * NOTE(review): a job buffer smaller than the history may already have
 * reused older slots. Confirm what retrieveDump() yields in that case.
 *
 * thr_no must be 0 (single-threaded ndbd).
 */
void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
{
  SignalT<25> signalT;
  Signal * signal = new (&signalT) Signal(0);
  Uint32 ReadPtr[5];
  Uint32 tJob;
  Uint32 tLastJob;

  /* Single threaded ndbd scheduler, no threads. */
  assert(thr_no == 0);

  fprintf(output, "\n");

  if (globalData.JobLap > 4095) {
    // The history ring has wrapped: start at the newest entry and stop
    // before reaching slot JobCounter again.
    if (globalData.JobCounter != 0)
      tJob = globalData.JobCounter - 1;
    else
      tJob = 4095;
    tLastJob = globalData.JobCounter;
  } else {
    if (globalData.JobCounter == 0)
      return; // No signals sent
    else {
      // Not wrapped yet: walk back from the newest entry through slot 0.
      // The loop ends when tJob wraps around to 4095.
      tJob = globalData.JobCounter - 1;
      tLastJob = 4095;
    }
  }
  // Start each level's walk from its current read pointer.
  ReadPtr[0] = theJobBuffers[0].getReadPtr();
  ReadPtr[1] = theJobBuffers[1].getReadPtr();
  ReadPtr[2] = theJobBuffers[2].getReadPtr();
  ReadPtr[3] = theJobBuffers[3].getReadPtr();

  do {
    unsigned char tLevel = theJobPriority[tJob];
    globalData.incrementWatchDogCounter(4);
    if (ReadPtr[tLevel] == 0)
      ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
    else
      ReadPtr[tLevel]--;

    theJobBuffers[tLevel].retrieveDump(signal, ReadPtr[tLevel]);
    // strip instance bits since this is non-MT code
    signal->header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
    print_restart(output, signal, tLevel);

    if (tJob == 0)
      tJob = 4095;
    else
      tJob--;

  } while (tJob != tLastJob);
  fflush(output);
}
445 
446 void
prio_level_error()447 FastScheduler::prio_level_error()
448 {
449   ERROR_SET(ecError, NDBD_EXIT_WRONG_PRIO_LEVEL,
450 	    "Wrong Priority Level", "FastScheduler.C");
451 }
452 
453 void
jbuf_error()454 jbuf_error()
455 {
456   ERROR_SET(ecError, NDBD_EXIT_BLOCK_JBUFCONGESTION,
457 	    "Job Buffer Full", "APZJobBuffer.C");
458 }
459 
460 void
bnr_error()461 bnr_error()
462 {
463   ERROR_SET(ecError, NDBD_EXIT_BLOCK_BNR_ZERO,
464 	    "Block Number Zero", "FastScheduler.C");
465 }
466 
467 void
print_restart(FILE * output,Signal * signal,Uint32 aLevel)468 print_restart(FILE * output, Signal* signal, Uint32 aLevel)
469 {
470   fprintf(output, "--------------- Signal ----------------\n");
471   SignalLoggerManager::printSignalHeader(output,
472 					 signal->header,
473 					 aLevel,
474 					 globalData.ownId,
475 					 true);
476   SignalLoggerManager::printSignalData  (output,
477 					 signal->header,
478 					 &signal->theData[0]);
479 }
480 
481 void
traceDumpPrepare(NdbShutdownType &)482 FastScheduler::traceDumpPrepare(NdbShutdownType&)
483 {
484   /* No-operation in single-threaded ndbd. */
485 }
486 
487 Uint32
traceDumpGetNumThreads()488 FastScheduler::traceDumpGetNumThreads()
489 {
490   return 1;                     // Single-threaded ndbd scheduler
491 }
492 
493 int
traceDumpGetCurrentThread()494 FastScheduler::traceDumpGetCurrentThread()
495 {
496   return -1;                     // Single-threaded ndbd scheduler
497 }
498 
499 bool
traceDumpGetJam(Uint32 thr_no,const JamEvent * & thrdTheEmulatedJam,Uint32 & thrdTheEmulatedJamIndex)500 FastScheduler::traceDumpGetJam(Uint32 thr_no,
501                                const JamEvent * & thrdTheEmulatedJam,
502                                Uint32 & thrdTheEmulatedJamIndex)
503 {
504   /* Single threaded ndbd scheduler, no threads. */
505   assert(thr_no == 0);
506 
507 #ifdef NO_EMULATED_JAM
508   thrdTheEmulatedJam = NULL;
509   thrdTheEmulatedJamIndex = 0;
510 #else
511   const EmulatedJamBuffer *jamBuffer = NDB_THREAD_TLS_JAM;
512   thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
513   thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
514 #endif
515   return true;
516 }
517 
518 
519 /**
520  * This method used to be a Cmvmi member function
521  * but is now a "ordinary" function"
522  *
523  * See TransporterCallback.cpp for explanation
524  */
525 void
reportDoJobStatistics(Uint32 tMeanLoopCount)526 FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
527   SignalT<2> signal;
528 
529   memset(&signal.header, 0, sizeof(signal.header));
530   signal.header.theLength = 2;
531   signal.header.theSendersSignalId = 0;
532   signal.header.theSendersBlockRef = numberToRef(0, 0);
533   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
534   signal.header.theReceiversBlockNumber = CMVMI;
535 
536   signal.theData[0] = NDB_LE_JobStatistic;
537   signal.theData[1] = tMeanLoopCount;
538 
539   Uint32 secPtr[3];
540   execute(&signal.header, JBA, signal.theData, secPtr);
541 }
542 
543 void
reportThreadConfigLoop(Uint32 expired_time,Uint32 extra_constant,Uint32 * no_exec_loops,Uint32 * tot_exec_time,Uint32 * no_extra_loops,Uint32 * tot_extra_time)544 FastScheduler::reportThreadConfigLoop(Uint32 expired_time,
545                                       Uint32 extra_constant,
546                                       Uint32 *no_exec_loops,
547                                       Uint32 *tot_exec_time,
548                                       Uint32 *no_extra_loops,
549                                       Uint32 *tot_extra_time)
550 {
551   SignalT<6> signal;
552 
553   memset(&signal.header, 0, sizeof(signal.header));
554   signal.header.theLength = 6;
555   signal.header.theSendersSignalId = 0;
556   signal.header.theSendersBlockRef = numberToRef(0, 0);
557   signal.header.theVerId_signalNumber = GSN_EVENT_REP;
558   signal.header.theReceiversBlockNumber = CMVMI;
559 
560   signal.theData[0] = NDB_LE_ThreadConfigLoop;
561   signal.theData[1] = expired_time;
562   signal.theData[2] = extra_constant;
563   signal.theData[3] = (*tot_exec_time)/(*no_exec_loops);
564   signal.theData[4] = *no_extra_loops;
565   if (*no_extra_loops > 0)
566     signal.theData[5] = (*tot_extra_time)/(*no_extra_loops);
567   else
568     signal.theData[5] = 0;
569 
570   *no_exec_loops = 0;
571   *tot_exec_time = 0;
572   *no_extra_loops = 0;
573   *tot_extra_time = 0;
574 
575   Uint32 secPtr[3];
576   execute(&signal.header, JBA, signal.theData, secPtr);
577 }
578 
579 static NdbMutex g_mm_mutex;
580 
581 void
mt_mem_manager_init()582 mt_mem_manager_init()
583 {
584   NdbMutex_Init(&g_mm_mutex);
585 }
586 
587 void
mt_mem_manager_lock()588 mt_mem_manager_lock()
589 {
590   NdbMutex_Lock(&g_mm_mutex);
591 }
592 
593 void
mt_mem_manager_unlock()594 mt_mem_manager_unlock()
595 {
596   NdbMutex_Unlock(&g_mm_mutex);
597 }
598