1 /*
2 Copyright (c) 2003, 2018, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #ifndef FastScheduler_H
26 #define FastScheduler_H
27
28 #include <VMSignal.hpp>
29 #include <kernel_types.h>
30 #include <Prio.hpp>
31 #include <SignalLoggerManager.hpp>
32 #include <SimulatedBlock.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <GlobalData.hpp>
35 #include <TransporterDefinitions.hpp>
36 #include <portlib/ndb_prefetch.h>
37 #include <portlib/NdbTick.h>
38
39 #define JAM_FILE_ID 244
40
41
// Scheduler limits.
#define MAX_SIGNALS_EXECUTED_BEFORE_ZERO_TIME_QUEUE_SCAN 100

// FastScheduler::checkDoJob() starts executing jobs once the occupancy of
// the level B job buffer is no longer below this value.
#define MAX_OCCUPANCY 1024

// Job buffer sizes, one per buffer level.
#define JBASIZE 1280  // Level A: jobs which have deadlines to meet
#define JBBSIZE 4096  // Level B: most jobs use this level
#define JBCSIZE 64    // Level C: only used by STTOR and STTORRY currently
#define JBDSIZE 4096  // Level D: the time queue uses this level for storage;
                      // it is not supported as a priority level
// Error hooks; their definitions are not part of this header.
// jbuf_error() is what APZJobBuffer::insert() calls when it finds no free
// slot in the buffer.
void bnr_error();
void jbuf_error();

// Forward declarations.
class Signal;
class Block;
54
55 class BufferEntry
56 {
57 public:
58 SignalHeader header;
59 Uint32 theDataRegister[25];
60 };
61
62 class APZJobBuffer
63 {
64 public:
65 APZJobBuffer();
66 ~APZJobBuffer();
67
68 void newBuffer(int size);
69
70 void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn);
71 void insert(const SignalHeader * const sh, const Uint32 * const theData, const Uint32 secPtrI[3]);
72 void insert(Signal* signal, BlockNumber bnr, GlobalSignalNumber gsn,
73 Uint32 myWPtr);
74
75 Uint32 retrieve(Signal *signal);
76 void retrieve(Signal *signal, Uint32 myRptr);
77
78 /**
79 * Used when dumping to trace file
80 */
81 void retrieveDump(Signal *signal, Uint32 myRptr);
82
83 void clear();
84 Uint32 getOccupancy() const;
85
86 Uint32 getReadPtr() const;
87 Uint32 getWritePtr() const;
88 Uint32 getBufSize() const;
89
90 private:
91 void signal2buffer(Signal* signal, BlockNumber bnr,
92 GlobalSignalNumber gsn, BufferEntry& buf);
93 Uint32 rPtr;
94 Uint32 wPtr;
95 Uint32 theOccupancy;
96 Uint32 bufSize;
97 BufferEntry* buffer;
98 BufferEntry* memRef;
99 };
100
101
102 class FastScheduler
103 {
104 public:
105 FastScheduler();
106 ~FastScheduler();
107
108 Uint32 doJob(Uint32 loopStartCount);
109 void postPoll();
110 int checkDoJob();
111
112 void activateSendPacked();
113
114 void execute(Signal* signal,
115 Priority prio,
116 BlockNumber bnr,
117 GlobalSignalNumber gsn);
118
119 void execute(const SignalHeader * const sh,
120 Uint8 prio, const Uint32 * const theData, const Uint32 secPtr[3]);
121
122 void clear();
123 Signal* getVMSignals();
124
125 Priority highestAvailablePrio() const;
126 Uint32 getBOccupancy() const;
127 void sendPacked();
128
129 void insertTimeQueue(Signal* aSignal, BlockNumber bnr,
130 GlobalSignalNumber gsn, Uint32 aIndex);
131 void scheduleTimeQueue(Uint32 aIndex);
132
133 /*
134 The following implement aspects of ErrorReporter that differs between
135 singlethreaded and multithread ndbd.
136 */
137
138 /* Called before dumping, intended to stop any still running processing. */
139 void traceDumpPrepare(NdbShutdownType&);
140 /* Number of threads to create trace files for (thread id 0 .. N-1). */
141 Uint32 traceDumpGetNumThreads();
142
143 int traceDumpGetCurrentThread(); // returns -1 if not found
144
145 /* Get jam() buffers etc. for specific thread. */
146 bool traceDumpGetJam(Uint32 thr_no,
147 const JamEvent * & thrdTheEmulatedJam,
148 Uint32 & thrdTheEmulatedJamIndex);
149 /* Produce a signal dump. */
150 void dumpSignalMemory(Uint32 thr_no, FILE * output);
151
152 void reportThreadConfigLoop(Uint32 expired_time, Uint32 extra_constant,
153 Uint32 *no_exec_loops, Uint32 *tot_exec_time,
154 Uint32 *no_extra_loops, Uint32 *tot_extra_time);
155
156 /* Get/Set high resolution timer in microseconds */
getHighResTimer()157 NDB_TICKS getHighResTimer() { return curr_ticks; }
getHighResTimerPtr()158 const NDB_TICKS* getHighResTimerPtr() { return &curr_ticks; }
setHighResTimer(NDB_TICKS ticks)159 void setHighResTimer(NDB_TICKS ticks)
160 { curr_ticks = ticks;}
161 private:
162 void highestAvailablePrio(Priority prio);
163 void reportJob(Priority aPriority);
164 void prio_level_error();
165
166 Uint32 theDoJobTotalCounter;
167 Uint32 theDoJobCallCounter;
168 NDB_TICKS curr_ticks;
169 Uint8 theJobPriority[4096];
170 APZJobBuffer theJobBuffers[JB_LEVELS];
171
172 void reportDoJobStatistics(Uint32 meanLoopCount);
173 };
174
175 inline
176 Uint32
getBOccupancy() const177 FastScheduler::getBOccupancy() const {
178 return theJobBuffers[JBB].getOccupancy();
179 }//FastScheduler::getBOccupancy()
180
181 inline
182 int
checkDoJob()183 FastScheduler::checkDoJob()
184 {
185 /*
186 * Job buffer overload protetction
187 * If the job buffer B is filled over a certain limit start
188 * to execute the signals in the job buffer's
189 */
190 if (getBOccupancy() < MAX_OCCUPANCY) {
191 return 0;
192 } else {
193 Uint32 loopStartCount = 0;
194 doJob(loopStartCount);
195 return 1;
196 }//if
197 }//FastScheduler::checkDoJob()
198
199 inline
200 void
reportJob(Priority aPriority)201 FastScheduler::reportJob(Priority aPriority)
202 {
203 Uint32 tJobCounter = globalData.JobCounter;
204 Uint64 tJobLap = globalData.JobLap;
205 theJobPriority[tJobCounter] = (Uint8)aPriority;
206 globalData.JobCounter = (tJobCounter + 1) & 4095;
207 globalData.JobLap = tJobLap + 1;
208 }
209
210 inline
211 Priority
highestAvailablePrio() const212 FastScheduler::highestAvailablePrio() const
213 {
214 return (Priority)globalData.highestAvailablePrio;
215 }
216
217 inline
218 void
highestAvailablePrio(Priority prio)219 FastScheduler::highestAvailablePrio(Priority prio)
220 {
221 globalData.highestAvailablePrio = (Uint32)prio;
222 }
223
224 inline
225 Signal*
getVMSignals()226 FastScheduler::getVMSignals()
227 {
228 return &globalData.VMSignals[0];
229 }
230
231
232 // Inserts of a protocol object into the Job Buffer.
233 inline
234 void
execute(const SignalHeader * const sh,Uint8 prio,const Uint32 * const theData,const Uint32 secPtrI[3])235 FastScheduler::execute(const SignalHeader * const sh, Uint8 prio,
236 const Uint32 * const theData, const Uint32 secPtrI[3]){
237 #ifdef VM_TRACE
238 if (prio >= LEVEL_IDLE)
239 prio_level_error();
240 #endif
241
242 theJobBuffers[prio].insert(sh, theData, secPtrI);
243 if (prio < (Uint8)highestAvailablePrio())
244 highestAvailablePrio((Priority)prio);
245 }
246
247 inline
248 void
execute(Signal * signal,Priority prio,BlockNumber bnr,GlobalSignalNumber gsn)249 FastScheduler::execute(Signal* signal, Priority prio,
250 BlockNumber bnr, GlobalSignalNumber gsn)
251 {
252 #ifdef VM_TRACE
253 if (prio >= LEVEL_IDLE)
254 prio_level_error();
255 #endif
256 theJobBuffers[prio].insert(signal, bnr, gsn);
257 if (prio < highestAvailablePrio())
258 highestAvailablePrio(prio);
259 }
260
261 inline
262 void
insertTimeQueue(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 aIndex)263 FastScheduler::insertTimeQueue(Signal* signal, BlockNumber bnr,
264 GlobalSignalNumber gsn, Uint32 aIndex)
265 {
266 theJobBuffers[3].insert(signal, bnr, gsn, aIndex);
267 }
268
269 inline
270 void
scheduleTimeQueue(Uint32 aIndex)271 FastScheduler::scheduleTimeQueue(Uint32 aIndex)
272 {
273 Signal* signal = getVMSignals();
274 theJobBuffers[3].retrieve(signal, aIndex);
275 theJobBuffers[0].insert
276 (signal,
277 (BlockNumber)signal->header.theReceiversBlockNumber,
278 (GlobalSignalNumber)signal->header.theVerId_signalNumber);
279 if (highestAvailablePrio() > JBA)
280 highestAvailablePrio(JBA);
281
282 signal->header.m_noOfSections = 0; // Or else sendPacked might pick it up
283 }
284
285 inline
286 Uint32
getWritePtr() const287 APZJobBuffer::getWritePtr() const
288 {
289 return wPtr;
290 }
291
292 inline
293 Uint32
getReadPtr() const294 APZJobBuffer::getReadPtr() const
295 {
296 return rPtr;
297 }
298
299 inline
300 Uint32
getOccupancy() const301 APZJobBuffer::getOccupancy() const
302 {
303 return theOccupancy;
304 }
305
306 inline
307 Uint32
getBufSize() const308 APZJobBuffer::getBufSize() const
309 {
310 return bufSize;
311 }
312
313 inline
314 void
retrieve(Signal * signal,Uint32 myRptr)315 APZJobBuffer::retrieve(Signal* signal, Uint32 myRptr)
316 {
317 BufferEntry& buf = buffer[myRptr];
318
319 buf.header.theSignalId = globalData.theSignalId++;
320
321 signal->header = buf.header;
322
323 Uint32 *from = (Uint32*) &buf.theDataRegister[0];
324 Uint32 *to = (Uint32*) &signal->theData[0];
325 Uint32 noOfWords = buf.header.theLength + buf.header.m_noOfSections;
326 for(; noOfWords; noOfWords--)
327 *to++ = *from++;
328 // Copy sections references (copy all without if-statements)
329 return;
330 }
331
332 inline
333 void
retrieveDump(Signal * signal,Uint32 myRptr)334 APZJobBuffer::retrieveDump(Signal* signal, Uint32 myRptr)
335 {
336 /**
337 * Note that signal id is not taken from global data
338 */
339
340 BufferEntry& buf = buffer[myRptr];
341 signal->header = buf.header;
342
343 Uint32 *from = (Uint32*) &buf.theDataRegister[0];
344 Uint32 *to = (Uint32*) &signal->theData[0];
345 Uint32 noOfWords = buf.header.theLength;
346 for(; noOfWords; noOfWords--)
347 *to++ = *from++;
348 return;
349 }
350
351 inline
352 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn)353 APZJobBuffer::insert(Signal* signal,
354 BlockNumber bnr, GlobalSignalNumber gsn)
355 {
356 Uint32 tOccupancy = theOccupancy + 1;
357 Uint32 myWPtr = wPtr;
358 if (tOccupancy < bufSize) {
359 BufferEntry& buf = buffer[myWPtr];
360 Uint32 cond = (++myWPtr == bufSize) - 1;
361 wPtr = myWPtr & cond;
362 theOccupancy = tOccupancy;
363 signal2buffer(signal, bnr, gsn, buf);
364 //---------------------------------------------------------
365 // Prefetch of buffer[wPtr] is done here. We prefetch for
366 // write both the first cache line and the next 64 byte
367 // entry
368 //---------------------------------------------------------
369 NDB_PREFETCH_WRITE((void*)&buffer[wPtr]);
370 NDB_PREFETCH_WRITE((void*)(((char*)&buffer[wPtr]) + 64));
371 } else {
372 jbuf_error();
373 }//if
374 }
375
376
377 inline
378 void
insert(Signal * signal,BlockNumber bnr,GlobalSignalNumber gsn,Uint32 myWPtr)379 APZJobBuffer::insert(Signal* signal, BlockNumber bnr,
380 GlobalSignalNumber gsn, Uint32 myWPtr)
381 {
382 BufferEntry& buf = buffer[myWPtr];
383 signal2buffer(signal, bnr, gsn, buf);
384 }
385
386
387 #undef JAM_FILE_ID
388
389 #endif
390