1 /*
2    Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef FastScheduler_H
26 #define FastScheduler_H
27 
28 #include <VMSignal.hpp>
29 #include <kernel_types.h>
30 #include <Prio.hpp>
31 #include <SignalLoggerManager.hpp>
32 #include <SimulatedBlock.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <GlobalData.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <portlib/ndb_prefetch.h>
37 
38 #define JAM_FILE_ID 244
39 
40 
41 #define MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN 100
42 #define MAX_OCCUPANCY 1024
43 
44 #define JBASIZE   1280 // Jobs which have dead lines to meet use this level
45 #define JBBSIZE   4096 // Most jobs use this level
46 #define JBCSIZE   64   // Only used by STTOR and STTORRY currently
47 #define JBDSIZE   4096 // Time Queue uses this level for storage, not supported
48                        // as priority level
49 void bnr_error();
50 void jbuf_error();
51 class Signal;
52 class Block;
53 
54 class BufferEntry
55 {
56 public:
57   SignalHeader header;
58   Uint32 theDataRegister[25];
59 };
60 
61 class APZJobBuffer
62 {
63 public:
64   APZJobBuffer();
65   ~APZJobBuffer();
66 
67   void newBuffer(int size);
68 
69   void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
70   void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
71   void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
72 	      Uint32 myWPtr);
73 
74   Uint32 retrieve(Signal *signal);
75   void retrieve(Signal *signal, Uint32 myRptr);
76 
77   /**
78    * Used when dumping to trace file
79    */
80   void retrieveDump(Signal *signal, Uint32 myRptr);
81 
82   void clear();
83   Uint32 getOccupancy() const;
84 
85   Uint32 getReadPtr() const;
86   Uint32 getWritePtr() const;
87   Uint32 getBufSize() const;
88 
89 private:
90   void signal2buffer(Signal* signal, BlockNumber bnr,
91 		     GlobalSignalNumber gsn, BufferEntry& buf);
92   Uint32 rPtr;
93   Uint32 wPtr;
94   Uint32 theOccupancy;
95   Uint32 bufSize;
96   BufferEntry* buffer;
97   BufferEntry* memRef;
98 };
99 
100 
101 class FastScheduler
102 {
103 public:
104    FastScheduler();
105    ~FastScheduler();
106 
107   Uint32 doJob(Uint32 loopStartCount);
108   void postPoll();
109   int checkDoJob();
110 
111   void activateSendPacked();
112 
113   void execute(Signal* signal,
114 	       Priority prio,
115 	       BlockNumber bnr,
116 	       GlobalSignalNumber gsn);
117 
118   void execute(const SignalHeader * const sh,
119 	       Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
120 
121   void clear();
122   Signal* getVMSignals();
123 
124   Priority highestAvailablePrio() const;
125   Uint32 getBOccupancy() const;
126   void sendPacked();
127 
128   void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
129 		       GlobalSignalNumber gsn, Uint32 aIndex);
130   void scheduleTimeQueue(Uint32 aIndex);
131 
132   /*
133     The following implement aspects of ErrorReporter that differs between
134     singlethreaded and multithread ndbd.
135   */
136 
137   /* Called before dumping, intended to stop any still running processing. */
138   void traceDumpPrepare(NdbShutdownType&);
139   /* Number of threads to create trace files for (thread id 0 .. N-1). */
140   Uint32 traceDumpGetNumThreads();
141 
142   int traceDumpGetCurrentThread(); // returns -1 if not found
143 
144   /* Get jam() buffers etc. for specific thread. */
145   bool traceDumpGetJam(Uint32 thr_no,
146                        const JamEvent * & thrdTheEmulatedJam,
147                        Uint32 & thrdTheEmulatedJamIndex);
148   /* Produce a signal dump. */
149   void dumpSignalMemory(Uint32 thr_no, FILE * output);
150 
151   void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
152                               Uint32 *no_exec_loops, Uint32 *tot_exec_time,
153                               Uint32 *no_extra_loops, Uint32 *tot_extra_time);
154 
155 private:
156   void highestAvailablePrio(Priority prio);
157   void reportJob(Priority aPriority);
158   void prio_level_error();
159 
160   Uint32 theDoJobTotalCounter;
161   Uint32 theDoJobCallCounter;
162   Uint8 theJobPriority[4096];
163   APZJobBuffer theJobBuffers[JB_LEVELS];
164 
165   void reportDoJobStatistics(Uint32 meanLoopCount);
166 };
167 
168 inline
169 Uint32
getBOccupancy() const170 FastScheduler::getBOccupancy() const {
171   return theJobBuffers[JBB].getOccupancy();
172 }//FastScheduler::getBOccupancy()
173 
174 inline
175 int
checkDoJob()176 FastScheduler::checkDoJob()
177 {
178   /*
179    * Job buffer overload protetction
180    * If the job buffer B is filled over a certain limit start
181    * to execute the signals in the job buffer's
182    */
183   if (getBOccupancy() < MAX_OCCUPANCY) {
184     return 0;
185   } else {
186     Uint32 loopStartCount = 0;
187     doJob(loopStartCount);
188     return 1;
189   }//if
190 }//FastScheduler::checkDoJob()
191 
192 inline
193 void
reportJob(Priority aPriority)194 FastScheduler::reportJob(Priority aPriority)
195 {
196   Uint32 tJobCounter = globalData.JobCounter;
197   Uint64 tJobLap = globalData.JobLap;
198   theJobPriority[tJobCounter] = (Uint8)aPriority;
199   globalData.JobCounter = (tJobCounter + 1) & 4095;
200   globalData.JobLap = tJobLap + 1;
201 }
202 
203 inline
204 Priority
highestAvailablePrio() const205 FastScheduler::highestAvailablePrio() const
206 {
207    return (Priority)globalData.highestAvailablePrio;
208 }
209 
210 inline
211 void
highestAvailablePrio(Priority prio)212 FastScheduler::highestAvailablePrio(Priority prio)
213 {
214    globalData.highestAvailablePrio = (Uint32)prio;
215 }
216 
217 inline
218 Signal*
getVMSignals()219 FastScheduler::getVMSignals()
220 {
221   return &globalData.VMSignals[0];
222 }
223 
224 
225 // Inserts of a protocol object into the Job Buffer.
226 inline
227 void
execute(const SignalHeader * const sh,Uint8 prio,const Uint32 * const theData,const Uint32 secPtrI[3])228 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
229 		       const Uint32 * const theData, const Uint32 secPtrI[3]){
230 #ifdef VM_TRACE
231   if (prio >= LEVEL_IDLE)
232     prio_level_error();
233 #endif
234 
235   theJobBuffers[prio].insert(sh, theData, secPtrI);
236   if (prio < (Uint8)highestAvailablePrio())
237     highestAvailablePrio((Priority)prio);
238 }
239 
240 inline
241 void
execute(Signal * signal,Priority prio,BlockNumber bnr,GlobalSignalNumber gsn)242 FastScheduler::execute(Signal* signal, Priority prio,
243 		       BlockNumber bnr, GlobalSignalNumber gsn)
244 {
245 #ifdef VM_TRACE
246   if (prio >= LEVEL_IDLE)
247     prio_level_error();
248 #endif
249   theJobBuffers[prio].insert(signal, bnr, gsn);
250   if (prio < highestAvailablePrio())
251     highestAvailablePrio(prio);
252 }
253 
254 inline
255 void
insertTimeQueue(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 aIndex)256 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
257 			       GlobalSignalNumber gsn, Uint32 aIndex)
258 {
259   theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
260 }
261 
262 inline
263 void
scheduleTimeQueue(Uint32 aIndex)264 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
265 {
266   Signal* signal = getVMSignals();
267   theJobBuffers[3].retrieve(signal, aIndex);
268   theJobBuffers[0].insert
269     (signal,
270      (BlockNumber)signal->header.theReceiversBlockNumber,
271      (GlobalSignalNumber)signal->header.theVerId_signalNumber);
272   if (highestAvailablePrio() > JBA)
273     highestAvailablePrio(JBA);
274 
275   signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
276 }
277 
278 inline
279 Uint32
getWritePtr() const280 APZJobBuffer::getWritePtr() const
281 {
282   return wPtr;
283 }
284 
285 inline
286 Uint32
getReadPtr() const287 APZJobBuffer::getReadPtr() const
288 {
289   return rPtr;
290 }
291 
292 inline
293 Uint32
getOccupancy() const294 APZJobBuffer::getOccupancy() const
295 {
296   return theOccupancy;
297 }
298 
299 inline
300 Uint32
getBufSize() const301 APZJobBuffer::getBufSize() const
302 {
303   return bufSize;
304 }
305 
306 inline
307 void
retrieve(Signal * signal,Uint32 myRptr)308 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
309 {
310   register BufferEntry& buf = buffer[myRptr];
311 
312   buf.header.theSignalId = globalData.theSignalId++;
313 
314   signal->header = buf.header;
315 
316   Uint32 *from = (Uint32*) &buf.theDataRegister[0];
317   Uint32 *to   = (Uint32*) &signal->theData[0];
318   Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
319   for(; noOfWords; noOfWords--)
320     *to++ = *from++;
321   // Copy sections references (copy all without if-statements)
322   return;
323 }
324 
325 inline
326 void
retrieveDump(Signal * signal,Uint32 myRptr)327 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
328 {
329   /**
330    * Note that signal id is not taken from global data
331    */
332 
333   register BufferEntry& buf = buffer[myRptr];
334   signal->header = buf.header;
335 
336   Uint32 *from = (Uint32*) &buf.theDataRegister[0];
337   Uint32 *to   = (Uint32*) &signal->theData[0];
338   Uint32 noOfWords = buf.header.theLength;
339   for(; noOfWords; noOfWords--)
340     *to++ = *from++;
341   return;
342 }
343 
344 inline
345 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn)346 APZJobBuffer::insert(Signal* signal,
347 		     BlockNumber bnr, GlobalSignalNumber gsn)
348 {
349   Uint32 tOccupancy = theOccupancy + 1;
350   Uint32 myWPtr = wPtr;
351   if (tOccupancy < bufSize) {
352     register BufferEntry& buf = buffer[myWPtr];
353     Uint32 cond =  (++myWPtr == bufSize) - 1;
354     wPtr = myWPtr & cond;
355     theOccupancy = tOccupancy;
356     signal2buffer(signal, bnr, gsn, buf);
357     //---------------------------------------------------------
358     // Prefetch of buffer[wPtr] is done here. We prefetch for
359     // write both the first cache line and the next 64 byte
360     // entry
361     //---------------------------------------------------------
362     NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
363     NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
364   } else {
365     jbuf_error();
366   }//if
367 }
368 
369 
370 inline
371 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 myWPtr)372 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
373 		     GlobalSignalNumber gsn, Uint32 myWPtr)
374 {
375   register BufferEntry& buf = buffer[myWPtr];
376   signal2buffer(signal, bnr, gsn, buf);
377 }
378 
379 
380 #undef JAM_FILE_ID
381 
382 #endif
383