1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ThreadConfig.hpp"
26 #include "Emulator.hpp"
27 #include "GlobalData.hpp"
28 #include "TimeQueue.hpp"
29 #include "TransporterRegistry.hpp"
30 #include "FastScheduler.hpp"
31 #include "pc.hpp"
32 
33 #include <GlobalSignalNumbers.h>
34 #include <BlockNumbers.h>
35 
36 #include <NdbSleep.h>
37 #include <NdbTick.h>
38 #include <NdbOut.hpp>
39 #include <WatchDog.hpp>
40 
41 #include <EventLogger.hpp>
42 extern EventLogger * g_eventLogger;
43 
44 #include <signaldata/StartOrd.hpp>
45 
ThreadConfig()46 ThreadConfig::ThreadConfig()
47 {
48 }
49 
~ThreadConfig()50 ThreadConfig::~ThreadConfig()
51 {
52 }
53 
54 void
init()55 ThreadConfig::init()
56 {
57 }
58 
59 /**
60  * For each millisecond that has passed since this function was last called:
61  *   Scan the job buffer and increment the internalMillisecCounter
62  *      with 1 to keep track of where we are
63  */
64 inline
65 void
scanTimeQueue()66 ThreadConfig::scanTimeQueue()
67 {
68   unsigned int maxCounter;
69   Uint64 currMilliSecond;
70   maxCounter = 0;
71   currMilliSecond = NdbTick_CurrentMillisecond();
72   if (currMilliSecond < globalData.internalMillisecCounter) {
73 //--------------------------------------------------------------------
74 // This could occur around 2036 or if the operator decides to change
75 // time backwards. We cannot know how long time has past since last
76 // time and we make a best try with 0 milliseconds.
77 //--------------------------------------------------------------------
78     g_eventLogger->warning("Time moved backwards with %llu ms",
79                            globalData.internalMillisecCounter-currMilliSecond);
80     globalData.internalMillisecCounter = currMilliSecond;
81   }//if
82   if (currMilliSecond > (globalData.internalMillisecCounter + 1500)) {
83 //--------------------------------------------------------------------
84 // Time has moved forward more than a second. Either it could happen
85 // if operator changed the time or if the OS has misbehaved badly.
86 // We set the new time to one second from the past.
87 //--------------------------------------------------------------------
88     g_eventLogger->warning("Time moved forward with %llu ms",
89                            currMilliSecond-globalData.internalMillisecCounter);
90     globalData.internalMillisecCounter = currMilliSecond - 1000;
91   }//if
92   while (((currMilliSecond - globalData.internalMillisecCounter) > 0) &&
93          (maxCounter < 20)){
94     globalData.internalMillisecCounter++;
95     maxCounter++;
96     globalTimeQueue.scanTable();
97   }//while
98 }//ThreadConfig::scanTimeQueue()
99 
100 
101 //--------------------------------------------------------------------
102 // ipControlLoop -- The main loop of ndb.
103 // Handles the scheduling of signal execution and input/output
104 // One lap in the loop should take approximately 10 milli seconds
105 // If the jobbuffer is empty and the laptime is less than 10 milliseconds
106 // at the end of the loop
107 // the TransporterRegistry is called in order to sleep on the IO ports
108 // waiting for another incoming signal to wake us up.
109 // The timeout value in this call is calculated as (10 ms - laptime)
110 // This would make ndb use less cpu while improving response time.
111 //--------------------------------------------------------------------
ipControlLoop(NdbThread *,Uint32 thread_index)112 void ThreadConfig::ipControlLoop(NdbThread*, Uint32 thread_index)
113 {
114   globalEmulatorData.theConfiguration->setAllLockCPU(true);
115 
116   Uint32 execute_loop_constant =
117         globalEmulatorData.theConfiguration->schedulerExecutionTimer();
118   Uint32 min_spin_time =
119     globalEmulatorData.theConfiguration->schedulerSpinTimer();
120   struct MicroSecondTimer start_micro, end_micro, statistics_start_micro;
121   struct MicroSecondTimer yield_micro;
122   Uint32 no_exec_loops = 0;
123   Uint32 no_extra_loops = 0;
124   Uint32 tot_exec_time = 0;
125   Uint32 tot_extra_time = 0;
126   Uint32 timeOutMillis;
127   Uint32 micros_passed;
128   bool spinning;
129   bool yield_flag= FALSE;
130   int res1 = 0;
131   int res2 = 0;
132   int res3 = 0;
133   Uint32 i = 0;
134   Uint32 exec_again;
135 
136 //--------------------------------------------------------------------
137 // initialise the counter that keeps track of the current millisecond
138 //--------------------------------------------------------------------
139   globalData.internalMillisecCounter = NdbTick_CurrentMillisecond();
140 
141   Uint32 *watchCounter = globalData.getWatchDogPtr();
142   globalEmulatorData.theWatchDog->registerWatchedThread(watchCounter, 0);
143 
144   res1 = NdbTick_getMicroTimer(&start_micro);
145   yield_micro = statistics_start_micro = end_micro = start_micro;
146   while (1)
147   {
148     timeOutMillis = 0;
149 //--------------------------------------------------------------------
150 // We send all messages buffered during execution of job buffers
151 //--------------------------------------------------------------------
152     globalData.incrementWatchDogCounter(6);
153     globalTransporterRegistry.performSend();
154 
155 //--------------------------------------------------------------------
156 // Now it is time to check all interfaces. We will send all buffers
157 // plus checking for any received messages.
158 //--------------------------------------------------------------------
159     if (i++ >= 20)
160     {
161       execute_loop_constant =
162         globalEmulatorData.theConfiguration->schedulerExecutionTimer();
163       min_spin_time =
164         globalEmulatorData.theConfiguration->schedulerSpinTimer();
165       globalData.incrementWatchDogCounter(5);
166       globalTransporterRegistry.update_connections();
167       i = 0;
168     }
169     spinning = false;
170     do
171     {
172 //--------------------------------------------------------------------
173 // We scan the time queue to see if there are any timed signals that
174 // is now ready to be executed.
175 //--------------------------------------------------------------------
176       globalData.incrementWatchDogCounter(2);
177       scanTimeQueue();
178 
179       if (LEVEL_IDLE == globalData.highestAvailablePrio)
180       {
181 //--------------------------------------------------------------------
182 // The buffers are empty, we need to wait for a while until we continue.
183 // We cannot wait forever since we can also have timed events.
184 //--------------------------------------------------------------------
185 // We set the time to sleep on sockets before waking up to 10
186 // milliseconds unless we have set spin timer to be larger than 0. In
187 // this case we spin checking for events on the transporter until we
188 // have expired the spin time.
189 //--------------------------------------------------------------------
190         timeOutMillis = 10;
191         if (min_spin_time && !yield_flag)
192         {
193           if (spinning)
194             res2 = NdbTick_getMicroTimer(&end_micro);
195           if (!(res1 + res2))
196           {
197             micros_passed =
198               (Uint32)NdbTick_getMicrosPassed(start_micro, end_micro);
199             if (micros_passed < min_spin_time)
200               timeOutMillis = 0;
201           }
202         }
203       }
204       if (spinning && timeOutMillis > 0 && i++ >= 20)
205       {
206         globalData.incrementWatchDogCounter(5);
207         globalTransporterRegistry.update_connections();
208         i = 0;
209       }
210 
211 //--------------------------------------------------------------------
212 // Perform receive before entering execute loop
213 //--------------------------------------------------------------------
214       globalData.incrementWatchDogCounter(7);
215       {
216         bool poll_flag;
217         if (yield_flag)
218         {
219           globalEmulatorData.theConfiguration->yield_main(thread_index, TRUE);
220           poll_flag= globalTransporterRegistry.pollReceive(timeOutMillis);
221           globalEmulatorData.theConfiguration->yield_main(thread_index, FALSE);
222           res3= NdbTick_getMicroTimer(&yield_micro);
223         }
224         else
225           poll_flag= globalTransporterRegistry.pollReceive(timeOutMillis);
226         if (poll_flag)
227         {
228           globalData.incrementWatchDogCounter(8);
229           globalTransporterRegistry.performReceive();
230         }
231         yield_flag= FALSE;
232       }
233       spinning = true;
234       globalScheduler.postPoll();
235 //--------------------------------------------------------------------
236 // In an idle system we will use this loop to wait either for external
237 // signal received or a message generated by the time queue.
238 //--------------------------------------------------------------------
239     } while (LEVEL_IDLE == globalData.highestAvailablePrio);
240 //--------------------------------------------------------------------
241 // Get current microsecond to ensure we will continue executing
242 // signals for at least a configured time while there are more
243 // signals to receive.
244 //--------------------------------------------------------------------
245     res1= NdbTick_getMicroTimer(&start_micro);
246     if ((res1 + res3) ||
247         ((Uint32)NdbTick_getMicrosPassed(start_micro, yield_micro) > 10000))
248       yield_flag= TRUE;
249     exec_again= 0;
250     do
251     {
252 //--------------------------------------------------------------------
253 // This is where the actual execution of signals occur. We execute
254 // until all buffers are empty or until we have executed 2048 signals.
255 //--------------------------------------------------------------------
256       globalScheduler.doJob();
257       if (unlikely(globalData.theRestartFlag == perform_stop))
258         goto out;
259 //--------------------------------------------------------------------
260 // Get timer after executing this set of jobs. If we have passed the
261 // maximum execution time we will break out of the loop always
262 // otherwise we will check for new received signals before executing
263 // the send of the buffers.
264 // By setting exec_loop_constant to 0 we go back to the traditional
265 // algorithm of sending once per receive instance.
266 //--------------------------------------------------------------------
267       if (!execute_loop_constant && !min_spin_time)
268         break;
269       res2= NdbTick_getMicroTimer(&end_micro);
270       if (res2)
271         break;
272       micros_passed = (Uint32)NdbTick_getMicrosPassed(start_micro, end_micro);
273       tot_exec_time += micros_passed;
274       if (no_exec_loops++ >= 8192)
275       {
276         Uint32 expired_time =
277           (Uint32)NdbTick_getMicrosPassed(statistics_start_micro, end_micro);
278         statistics_start_micro = end_micro;
279         globalScheduler.reportThreadConfigLoop(expired_time,
280                                                execute_loop_constant,
281                                                &no_exec_loops,
282                                                &tot_exec_time,
283                                                &no_extra_loops,
284                                                &tot_extra_time);
285       }
286       /*
287         Continue our execution if micros_passed since last round is smaller
288         than the configured constant. Given that we don't recall the
289         actual start time of this loop we insert an extra check to ensure we
290         don't enter an eternal loop here. We'll never execute more than
291         3 times before sending.
292       */
293       if (micros_passed > execute_loop_constant || (exec_again > 1))
294         break;
295       exec_again++;
296 //--------------------------------------------------------------------
297 // There were still time for execution left, we check if there are
298 // signals newly received on the transporters and if so we execute one
299 // more round before sending the buffered signals.
300 //--------------------------------------------------------------------
301       globalData.incrementWatchDogCounter(7);
302       if (!globalTransporterRegistry.pollReceive(0))
303         break;
304 
305       no_extra_loops++;
306       tot_extra_time += micros_passed;
307       start_micro = end_micro;
308       globalData.incrementWatchDogCounter(8);
309       globalTransporterRegistry.performReceive();
310     } while (1);
311   }
312 out:
313   globalData.incrementWatchDogCounter(6);
314   globalTransporterRegistry.performSend();
315 
316   globalEmulatorData.theWatchDog->unregisterWatchedThread(0);
317 
318 }//ThreadConfig::ipControlLoop()
319 
320 int
doStart(NodeState::StartLevel startLevel)321 ThreadConfig::doStart(NodeState::StartLevel startLevel){
322 
323   SignalHeader sh;
324   memset(&sh, 0, sizeof(SignalHeader));
325 
326   sh.theVerId_signalNumber   = GSN_START_ORD;
327   sh.theReceiversBlockNumber = CMVMI;
328   sh.theSendersBlockRef      = 0;
329   sh.theTrace                = 0;
330   sh.theSignalId             = 0;
331   sh.theLength               = StartOrd::SignalLength;
332 
333   union {
334     Uint32 theData[25];
335     StartOrd startOrd;
336   };
337   startOrd.restartInfo = 0;
338 
339   Uint32 secPtrI[3];
340   globalScheduler.execute(&sh, JBA, theData, secPtrI);
341   return 0;
342 }
343 
344