1 /*
2    Copyright (c) 2003, 2018, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef FastScheduler_H
26 #define FastScheduler_H
27 
28 #include <VMSignal.hpp>
29 #include <kernel_types.h>
30 #include <Prio.hpp>
31 #include <SignalLoggerManager.hpp>
32 #include <SimulatedBlock.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <GlobalData.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <portlib/ndb_prefetch.h>
37 #include <portlib/NdbTick.h>
38 
#define JAM_FILE_ID 244

/*
 * Capacity (in BufferEntry slots) of each job buffer level.
 */
#define JBASIZE   1280 // Level used by jobs that have deadlines to meet
#define JBBSIZE   4096 // Level used by most jobs
#define JBCSIZE   64   // Currently only used by STTOR and STTORRY
#define JBDSIZE   4096 // Storage for the time queue; this level is not
                       // supported as a priority level

/*
 * Occupancy of the JBB buffer at which FastScheduler::checkDoJob()
 * starts executing queued signals.
 */
#define MAX_OCCUPANCY 1024

/*
 * NOTE(review): not referenced in this header; judging by the name it bounds
 * the number of signals executed between zero time queue scans - confirm at
 * the point of use.
 */
#define MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN 100

/* Error handlers implemented outside this header. */
void jbuf_error();
void bnr_error();

class Block;
class Signal;
54 
55 class BufferEntry
56 {
57 public:
58   SignalHeader header;
59   Uint32 theDataRegister[25];
60 };
61 
62 class APZJobBuffer
63 {
64 public:
65   APZJobBuffer();
66   ~APZJobBuffer();
67 
68   void newBuffer(int size);
69 
70   void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
71   void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
72   void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
73 	      Uint32 myWPtr);
74 
75   Uint32 retrieve(Signal *signal);
76   void retrieve(Signal *signal, Uint32 myRptr);
77 
78   /**
79    * Used when dumping to trace file
80    */
81   void retrieveDump(Signal *signal, Uint32 myRptr);
82 
83   void clear();
84   Uint32 getOccupancy() const;
85 
86   Uint32 getReadPtr() const;
87   Uint32 getWritePtr() const;
88   Uint32 getBufSize() const;
89 
90 private:
91   void signal2buffer(Signal* signal, BlockNumber bnr,
92 		     GlobalSignalNumber gsn, BufferEntry& buf);
93   Uint32 rPtr;
94   Uint32 wPtr;
95   Uint32 theOccupancy;
96   Uint32 bufSize;
97   BufferEntry* buffer;
98   BufferEntry* memRef;
99 };
100 
101 
102 class FastScheduler
103 {
104 public:
105    FastScheduler();
106    ~FastScheduler();
107 
108   Uint32 doJob(Uint32 loopStartCount);
109   void postPoll();
110   int checkDoJob();
111 
112   void activateSendPacked();
113 
114   void execute(Signal* signal,
115 	       Priority prio,
116 	       BlockNumber bnr,
117 	       GlobalSignalNumber gsn);
118 
119   void execute(const SignalHeader * const sh,
120 	       Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
121 
122   void clear();
123   Signal* getVMSignals();
124 
125   Priority highestAvailablePrio() const;
126   Uint32 getBOccupancy() const;
127   void sendPacked();
128 
129   void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
130 		       GlobalSignalNumber gsn, Uint32 aIndex);
131   void scheduleTimeQueue(Uint32 aIndex);
132 
133   /*
134     The following implement aspects of ErrorReporter that differs between
135     singlethreaded and multithread ndbd.
136   */
137 
138   /* Called before dumping, intended to stop any still running processing. */
139   void traceDumpPrepare(NdbShutdownType&);
140   /* Number of threads to create trace files for (thread id 0 .. N-1). */
141   Uint32 traceDumpGetNumThreads();
142 
143   int traceDumpGetCurrentThread(); // returns -1 if not found
144 
145   /* Get jam() buffers etc. for specific thread. */
146   bool traceDumpGetJam(Uint32 thr_no,
147                        const JamEvent * & thrdTheEmulatedJam,
148                        Uint32 & thrdTheEmulatedJamIndex);
149   /* Produce a signal dump. */
150   void dumpSignalMemory(Uint32 thr_no, FILE * output);
151 
152   void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
153                               Uint32 *no_exec_loops, Uint32 *tot_exec_time,
154                               Uint32 *no_extra_loops, Uint32 *tot_extra_time);
155 
156   /* Get/Set high resolution timer in microseconds */
getHighResTimer()157   NDB_TICKS getHighResTimer() { return curr_ticks; }
getHighResTimerPtr()158   const NDB_TICKS* getHighResTimerPtr() { return &curr_ticks; }
setHighResTimer(NDB_TICKS ticks)159   void setHighResTimer(NDB_TICKS ticks)
160   { curr_ticks = ticks;}
161 private:
162   void highestAvailablePrio(Priority prio);
163   void reportJob(Priority aPriority);
164   void prio_level_error();
165 
166   Uint32 theDoJobTotalCounter;
167   Uint32 theDoJobCallCounter;
168   NDB_TICKS curr_ticks;
169   Uint8 theJobPriority[4096];
170   APZJobBuffer theJobBuffers[JB_LEVELS];
171 
172   void reportDoJobStatistics(Uint32 meanLoopCount);
173 };
174 
175 inline
176 Uint32
getBOccupancy() const177 FastScheduler::getBOccupancy() const {
178   return theJobBuffers[JBB].getOccupancy();
179 }//FastScheduler::getBOccupancy()
180 
181 inline
182 int
checkDoJob()183 FastScheduler::checkDoJob()
184 {
185   /*
186    * Job buffer overload protetction
187    * If the job buffer B is filled over a certain limit start
188    * to execute the signals in the job buffer's
189    */
190   if (getBOccupancy() < MAX_OCCUPANCY) {
191     return 0;
192   } else {
193     Uint32 loopStartCount = 0;
194     doJob(loopStartCount);
195     return 1;
196   }//if
197 }//FastScheduler::checkDoJob()
198 
199 inline
200 void
reportJob(Priority aPriority)201 FastScheduler::reportJob(Priority aPriority)
202 {
203   Uint32 tJobCounter = globalData.JobCounter;
204   Uint64 tJobLap = globalData.JobLap;
205   theJobPriority[tJobCounter] = (Uint8)aPriority;
206   globalData.JobCounter = (tJobCounter + 1) & 4095;
207   globalData.JobLap = tJobLap + 1;
208 }
209 
210 inline
211 Priority
highestAvailablePrio() const212 FastScheduler::highestAvailablePrio() const
213 {
214    return (Priority)globalData.highestAvailablePrio;
215 }
216 
217 inline
218 void
highestAvailablePrio(Priority prio)219 FastScheduler::highestAvailablePrio(Priority prio)
220 {
221    globalData.highestAvailablePrio = (Uint32)prio;
222 }
223 
224 inline
225 Signal*
getVMSignals()226 FastScheduler::getVMSignals()
227 {
228   return &globalData.VMSignals[0];
229 }
230 
231 
232 // Inserts of a protocol object into the Job Buffer.
233 inline
234 void
execute(const SignalHeader * const sh,Uint8 prio,const Uint32 * const theData,const Uint32 secPtrI[3])235 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
236 		       const Uint32 * const theData, const Uint32 secPtrI[3]){
237 #ifdef VM_TRACE
238   if (prio >= LEVEL_IDLE)
239     prio_level_error();
240 #endif
241 
242   theJobBuffers[prio].insert(sh, theData, secPtrI);
243   if (prio < (Uint8)highestAvailablePrio())
244     highestAvailablePrio((Priority)prio);
245 }
246 
247 inline
248 void
execute(Signal * signal,Priority prio,BlockNumber bnr,GlobalSignalNumber gsn)249 FastScheduler::execute(Signal* signal, Priority prio,
250 		       BlockNumber bnr, GlobalSignalNumber gsn)
251 {
252 #ifdef VM_TRACE
253   if (prio >= LEVEL_IDLE)
254     prio_level_error();
255 #endif
256   theJobBuffers[prio].insert(signal, bnr, gsn);
257   if (prio < highestAvailablePrio())
258     highestAvailablePrio(prio);
259 }
260 
261 inline
262 void
insertTimeQueue(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 aIndex)263 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
264 			       GlobalSignalNumber gsn, Uint32 aIndex)
265 {
266   theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
267 }
268 
269 inline
270 void
scheduleTimeQueue(Uint32 aIndex)271 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
272 {
273   Signal* signal = getVMSignals();
274   theJobBuffers[3].retrieve(signal, aIndex);
275   theJobBuffers[0].insert
276     (signal,
277      (BlockNumber)signal->header.theReceiversBlockNumber,
278      (GlobalSignalNumber)signal->header.theVerId_signalNumber);
279   if (highestAvailablePrio() > JBA)
280     highestAvailablePrio(JBA);
281 
282   signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
283 }
284 
285 inline
286 Uint32
getWritePtr() const287 APZJobBuffer::getWritePtr() const
288 {
289   return wPtr;
290 }
291 
292 inline
293 Uint32
getReadPtr() const294 APZJobBuffer::getReadPtr() const
295 {
296   return rPtr;
297 }
298 
299 inline
300 Uint32
getOccupancy() const301 APZJobBuffer::getOccupancy() const
302 {
303   return theOccupancy;
304 }
305 
306 inline
307 Uint32
getBufSize() const308 APZJobBuffer::getBufSize() const
309 {
310   return bufSize;
311 }
312 
313 inline
314 void
retrieve(Signal * signal,Uint32 myRptr)315 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
316 {
317   BufferEntry& buf = buffer[myRptr];
318 
319   buf.header.theSignalId = globalData.theSignalId++;
320 
321   signal->header = buf.header;
322 
323   Uint32 *from = (Uint32*) &buf.theDataRegister[0];
324   Uint32 *to   = (Uint32*) &signal->theData[0];
325   Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
326   for(; noOfWords; noOfWords--)
327     *to++ = *from++;
328   // Copy sections references (copy all without if-statements)
329   return;
330 }
331 
332 inline
333 void
retrieveDump(Signal * signal,Uint32 myRptr)334 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
335 {
336   /**
337    * Note that signal id is not taken from global data
338    */
339 
340   BufferEntry& buf = buffer[myRptr];
341   signal->header = buf.header;
342 
343   Uint32 *from = (Uint32*) &buf.theDataRegister[0];
344   Uint32 *to   = (Uint32*) &signal->theData[0];
345   Uint32 noOfWords = buf.header.theLength;
346   for(; noOfWords; noOfWords--)
347     *to++ = *from++;
348   return;
349 }
350 
351 inline
352 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn)353 APZJobBuffer::insert(Signal* signal,
354 		     BlockNumber bnr, GlobalSignalNumber gsn)
355 {
356   Uint32 tOccupancy = theOccupancy + 1;
357   Uint32 myWPtr = wPtr;
358   if (tOccupancy < bufSize) {
359     BufferEntry& buf = buffer[myWPtr];
360     Uint32 cond =  (++myWPtr == bufSize) - 1;
361     wPtr = myWPtr & cond;
362     theOccupancy = tOccupancy;
363     signal2buffer(signal, bnr, gsn, buf);
364     //---------------------------------------------------------
365     // Prefetch of buffer[wPtr] is done here. We prefetch for
366     // write both the first cache line and the next 64 byte
367     // entry
368     //---------------------------------------------------------
369     NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
370     NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
371   } else {
372     jbuf_error();
373   }//if
374 }
375 
376 
377 inline
378 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 myWPtr)379 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
380 		     GlobalSignalNumber gsn, Uint32 myWPtr)
381 {
382   BufferEntry& buf = buffer[myWPtr];
383   signal2buffer(signal, bnr, gsn, buf);
384 }
385 
386 
387 #undef JAM_FILE_ID
388 
389 #endif
390