1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "ThreadConfig.hpp"
26 #include "Emulator.hpp"
27 #include "GlobalData.hpp"
28 #include "TimeQueue.hpp"
29 #include "TransporterRegistry.hpp"
30 #include "FastScheduler.hpp"
31 #include "pc.hpp"
32
33 #include <GlobalSignalNumbers.h>
34 #include <BlockNumbers.h>
35
36 #include <NdbSleep.h>
37 #include <NdbTick.h>
38 #include <NdbOut.hpp>
39 #include <WatchDog.hpp>
40
41 #include <EventLogger.hpp>
42 extern EventLogger * g_eventLogger;
43
44 #include <signaldata/StartOrd.hpp>
45
ThreadConfig()46 ThreadConfig::ThreadConfig()
47 {
48 }
49
~ThreadConfig()50 ThreadConfig::~ThreadConfig()
51 {
52 }
53
54 void
init()55 ThreadConfig::init()
56 {
57 }
58
59 /**
60 * For each millisecond that has passed since this function was last called:
61 * Scan the job buffer and increment the internalMillisecCounter
62 * with 1 to keep track of where we are
63 */
64 inline
65 void
scanTimeQueue()66 ThreadConfig::scanTimeQueue()
67 {
68 unsigned int maxCounter;
69 Uint64 currMilliSecond;
70 maxCounter = 0;
71 currMilliSecond = NdbTick_CurrentMillisecond();
72 if (currMilliSecond < globalData.internalMillisecCounter) {
73 //--------------------------------------------------------------------
74 // This could occur around 2036 or if the operator decides to change
75 // time backwards. We cannot know how long time has past since last
76 // time and we make a best try with 0 milliseconds.
77 //--------------------------------------------------------------------
78 g_eventLogger->warning("Time moved backwards with %llu ms",
79 globalData.internalMillisecCounter-currMilliSecond);
80 globalData.internalMillisecCounter = currMilliSecond;
81 }//if
82 if (currMilliSecond > (globalData.internalMillisecCounter + 1500)) {
83 //--------------------------------------------------------------------
84 // Time has moved forward more than a second. Either it could happen
85 // if operator changed the time or if the OS has misbehaved badly.
86 // We set the new time to one second from the past.
87 //--------------------------------------------------------------------
88 g_eventLogger->warning("Time moved forward with %llu ms",
89 currMilliSecond-globalData.internalMillisecCounter);
90 globalData.internalMillisecCounter = currMilliSecond - 1000;
91 }//if
92 while (((currMilliSecond - globalData.internalMillisecCounter) > 0) &&
93 (maxCounter < 20)){
94 globalData.internalMillisecCounter++;
95 maxCounter++;
96 globalTimeQueue.scanTable();
97 }//while
98 }//ThreadConfig::scanTimeQueue()
99
100
101 //--------------------------------------------------------------------
102 // ipControlLoop -- The main loop of ndb.
103 // Handles the scheduling of signal execution and input/output
104 // One lap in the loop should take approximately 10 milli seconds
105 // If the jobbuffer is empty and the laptime is less than 10 milliseconds
106 // at the end of the loop
107 // the TransporterRegistry is called in order to sleep on the IO ports
108 // waiting for another incoming signal to wake us up.
109 // The timeout value in this call is calculated as (10 ms - laptime)
110 // This would make ndb use less cpu while improving response time.
111 //--------------------------------------------------------------------
ipControlLoop(NdbThread *,Uint32 thread_index)112 void ThreadConfig::ipControlLoop(NdbThread*, Uint32 thread_index)
113 {
114 globalEmulatorData.theConfiguration->setAllLockCPU(true);
115
116 Uint32 execute_loop_constant =
117 globalEmulatorData.theConfiguration->schedulerExecutionTimer();
118 Uint32 min_spin_time =
119 globalEmulatorData.theConfiguration->schedulerSpinTimer();
120 struct MicroSecondTimer start_micro, end_micro, statistics_start_micro;
121 struct MicroSecondTimer yield_micro;
122 Uint32 no_exec_loops = 0;
123 Uint32 no_extra_loops = 0;
124 Uint32 tot_exec_time = 0;
125 Uint32 tot_extra_time = 0;
126 Uint32 timeOutMillis;
127 Uint32 micros_passed;
128 bool spinning;
129 bool yield_flag= FALSE;
130 int res1 = 0;
131 int res2 = 0;
132 int res3 = 0;
133 Uint32 i = 0;
134 Uint32 exec_again;
135
136 //--------------------------------------------------------------------
137 // initialise the counter that keeps track of the current millisecond
138 //--------------------------------------------------------------------
139 globalData.internalMillisecCounter = NdbTick_CurrentMillisecond();
140
141 Uint32 *watchCounter = globalData.getWatchDogPtr();
142 globalEmulatorData.theWatchDog->registerWatchedThread(watchCounter, 0);
143
144 res1 = NdbTick_getMicroTimer(&start_micro);
145 yield_micro = statistics_start_micro = end_micro = start_micro;
146 while (1)
147 {
148 timeOutMillis = 0;
149 //--------------------------------------------------------------------
150 // We send all messages buffered during execution of job buffers
151 //--------------------------------------------------------------------
152 globalData.incrementWatchDogCounter(6);
153 globalTransporterRegistry.performSend();
154
155 //--------------------------------------------------------------------
156 // Now it is time to check all interfaces. We will send all buffers
157 // plus checking for any received messages.
158 //--------------------------------------------------------------------
159 if (i++ >= 20)
160 {
161 execute_loop_constant =
162 globalEmulatorData.theConfiguration->schedulerExecutionTimer();
163 min_spin_time =
164 globalEmulatorData.theConfiguration->schedulerSpinTimer();
165 globalData.incrementWatchDogCounter(5);
166 globalTransporterRegistry.update_connections();
167 i = 0;
168 }
169 spinning = false;
170 do
171 {
172 //--------------------------------------------------------------------
173 // We scan the time queue to see if there are any timed signals that
174 // is now ready to be executed.
175 //--------------------------------------------------------------------
176 globalData.incrementWatchDogCounter(2);
177 scanTimeQueue();
178
179 if (LEVEL_IDLE == globalData.highestAvailablePrio)
180 {
181 //--------------------------------------------------------------------
182 // The buffers are empty, we need to wait for a while until we continue.
183 // We cannot wait forever since we can also have timed events.
184 //--------------------------------------------------------------------
185 // We set the time to sleep on sockets before waking up to 10
186 // milliseconds unless we have set spin timer to be larger than 0. In
187 // this case we spin checking for events on the transporter until we
188 // have expired the spin time.
189 //--------------------------------------------------------------------
190 timeOutMillis = 10;
191 if (min_spin_time && !yield_flag)
192 {
193 if (spinning)
194 res2 = NdbTick_getMicroTimer(&end_micro);
195 if (!(res1 + res2))
196 {
197 micros_passed =
198 (Uint32)NdbTick_getMicrosPassed(start_micro, end_micro);
199 if (micros_passed < min_spin_time)
200 timeOutMillis = 0;
201 }
202 }
203 }
204 if (spinning && timeOutMillis > 0 && i++ >= 20)
205 {
206 globalData.incrementWatchDogCounter(5);
207 globalTransporterRegistry.update_connections();
208 i = 0;
209 }
210
211 //--------------------------------------------------------------------
212 // Perform receive before entering execute loop
213 //--------------------------------------------------------------------
214 globalData.incrementWatchDogCounter(7);
215 {
216 bool poll_flag;
217 if (yield_flag)
218 {
219 globalEmulatorData.theConfiguration->yield_main(thread_index, TRUE);
220 poll_flag= globalTransporterRegistry.pollReceive(timeOutMillis);
221 globalEmulatorData.theConfiguration->yield_main(thread_index, FALSE);
222 res3= NdbTick_getMicroTimer(&yield_micro);
223 }
224 else
225 poll_flag= globalTransporterRegistry.pollReceive(timeOutMillis);
226 if (poll_flag)
227 {
228 globalData.incrementWatchDogCounter(8);
229 globalTransporterRegistry.performReceive();
230 }
231 yield_flag= FALSE;
232 }
233 spinning = true;
234 globalScheduler.postPoll();
235 //--------------------------------------------------------------------
236 // In an idle system we will use this loop to wait either for external
237 // signal received or a message generated by the time queue.
238 //--------------------------------------------------------------------
239 } while (LEVEL_IDLE == globalData.highestAvailablePrio);
240 //--------------------------------------------------------------------
241 // Get current microsecond to ensure we will continue executing
242 // signals for at least a configured time while there are more
243 // signals to receive.
244 //--------------------------------------------------------------------
245 res1= NdbTick_getMicroTimer(&start_micro);
246 if ((res1 + res3) ||
247 ((Uint32)NdbTick_getMicrosPassed(start_micro, yield_micro) > 10000))
248 yield_flag= TRUE;
249 exec_again= 0;
250 do
251 {
252 //--------------------------------------------------------------------
253 // This is where the actual execution of signals occur. We execute
254 // until all buffers are empty or until we have executed 2048 signals.
255 //--------------------------------------------------------------------
256 globalScheduler.doJob();
257 if (unlikely(globalData.theRestartFlag == perform_stop))
258 goto out;
259 //--------------------------------------------------------------------
260 // Get timer after executing this set of jobs. If we have passed the
261 // maximum execution time we will break out of the loop always
262 // otherwise we will check for new received signals before executing
263 // the send of the buffers.
264 // By setting exec_loop_constant to 0 we go back to the traditional
265 // algorithm of sending once per receive instance.
266 //--------------------------------------------------------------------
267 if (!execute_loop_constant && !min_spin_time)
268 break;
269 res2= NdbTick_getMicroTimer(&end_micro);
270 if (res2)
271 break;
272 micros_passed = (Uint32)NdbTick_getMicrosPassed(start_micro, end_micro);
273 tot_exec_time += micros_passed;
274 if (no_exec_loops++ >= 8192)
275 {
276 Uint32 expired_time =
277 (Uint32)NdbTick_getMicrosPassed(statistics_start_micro, end_micro);
278 statistics_start_micro = end_micro;
279 globalScheduler.reportThreadConfigLoop(expired_time,
280 execute_loop_constant,
281 &no_exec_loops,
282 &tot_exec_time,
283 &no_extra_loops,
284 &tot_extra_time);
285 }
286 /*
287 Continue our execution if micros_passed since last round is smaller
288 than the configured constant. Given that we don't recall the
289 actual start time of this loop we insert an extra check to ensure we
290 don't enter an eternal loop here. We'll never execute more than
291 3 times before sending.
292 */
293 if (micros_passed > execute_loop_constant || (exec_again > 1))
294 break;
295 exec_again++;
296 //--------------------------------------------------------------------
297 // There were still time for execution left, we check if there are
298 // signals newly received on the transporters and if so we execute one
299 // more round before sending the buffered signals.
300 //--------------------------------------------------------------------
301 globalData.incrementWatchDogCounter(7);
302 if (!globalTransporterRegistry.pollReceive(0))
303 break;
304
305 no_extra_loops++;
306 tot_extra_time += micros_passed;
307 start_micro = end_micro;
308 globalData.incrementWatchDogCounter(8);
309 globalTransporterRegistry.performReceive();
310 } while (1);
311 }
312 out:
313 globalData.incrementWatchDogCounter(6);
314 globalTransporterRegistry.performSend();
315
316 globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
317
318 }//ThreadConfig::ipControlLoop()
319
320 int
doStart(NodeState::StartLevel startLevel)321 ThreadConfig::doStart(NodeState::StartLevel startLevel){
322
323 SignalHeader sh;
324 memset(&sh, 0, sizeof(SignalHeader));
325
326 sh.theVerId_signalNumber = GSN_START_ORD;
327 sh.theReceiversBlockNumber = CMVMI;
328 sh.theSendersBlockRef = 0;
329 sh.theTrace = 0;
330 sh.theSignalId = 0;
331 sh.theLength = StartOrd::SignalLength;
332
333 union {
334 Uint32 theData[25];
335 StartOrd startOrd;
336 };
337 startOrd.restartInfo = 0;
338
339 Uint32 secPtrI[3];
340 globalScheduler.execute(&sh, JBA, theData, secPtrI);
341 return 0;
342 }
343
344