1 /*
2 Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef FastScheduler_H
26 #define FastScheduler_H
27
28 #include <VMSignal.hpp>
29 #include <kernel_types.h>
30 #include <Prio.hpp>
31 #include <SignalLoggerManager.hpp>
32 #include <SimulatedBlock.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <GlobalData.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <portlib/ndb_prefetch.h>
37
38 #define JAM_FILE_ID 244
39
40
41 #define MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN 100
42 #define MAX_OCCUPANCY 1024
43
44 #define JBASIZE 1280 // Jobs which have dead lines to meet use this level
45 #define JBBSIZE 4096 // Most jobs use this level
46 #define JBCSIZE 64 // Only used by STTOR and STTORRY currently
47 #define JBDSIZE 4096 // Time Queue uses this level for storage, not supported
48 // as priority level
49 void bnr_error();
50 void jbuf_error();
51 class Signal;
52 class Block;
53
54 class BufferEntry
55 {
56 public:
57 SignalHeader header;
58 Uint32 theDataRegister[25];
59 };
60
61 class APZJobBuffer
62 {
63 public:
64 APZJobBuffer();
65 ~APZJobBuffer();
66
67 void newBuffer(int size);
68
69 void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
70 void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
71 void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
72 Uint32 myWPtr);
73
74 Uint32 retrieve(Signal *signal);
75 void retrieve(Signal *signal, Uint32 myRptr);
76
77 /**
78 * Used when dumping to trace file
79 */
80 void retrieveDump(Signal *signal, Uint32 myRptr);
81
82 void clear();
83 Uint32 getOccupancy() const;
84
85 Uint32 getReadPtr() const;
86 Uint32 getWritePtr() const;
87 Uint32 getBufSize() const;
88
89 private:
90 void signal2buffer(Signal* signal, BlockNumber bnr,
91 GlobalSignalNumber gsn, BufferEntry& buf);
92 Uint32 rPtr;
93 Uint32 wPtr;
94 Uint32 theOccupancy;
95 Uint32 bufSize;
96 BufferEntry* buffer;
97 BufferEntry* memRef;
98 };
99
100
101 class FastScheduler
102 {
103 public:
104 FastScheduler();
105 ~FastScheduler();
106
107 Uint32 doJob(Uint32 loopStartCount);
108 void postPoll();
109 int checkDoJob();
110
111 void activateSendPacked();
112
113 void execute(Signal* signal,
114 Priority prio,
115 BlockNumber bnr,
116 GlobalSignalNumber gsn);
117
118 void execute(const SignalHeader * const sh,
119 Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
120
121 void clear();
122 Signal* getVMSignals();
123
124 Priority highestAvailablePrio() const;
125 Uint32 getBOccupancy() const;
126 void sendPacked();
127
128 void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
129 GlobalSignalNumber gsn, Uint32 aIndex);
130 void scheduleTimeQueue(Uint32 aIndex);
131
132 /*
133 The following implement aspects of ErrorReporter that differs between
134 singlethreaded and multithread ndbd.
135 */
136
137 /* Called before dumping, intended to stop any still running processing. */
138 void traceDumpPrepare(NdbShutdownType&);
139 /* Number of threads to create trace files for (thread id 0 .. N-1). */
140 Uint32 traceDumpGetNumThreads();
141
142 int traceDumpGetCurrentThread(); // returns -1 if not found
143
144 /* Get jam() buffers etc. for specific thread. */
145 bool traceDumpGetJam(Uint32 thr_no,
146 const JamEvent * & thrdTheEmulatedJam,
147 Uint32 & thrdTheEmulatedJamIndex);
148 /* Produce a signal dump. */
149 void dumpSignalMemory(Uint32 thr_no, FILE * output);
150
151 void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
152 Uint32 *no_exec_loops, Uint32 *tot_exec_time,
153 Uint32 *no_extra_loops, Uint32 *tot_extra_time);
154
155 private:
156 void highestAvailablePrio(Priority prio);
157 void reportJob(Priority aPriority);
158 void prio_level_error();
159
160 Uint32 theDoJobTotalCounter;
161 Uint32 theDoJobCallCounter;
162 Uint8 theJobPriority[4096];
163 APZJobBuffer theJobBuffers[JB_LEVELS];
164
165 void reportDoJobStatistics(Uint32 meanLoopCount);
166 };
167
168 inline
169 Uint32
getBOccupancy() const170 FastScheduler::getBOccupancy() const {
171 return theJobBuffers[JBB].getOccupancy();
172 }//FastScheduler::getBOccupancy()
173
174 inline
175 int
checkDoJob()176 FastScheduler::checkDoJob()
177 {
178 /*
179 * Job buffer overload protetction
180 * If the job buffer B is filled over a certain limit start
181 * to execute the signals in the job buffer's
182 */
183 if (getBOccupancy() < MAX_OCCUPANCY) {
184 return 0;
185 } else {
186 Uint32 loopStartCount = 0;
187 doJob(loopStartCount);
188 return 1;
189 }//if
190 }//FastScheduler::checkDoJob()
191
192 inline
193 void
reportJob(Priority aPriority)194 FastScheduler::reportJob(Priority aPriority)
195 {
196 Uint32 tJobCounter = globalData.JobCounter;
197 Uint64 tJobLap = globalData.JobLap;
198 theJobPriority[tJobCounter] = (Uint8)aPriority;
199 globalData.JobCounter = (tJobCounter + 1) & 4095;
200 globalData.JobLap = tJobLap + 1;
201 }
202
203 inline
204 Priority
highestAvailablePrio() const205 FastScheduler::highestAvailablePrio() const
206 {
207 return (Priority)globalData.highestAvailablePrio;
208 }
209
210 inline
211 void
highestAvailablePrio(Priority prio)212 FastScheduler::highestAvailablePrio(Priority prio)
213 {
214 globalData.highestAvailablePrio = (Uint32)prio;
215 }
216
217 inline
218 Signal*
getVMSignals()219 FastScheduler::getVMSignals()
220 {
221 return &globalData.VMSignals[0];
222 }
223
224
225 // Inserts of a protocol object into the Job Buffer.
226 inline
227 void
execute(const SignalHeader * const sh,Uint8 prio,const Uint32 * const theData,const Uint32 secPtrI[3])228 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
229 const Uint32 * const theData, const Uint32 secPtrI[3]){
230 #ifdef VM_TRACE
231 if (prio >= LEVEL_IDLE)
232 prio_level_error();
233 #endif
234
235 theJobBuffers[prio].insert(sh, theData, secPtrI);
236 if (prio < (Uint8)highestAvailablePrio())
237 highestAvailablePrio((Priority)prio);
238 }
239
240 inline
241 void
execute(Signal * signal,Priority prio,BlockNumber bnr,GlobalSignalNumber gsn)242 FastScheduler::execute(Signal* signal, Priority prio,
243 BlockNumber bnr, GlobalSignalNumber gsn)
244 {
245 #ifdef VM_TRACE
246 if (prio >= LEVEL_IDLE)
247 prio_level_error();
248 #endif
249 theJobBuffers[prio].insert(signal, bnr, gsn);
250 if (prio < highestAvailablePrio())
251 highestAvailablePrio(prio);
252 }
253
254 inline
255 void
insertTimeQueue(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 aIndex)256 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
257 GlobalSignalNumber gsn, Uint32 aIndex)
258 {
259 theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
260 }
261
262 inline
263 void
scheduleTimeQueue(Uint32 aIndex)264 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
265 {
266 Signal* signal = getVMSignals();
267 theJobBuffers[3].retrieve(signal, aIndex);
268 theJobBuffers[0].insert
269 (signal,
270 (BlockNumber)signal->header.theReceiversBlockNumber,
271 (GlobalSignalNumber)signal->header.theVerId_signalNumber);
272 if (highestAvailablePrio() > JBA)
273 highestAvailablePrio(JBA);
274
275 signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
276 }
277
278 inline
279 Uint32
getWritePtr() const280 APZJobBuffer::getWritePtr() const
281 {
282 return wPtr;
283 }
284
285 inline
286 Uint32
getReadPtr() const287 APZJobBuffer::getReadPtr() const
288 {
289 return rPtr;
290 }
291
292 inline
293 Uint32
getOccupancy() const294 APZJobBuffer::getOccupancy() const
295 {
296 return theOccupancy;
297 }
298
299 inline
300 Uint32
getBufSize() const301 APZJobBuffer::getBufSize() const
302 {
303 return bufSize;
304 }
305
306 inline
307 void
retrieve(Signal * signal,Uint32 myRptr)308 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
309 {
310 register BufferEntry& buf = buffer[myRptr];
311
312 buf.header.theSignalId = globalData.theSignalId++;
313
314 signal->header = buf.header;
315
316 Uint32 *from = (Uint32*) &buf.theDataRegister[0];
317 Uint32 *to = (Uint32*) &signal->theData[0];
318 Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
319 for(; noOfWords; noOfWords--)
320 *to++ = *from++;
321 // Copy sections references (copy all without if-statements)
322 return;
323 }
324
325 inline
326 void
retrieveDump(Signal * signal,Uint32 myRptr)327 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
328 {
329 /**
330 * Note that signal id is not taken from global data
331 */
332
333 register BufferEntry& buf = buffer[myRptr];
334 signal->header = buf.header;
335
336 Uint32 *from = (Uint32*) &buf.theDataRegister[0];
337 Uint32 *to = (Uint32*) &signal->theData[0];
338 Uint32 noOfWords = buf.header.theLength;
339 for(; noOfWords; noOfWords--)
340 *to++ = *from++;
341 return;
342 }
343
344 inline
345 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn)346 APZJobBuffer::insert(Signal* signal,
347 BlockNumber bnr, GlobalSignalNumber gsn)
348 {
349 Uint32 tOccupancy = theOccupancy + 1;
350 Uint32 myWPtr = wPtr;
351 if (tOccupancy < bufSize) {
352 register BufferEntry& buf = buffer[myWPtr];
353 Uint32 cond = (++myWPtr == bufSize) - 1;
354 wPtr = myWPtr & cond;
355 theOccupancy = tOccupancy;
356 signal2buffer(signal, bnr, gsn, buf);
357 //---------------------------------------------------------
358 // Prefetch of buffer[wPtr] is done here. We prefetch for
359 // write both the first cache line and the next 64 byte
360 // entry
361 //---------------------------------------------------------
362 NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
363 NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
364 } else {
365 jbuf_error();
366 }//if
367 }
368
369
370 inline
371 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 myWPtr)372 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
373 GlobalSignalNumber gsn, Uint32 myWPtr)
374 {
375 register BufferEntry& buf = buffer[myWPtr];
376 signal2buffer(signal, bnr, gsn, buf);
377 }
378
379
380 #undef JAM_FILE_ID
381
382 #endif
383