1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #ifndef FastScheduler_H
26 #define FastScheduler_H
27 
28 #include <VMSignal.hpp>
29 #include <kernel_types.h>
30 #include <Prio.hpp>
31 #include <SignalLoggerManager.hpp>
32 #include <SimulatedBlock.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <GlobalData.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <portlib/ndb_prefetch.h>
37 
// Occupancy of job buffer B at which checkDoJob() starts calling doJob()
#define MAX_OCCUPANCY 1024

// Size constants, one per job buffer level.

// Jobs which have deadlines to meet use this level
#define JBASIZE   1280
// Most jobs use this level
#define JBBSIZE   4096
// Only used by STTOR and STTORRY currently
#define JBCSIZE   64
// Time Queue uses this level for storage, not supported as priority level
#define JBDSIZE   4096

// Error hooks; only declared in this header.
void bnr_error();
void jbuf_error();

// Forward declarations
class Signal;
class Block;
49 
50 class BufferEntry
51 {
52 public:
53   SignalHeader header;
54   Uint32 theDataRegister[25];
55 };
56 
57 class APZJobBuffer
58 {
59 public:
60   APZJobBuffer();
61   ~APZJobBuffer();
62 
63   void newBuffer(int size);
64 
65   void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
66   void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
67   void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
68 	      Uint32 myWPtr);
69 
70   Uint32 retrieve(Signal *signal);
71   void retrieve(Signal *signal, Uint32 myRptr);
72 
73   /**
74    * Used when dumping to trace file
75    */
76   void retrieveDump(Signal *signal, Uint32 myRptr);
77 
78   void clear();
79   Uint32 getOccupancy() const;
80 
81   Uint32 getReadPtr() const;
82   Uint32 getWritePtr() const;
83   Uint32 getBufSize() const;
84 
85 private:
86   void signal2buffer(Signal* signal, BlockNumber bnr,
87 		     GlobalSignalNumber gsn, BufferEntry& buf);
88   Uint32 rPtr;
89   Uint32 wPtr;
90   Uint32 theOccupancy;
91   Uint32 bufSize;
92   BufferEntry* buffer;
93   BufferEntry* memRef;
94 };
95 
96 
97 class FastScheduler
98 {
99 public:
100    FastScheduler();
101    ~FastScheduler();
102 
103   void doJob();
104   void postPoll();
105   int checkDoJob();
106 
107   void activateSendPacked();
108 
109   void execute(Signal* signal,
110 	       Priority prio,
111 	       BlockNumber bnr,
112 	       GlobalSignalNumber gsn);
113 
114   void execute(const SignalHeader * const sh,
115 	       Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
116 
117   void clear();
118   Signal* getVMSignals();
119 
120   Priority highestAvailablePrio() const;
121   Uint32 getBOccupancy() const;
122   void sendPacked();
123 
124   void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
125 		       GlobalSignalNumber gsn, Uint32 aIndex);
126   void scheduleTimeQueue(Uint32 aIndex);
127 
128   /*
129     The following implement aspects of ErrorReporter that differs between
130     singlethreaded and multithread ndbd.
131   */
132 
133   /* Called before dumping, intended to stop any still running processing. */
134   void traceDumpPrepare(NdbShutdownType&);
135   /* Number of threads to create trace files for (thread id 0 .. N-1). */
136   Uint32 traceDumpGetNumThreads();
137 
138   int traceDumpGetCurrentThread(); // returns -1 if not found
139 
140   /* Get jam() buffers etc. for specific thread. */
141   bool traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
142                        const Uint32 * & thrdTheEmulatedJam,
143                        Uint32 & thrdTheEmulatedJamIndex);
144   /* Produce a signal dump. */
145   void dumpSignalMemory(Uint32 thr_no, FILE * output);
146 
147   void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
148                               Uint32 *no_exec_loops, Uint32 *tot_exec_time,
149                               Uint32 *no_extra_loops, Uint32 *tot_extra_time);
150 
151 private:
152   void highestAvailablePrio(Priority prio);
153   void reportJob(Priority aPriority);
154   void prio_level_error();
155 
156   Uint32 theDoJobTotalCounter;
157   Uint32 theDoJobCallCounter;
158   Uint8 theJobPriority[4096];
159   APZJobBuffer theJobBuffers[JB_LEVELS];
160 
161   void reportDoJobStatistics(Uint32 meanLoopCount);
162 };
163 
164 inline
165 Uint32
getBOccupancy() const166 FastScheduler::getBOccupancy() const {
167   return theJobBuffers[JBB].getOccupancy();
168 }//FastScheduler::getBOccupancy()
169 
170 inline
171 int
checkDoJob()172 FastScheduler::checkDoJob()
173 {
174   /*
175    * Job buffer overload protetction
176    * If the job buffer B is filled over a certain limit start
177    * to execute the signals in the job buffer's
178    */
179   if (getBOccupancy() < MAX_OCCUPANCY) {
180     return 0;
181   } else {
182     doJob();
183     return 1;
184   }//if
185 }//FastScheduler::checkDoJob()
186 
187 inline
188 void
reportJob(Priority aPriority)189 FastScheduler::reportJob(Priority aPriority)
190 {
191   Uint32 tJobCounter = globalData.JobCounter;
192   Uint64 tJobLap = globalData.JobLap;
193   theJobPriority[tJobCounter] = (Uint8)aPriority;
194   globalData.JobCounter = (tJobCounter + 1) & 4095;
195   globalData.JobLap = tJobLap + 1;
196 }
197 
198 inline
199 Priority
highestAvailablePrio() const200 FastScheduler::highestAvailablePrio() const
201 {
202    return (Priority)globalData.highestAvailablePrio;
203 }
204 
205 inline
206 void
highestAvailablePrio(Priority prio)207 FastScheduler::highestAvailablePrio(Priority prio)
208 {
209    globalData.highestAvailablePrio = (Uint32)prio;
210 }
211 
212 inline
213 Signal*
getVMSignals()214 FastScheduler::getVMSignals()
215 {
216   return &globalData.VMSignals[0];
217 }
218 
219 
220 // Inserts of a protocol object into the Job Buffer.
221 inline
222 void
execute(const SignalHeader * const sh,Uint8 prio,const Uint32 * const theData,const Uint32 secPtrI[3])223 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
224 		       const Uint32 * const theData, const Uint32 secPtrI[3]){
225 #ifdef VM_TRACE
226   if (prio >= LEVEL_IDLE)
227     prio_level_error();
228 #endif
229 
230   theJobBuffers[prio].insert(sh, theData, secPtrI);
231   if (prio < (Uint8)highestAvailablePrio())
232     highestAvailablePrio((Priority)prio);
233 }
234 
235 inline
236 void
execute(Signal * signal,Priority prio,BlockNumber bnr,GlobalSignalNumber gsn)237 FastScheduler::execute(Signal* signal, Priority prio,
238 		       BlockNumber bnr, GlobalSignalNumber gsn)
239 {
240 #ifdef VM_TRACE
241   if (prio >= LEVEL_IDLE)
242     prio_level_error();
243 #endif
244   theJobBuffers[prio].insert(signal, bnr, gsn);
245   if (prio < highestAvailablePrio())
246     highestAvailablePrio(prio);
247 }
248 
249 inline
250 void
insertTimeQueue(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 aIndex)251 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
252 			       GlobalSignalNumber gsn, Uint32 aIndex)
253 {
254   theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
255 }
256 
257 inline
258 void
scheduleTimeQueue(Uint32 aIndex)259 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
260 {
261   Signal* signal = getVMSignals();
262   theJobBuffers[3].retrieve(signal, aIndex);
263   theJobBuffers[0].insert
264     (signal,
265      (BlockNumber)signal->header.theReceiversBlockNumber,
266      (GlobalSignalNumber)signal->header.theVerId_signalNumber);
267   if (highestAvailablePrio() > JBA)
268     highestAvailablePrio(JBA);
269 
270   signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
271 }
272 
273 inline
274 Uint32
getWritePtr() const275 APZJobBuffer::getWritePtr() const
276 {
277   return wPtr;
278 }
279 
280 inline
281 Uint32
getReadPtr() const282 APZJobBuffer::getReadPtr() const
283 {
284   return rPtr;
285 }
286 
287 inline
288 Uint32
getOccupancy() const289 APZJobBuffer::getOccupancy() const
290 {
291   return theOccupancy;
292 }
293 
294 inline
295 Uint32
getBufSize() const296 APZJobBuffer::getBufSize() const
297 {
298   return bufSize;
299 }
300 
301 inline
302 void
retrieve(Signal * signal,Uint32 myRptr)303 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
304 {
305   register BufferEntry& buf = buffer[myRptr];
306 
307   buf.header.theSignalId = globalData.theSignalId++;
308 
309   signal->header = buf.header;
310 
311   Uint32 *from = (Uint32*) &buf.theDataRegister[0];
312   Uint32 *to   = (Uint32*) &signal->theData[0];
313   Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
314   for(; noOfWords; noOfWords--)
315     *to++ = *from++;
316   // Copy sections references (copy all without if-statements)
317   return;
318 }
319 
320 inline
321 void
retrieveDump(Signal * signal,Uint32 myRptr)322 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
323 {
324   /**
325    * Note that signal id is not taken from global data
326    */
327 
328   register BufferEntry& buf = buffer[myRptr];
329   signal->header = buf.header;
330 
331   Uint32 *from = (Uint32*) &buf.theDataRegister[0];
332   Uint32 *to   = (Uint32*) &signal->theData[0];
333   Uint32 noOfWords = buf.header.theLength;
334   for(; noOfWords; noOfWords--)
335     *to++ = *from++;
336   return;
337 }
338 
339 inline
340 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn)341 APZJobBuffer::insert(Signal* signal,
342 		     BlockNumber bnr, GlobalSignalNumber gsn)
343 {
344   Uint32 tOccupancy = theOccupancy + 1;
345   Uint32 myWPtr = wPtr;
346   if (tOccupancy < bufSize) {
347     register BufferEntry& buf = buffer[myWPtr];
348     Uint32 cond =  (++myWPtr == bufSize) - 1;
349     wPtr = myWPtr & cond;
350     theOccupancy = tOccupancy;
351     signal2buffer(signal, bnr, gsn, buf);
352     //---------------------------------------------------------
353     // Prefetch of buffer[wPtr] is done here. We prefetch for
354     // write both the first cache line and the next 64 byte
355     // entry
356     //---------------------------------------------------------
357     NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
358     NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
359   } else {
360     jbuf_error();
361   }//if
362 }
363 
364 
365 inline
366 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 myWPtr)367 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
368 		     GlobalSignalNumber gsn, Uint32 myWPtr)
369 {
370   register BufferEntry& buf = buffer[myWPtr];
371   signal2buffer(signal, bnr, gsn, buf);
372 }
373 
374 #endif
375