1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef FastScheduler_H
26 #define FastScheduler_H
27
28 #include <VMSignal.hpp>
29 #include <kernel_types.h>
30 #include <Prio.hpp>
31 #include <SignalLoggerManager.hpp>
32 #include <SimulatedBlock.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <GlobalData.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <portlib/ndb_prefetch.h>
37
// Occupancy limit for job buffer B; FastScheduler::checkDoJob() starts
// executing jobs once the buffer's occupancy reaches this value.
#define MAX_OCCUPANCY 1024

// Sizes of the individual job buffer levels.

// Jobs which have deadlines to meet use this level
#define JBASIZE 1280
// Most jobs use this level
#define JBBSIZE 4096
// Only used by STTOR and STTORRY currently
#define JBCSIZE 64
// Time Queue uses this level for storage, not supported
// as priority level
#define JBDSIZE 4096

// Error hooks, declared here and defined outside this header.
// jbuf_error() is called by APZJobBuffer::insert() when the buffer is full.
void bnr_error();
void jbuf_error();

class Signal;
class Block;
49
50 class BufferEntry
51 {
52 public:
53 SignalHeader header;
54 Uint32 theDataRegister[25];
55 };
56
57 class APZJobBuffer
58 {
59 public:
60 APZJobBuffer();
61 ~APZJobBuffer();
62
63 void newBuffer(int size);
64
65 void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
66 void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
67 void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
68 Uint32 myWPtr);
69
70 Uint32 retrieve(Signal *signal);
71 void retrieve(Signal *signal, Uint32 myRptr);
72
73 /**
74 * Used when dumping to trace file
75 */
76 void retrieveDump(Signal *signal, Uint32 myRptr);
77
78 void clear();
79 Uint32 getOccupancy() const;
80
81 Uint32 getReadPtr() const;
82 Uint32 getWritePtr() const;
83 Uint32 getBufSize() const;
84
85 private:
86 void signal2buffer(Signal* signal, BlockNumber bnr,
87 GlobalSignalNumber gsn, BufferEntry& buf);
88 Uint32 rPtr;
89 Uint32 wPtr;
90 Uint32 theOccupancy;
91 Uint32 bufSize;
92 BufferEntry* buffer;
93 BufferEntry* memRef;
94 };
95
96
97 class FastScheduler
98 {
99 public:
100 FastScheduler();
101 ~FastScheduler();
102
103 void doJob();
104 void postPoll();
105 int checkDoJob();
106
107 void activateSendPacked();
108
109 void execute(Signal* signal,
110 Priority prio,
111 BlockNumber bnr,
112 GlobalSignalNumber gsn);
113
114 void execute(const SignalHeader * const sh,
115 Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
116
117 void clear();
118 Signal* getVMSignals();
119
120 Priority highestAvailablePrio() const;
121 Uint32 getBOccupancy() const;
122 void sendPacked();
123
124 void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
125 GlobalSignalNumber gsn, Uint32 aIndex);
126 void scheduleTimeQueue(Uint32 aIndex);
127
128 /*
129 The following implement aspects of ErrorReporter that differs between
130 singlethreaded and multithread ndbd.
131 */
132
133 /* Called before dumping, intended to stop any still running processing. */
134 void traceDumpPrepare(NdbShutdownType&);
135 /* Number of threads to create trace files for (thread id 0 .. N-1). */
136 Uint32 traceDumpGetNumThreads();
137
138 int traceDumpGetCurrentThread(); // returns -1 if not found
139
140 /* Get jam() buffers etc. for specific thread. */
141 bool traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
142 const Uint32 * & thrdTheEmulatedJam,
143 Uint32 & thrdTheEmulatedJamIndex);
144 /* Produce a signal dump. */
145 void dumpSignalMemory(Uint32 thr_no, FILE * output);
146
147 void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
148 Uint32 *no_exec_loops, Uint32 *tot_exec_time,
149 Uint32 *no_extra_loops, Uint32 *tot_extra_time);
150
151 private:
152 void highestAvailablePrio(Priority prio);
153 void reportJob(Priority aPriority);
154 void prio_level_error();
155
156 Uint32 theDoJobTotalCounter;
157 Uint32 theDoJobCallCounter;
158 Uint8 theJobPriority[4096];
159 APZJobBuffer theJobBuffers[JB_LEVELS];
160
161 void reportDoJobStatistics(Uint32 meanLoopCount);
162 };
163
164 inline
165 Uint32
getBOccupancy() const166 FastScheduler::getBOccupancy() const {
167 return theJobBuffers[JBB].getOccupancy();
168 }//FastScheduler::getBOccupancy()
169
170 inline
171 int
checkDoJob()172 FastScheduler::checkDoJob()
173 {
174 /*
175 * Job buffer overload protetction
176 * If the job buffer B is filled over a certain limit start
177 * to execute the signals in the job buffer's
178 */
179 if (getBOccupancy() < MAX_OCCUPANCY) {
180 return 0;
181 } else {
182 doJob();
183 return 1;
184 }//if
185 }//FastScheduler::checkDoJob()
186
187 inline
188 void
reportJob(Priority aPriority)189 FastScheduler::reportJob(Priority aPriority)
190 {
191 Uint32 tJobCounter = globalData.JobCounter;
192 Uint64 tJobLap = globalData.JobLap;
193 theJobPriority[tJobCounter] = (Uint8)aPriority;
194 globalData.JobCounter = (tJobCounter + 1) & 4095;
195 globalData.JobLap = tJobLap + 1;
196 }
197
198 inline
199 Priority
highestAvailablePrio() const200 FastScheduler::highestAvailablePrio() const
201 {
202 return (Priority)globalData.highestAvailablePrio;
203 }
204
205 inline
206 void
highestAvailablePrio(Priority prio)207 FastScheduler::highestAvailablePrio(Priority prio)
208 {
209 globalData.highestAvailablePrio = (Uint32)prio;
210 }
211
212 inline
213 Signal*
getVMSignals()214 FastScheduler::getVMSignals()
215 {
216 return &globalData.VMSignals[0];
217 }
218
219
220 // Inserts of a protocol object into the Job Buffer.
221 inline
222 void
execute(const SignalHeader * const sh,Uint8 prio,const Uint32 * const theData,const Uint32 secPtrI[3])223 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
224 const Uint32 * const theData, const Uint32 secPtrI[3]){
225 #ifdef VM_TRACE
226 if (prio >= LEVEL_IDLE)
227 prio_level_error();
228 #endif
229
230 theJobBuffers[prio].insert(sh, theData, secPtrI);
231 if (prio < (Uint8)highestAvailablePrio())
232 highestAvailablePrio((Priority)prio);
233 }
234
235 inline
236 void
execute(Signal * signal,Priority prio,BlockNumber bnr,GlobalSignalNumber gsn)237 FastScheduler::execute(Signal* signal, Priority prio,
238 BlockNumber bnr, GlobalSignalNumber gsn)
239 {
240 #ifdef VM_TRACE
241 if (prio >= LEVEL_IDLE)
242 prio_level_error();
243 #endif
244 theJobBuffers[prio].insert(signal, bnr, gsn);
245 if (prio < highestAvailablePrio())
246 highestAvailablePrio(prio);
247 }
248
249 inline
250 void
insertTimeQueue(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 aIndex)251 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
252 GlobalSignalNumber gsn, Uint32 aIndex)
253 {
254 theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
255 }
256
257 inline
258 void
scheduleTimeQueue(Uint32 aIndex)259 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
260 {
261 Signal* signal = getVMSignals();
262 theJobBuffers[3].retrieve(signal, aIndex);
263 theJobBuffers[0].insert
264 (signal,
265 (BlockNumber)signal->header.theReceiversBlockNumber,
266 (GlobalSignalNumber)signal->header.theVerId_signalNumber);
267 if (highestAvailablePrio() > JBA)
268 highestAvailablePrio(JBA);
269
270 signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
271 }
272
273 inline
274 Uint32
getWritePtr() const275 APZJobBuffer::getWritePtr() const
276 {
277 return wPtr;
278 }
279
280 inline
281 Uint32
getReadPtr() const282 APZJobBuffer::getReadPtr() const
283 {
284 return rPtr;
285 }
286
287 inline
288 Uint32
getOccupancy() const289 APZJobBuffer::getOccupancy() const
290 {
291 return theOccupancy;
292 }
293
294 inline
295 Uint32
getBufSize() const296 APZJobBuffer::getBufSize() const
297 {
298 return bufSize;
299 }
300
301 inline
302 void
retrieve(Signal * signal,Uint32 myRptr)303 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
304 {
305 register BufferEntry& buf = buffer[myRptr];
306
307 buf.header.theSignalId = globalData.theSignalId++;
308
309 signal->header = buf.header;
310
311 Uint32 *from = (Uint32*) &buf.theDataRegister[0];
312 Uint32 *to = (Uint32*) &signal->theData[0];
313 Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
314 for(; noOfWords; noOfWords--)
315 *to++ = *from++;
316 // Copy sections references (copy all without if-statements)
317 return;
318 }
319
320 inline
321 void
retrieveDump(Signal * signal,Uint32 myRptr)322 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
323 {
324 /**
325 * Note that signal id is not taken from global data
326 */
327
328 register BufferEntry& buf = buffer[myRptr];
329 signal->header = buf.header;
330
331 Uint32 *from = (Uint32*) &buf.theDataRegister[0];
332 Uint32 *to = (Uint32*) &signal->theData[0];
333 Uint32 noOfWords = buf.header.theLength;
334 for(; noOfWords; noOfWords--)
335 *to++ = *from++;
336 return;
337 }
338
339 inline
340 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn)341 APZJobBuffer::insert(Signal* signal,
342 BlockNumber bnr, GlobalSignalNumber gsn)
343 {
344 Uint32 tOccupancy = theOccupancy + 1;
345 Uint32 myWPtr = wPtr;
346 if (tOccupancy < bufSize) {
347 register BufferEntry& buf = buffer[myWPtr];
348 Uint32 cond = (++myWPtr == bufSize) - 1;
349 wPtr = myWPtr & cond;
350 theOccupancy = tOccupancy;
351 signal2buffer(signal, bnr, gsn, buf);
352 //---------------------------------------------------------
353 // Prefetch of buffer[wPtr] is done here. We prefetch for
354 // write both the first cache line and the next 64 byte
355 // entry
356 //---------------------------------------------------------
357 NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
358 NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
359 } else {
360 jbuf_error();
361 }//if
362 }
363
364
365 inline
366 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 myWPtr)367 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
368 GlobalSignalNumber gsn, Uint32 myWPtr)
369 {
370 register BufferEntry& buf = buffer[myWPtr];
371 signal2buffer(signal, bnr, gsn, buf);
372 }
373
374 #endif
375