1 /*
2 * Copyright (C) 2010 Sebastian Held (sebastian.held@gmx.de)
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
// Uncomment the following line to compile in the per-thread timing instrumentation.
//#define ENABLE_DEBUG_TIME

// DEBUG_TIME(x) expands to the statement "x;" when ENABLE_DEBUG_TIME is defined
// and to an empty statement otherwise, so timing code vanishes from normal builds.
#ifdef ENABLE_DEBUG_TIME
#define DEBUG_TIME(x) x;
#else
#define DEBUG_TIME(x) ;
#endif
25
26
27
28 #include "engine_multithread.h"
29 #include "extensions/engine_extension.h"
30 #include "tools/array_ops.h"
31
32 #include "boost/date_time/posix_time/posix_time.hpp"
33 #include "boost/date_time/gregorian/gregorian.hpp"
34 #include <iomanip>
35
36 #ifndef SSE_CORRECT_DENORMALS
37 #include <xmmintrin.h>
38 #endif
39
40 //! \brief construct an Engine_Multithread instance
41 //! it's the responsibility of the caller to free the returned pointer
New(const Operator_Multithread * op,unsigned int numThreads)42 Engine_Multithread* Engine_Multithread::New(const Operator_Multithread* op, unsigned int numThreads)
43 {
44 cout << "Create FDTD engine (compressed SSE + multi-threading)" << endl;
45 Engine_Multithread* e = new Engine_Multithread(op);
46 e->setNumThreads( numThreads );
47 e->Init();
48 return e;
49 }
50
Engine_Multithread(const Operator_Multithread * op)51 Engine_Multithread::Engine_Multithread(const Operator_Multithread* op) : ENGINE_MULTITHREAD_BASE(op)
52 {
53 m_Op_MT = op;
54 m_type = SSE;
55 m_IterateBarrier = 0;
56 m_startBarrier = 0;
57 m_stopBarrier = 0;
58
59 #ifdef ENABLE_DEBUG_TIME
60 m_MPI_Barrier = 0;
61 #endif
62 }
63
~Engine_Multithread()64 Engine_Multithread::~Engine_Multithread()
65 {
66 #ifdef ENABLE_DEBUG_TIME
67 NS_Engine_Multithread::DBG().cout() << "Engine_Multithread::~Engine_Multithread()" << endl;
68 std::map<boost::thread::id, std::vector<double> >::iterator it;
69 for (it=m_timer_list.begin(); it!=m_timer_list.end(); it++)
70 {
71 NS_Engine_Multithread::DBG().cout() << "*** DEBUG Thread: " << it->first << std::endl;
72 std::vector<double>::iterator it2;
73 for (it2=it->second.begin(); it2<it->second.end();)
74 {
75 NS_Engine_Multithread::DBG().cout() << "after voltage update, before barrier1: " << fixed << setprecision(6) << *(it2++) << std::endl;
76 NS_Engine_Multithread::DBG().cout() << "after barrier1, before barrier2: " << fixed << setprecision(6) << *(it2++) << std::endl;
77 NS_Engine_Multithread::DBG().cout() << "after barrier2, before current update: " << fixed << setprecision(6) << *(it2++) << std::endl;
78 NS_Engine_Multithread::DBG().cout() << "after current update, before barrier3: " << fixed << setprecision(6) << *(it2++) << std::endl;
79 NS_Engine_Multithread::DBG().cout() << "after barrier3: " << fixed << setprecision(6) << *(it2++) << std::endl;
80 }
81 }
82 #endif
83
84 Reset();
85 }
86
setNumThreads(unsigned int numThreads)87 void Engine_Multithread::setNumThreads( unsigned int numThreads )
88 {
89 m_numThreads = numThreads;
90 }
91
Init()92 void Engine_Multithread::Init()
93 {
94 m_stopThreads = true;
95 ENGINE_MULTITHREAD_BASE::Init();
96
97 // initialize threads
98 m_stopThreads = false;
99 if (m_numThreads == 0)
100 m_numThreads = boost::thread::hardware_concurrency();
101
102 vector<unsigned int> m_Start_Lines;
103 vector<unsigned int> m_Stop_Lines;
104 m_Op_MT->CalcStartStopLines( m_numThreads, m_Start_Lines, m_Stop_Lines );
105
106 if (g_settings.GetVerboseLevel()>0)
107 cout << "Multithreaded engine using " << m_numThreads << " threads. Utilization: (";
108 m_IterateBarrier = new boost::barrier(m_numThreads); // numThread workers
109
110 m_startBarrier = new boost::barrier(m_numThreads+1); // numThread workers + 1 controller
111 m_stopBarrier = new boost::barrier(m_numThreads+1); // numThread workers + 1 controller
112 #ifdef MPI_SUPPORT
113 m_MPI_Barrier = 0;
114 #endif
115
116 for (unsigned int n=0; n<m_numThreads; n++)
117 {
118 unsigned int start = m_Start_Lines.at(n);
119 unsigned int stop = m_Stop_Lines.at(n);
120 unsigned int stop_h = stop;
121 if (n == m_numThreads-1)
122 {
123 // last thread
124 stop_h = stop-1;
125 if (g_settings.GetVerboseLevel()>0)
126 cout << stop-start+1 << ")" << endl;
127 }
128 else
129 if (g_settings.GetVerboseLevel()>0)
130 cout << stop-start+1 << ";";
131 // NS_Engine_Multithread::DBG().cout() << "###DEBUG## Thread " << n << ": start=" << start << " stop=" << stop << " stop_h=" << stop_h << std::endl;
132 boost::thread *t = new boost::thread( NS_Engine_Multithread::thread(this,start,stop,stop_h,n) );
133 m_thread_group.add_thread( t );
134 }
135
136 for (size_t n=0; n<m_Eng_exts.size(); ++n)
137 m_Eng_exts.at(n)->SetNumberOfThreads(m_numThreads);
138 }
139
Reset()140 void Engine_Multithread::Reset()
141 {
142 if (!m_stopThreads) // prevent multiple invocations
143 {
144 ClearExtensions(); //prevent extensions from interfering with thread reset...
145
146 // stop the threads
147 //NS_Engine_Multithread::DBG().cout() << "stopping all threads" << endl;
148 m_iterTS = 1;
149 m_startBarrier->wait(); // start the threads
150 m_stopThreads = true;
151 m_stopBarrier->wait(); // wait for the threads to finish
152 m_thread_group.join_all(); // wait for termination
153 delete m_IterateBarrier;
154 m_IterateBarrier = 0;
155 delete m_startBarrier;
156 m_startBarrier = 0;
157 delete m_stopBarrier;
158 m_stopBarrier = 0;
159 }
160
161 ENGINE_MULTITHREAD_BASE::Reset();
162 }
163
IterateTS(unsigned int iterTS)164 bool Engine_Multithread::IterateTS(unsigned int iterTS)
165 {
166 m_iterTS = iterTS;
167
168 //cout << "bool Engine_Multithread::IterateTS(): starting threads ...";
169 m_startBarrier->wait(); // start the threads
170
171 //cout << "... threads started";
172
173 m_stopBarrier->wait(); // wait for the threads to finish <iterTS> time steps
174 return true;
175 }
176
DoPreVoltageUpdates(int threadID)177 void Engine_Multithread::DoPreVoltageUpdates(int threadID)
178 {
179 //execute extensions in reverse order -> highest priority gets access to the voltages last
180 for (int n=m_Eng_exts.size()-1; n>=0; --n)
181 {
182 m_Eng_exts.at(n)->DoPreVoltageUpdates(threadID);
183 m_IterateBarrier->wait();
184 }
185
186 }
187
DoPostVoltageUpdates(int threadID)188 void Engine_Multithread::DoPostVoltageUpdates(int threadID)
189 {
190 //execute extensions in normal order -> highest priority gets access to the voltages first
191 for (size_t n=0; n<m_Eng_exts.size(); ++n)
192 {
193 m_Eng_exts.at(n)->DoPostVoltageUpdates(threadID);
194 m_IterateBarrier->wait();
195 }
196 }
197
Apply2Voltages(int threadID)198 void Engine_Multithread::Apply2Voltages(int threadID)
199 {
200 //execute extensions in normal order -> highest priority gets access to the voltages first
201 for (size_t n=0; n<m_Eng_exts.size(); ++n)
202 {
203 m_Eng_exts.at(n)->Apply2Voltages(threadID);
204 m_IterateBarrier->wait();
205 }
206 }
207
DoPreCurrentUpdates(int threadID)208 void Engine_Multithread::DoPreCurrentUpdates(int threadID)
209 {
210 //execute extensions in reverse order -> highest priority gets access to the currents last
211 for (int n=m_Eng_exts.size()-1; n>=0; --n)
212 {
213 m_Eng_exts.at(n)->DoPreCurrentUpdates(threadID);
214 m_IterateBarrier->wait();
215 }
216 }
217
DoPostCurrentUpdates(int threadID)218 void Engine_Multithread::DoPostCurrentUpdates(int threadID)
219 {
220 //execute extensions in normal order -> highest priority gets access to the currents first
221 for (size_t n=0; n<m_Eng_exts.size(); ++n)
222 {
223 m_Eng_exts.at(n)->DoPostCurrentUpdates(threadID);
224 m_IterateBarrier->wait();
225 }
226 }
227
Apply2Current(int threadID)228 void Engine_Multithread::Apply2Current(int threadID)
229 {
230 //execute extensions in normal order -> highest priority gets access to the currents first
231 for (size_t n=0; n<m_Eng_exts.size(); ++n)
232 {
233 m_Eng_exts.at(n)->Apply2Current(threadID);
234 m_IterateBarrier->wait();
235 }
236 }
237
238 //
239 // *************************************************************************************************************************
240 //
241 namespace NS_Engine_Multithread
242 {
243
thread(Engine_Multithread * ptr,unsigned int start,unsigned int stop,unsigned int stop_h,unsigned int threadID)244 thread::thread( Engine_Multithread* ptr, unsigned int start, unsigned int stop, unsigned int stop_h, unsigned int threadID )
245 {
246 m_enginePtr = ptr;
247 m_start = start;
248 m_stop = stop;
249 m_stop_h = stop_h;
250 m_threadID = threadID;
251 }
252
//! \brief Main loop of one worker thread.
//!
//! Every pass waits on the engine's start barrier, runs m_iterTS time steps on this worker's
//! line range (extension hooks, voltage update, extension hooks, current update, extension
//! hooks) and then waits on the engine's stop barrier. The loop ends once the engine has set
//! m_stopThreads. With ENABLE_DEBUG_TIME defined, five timings per time step are appended to
//! the engine's m_timer_list entry of this thread.
void thread::operator()()
{
	//std::cout << "thread::operator() Parameters: " << m_start << " " << m_stop << std::endl;
	//DBG().cout() << "Thread " << m_threadID << " (" << boost::this_thread::get_id() << ") started." << endl;

	// speed up the calculation of denormal floating point values (flush-to-zero)
	// the previous MXCSR value is read but not restored when the thread function returns
#ifndef SSE_CORRECT_DENORMALS
	unsigned int oldMXCSR = _mm_getcsr(); //read the old MXCSR setting
	unsigned int newMXCSR = oldMXCSR | 0x8040; // set DAZ and FZ bits
	_mm_setcsr( newMXCSR ); //write the new MXCSR setting to the MXCSR
#endif

	while (!m_enginePtr->m_stopThreads)
	{
		// wait for start (the controller releases this barrier in IterateTS() and Reset())
		//DBG().cout() << "Thread " << m_threadID << " (" << boost::this_thread::get_id() << ") waiting..." << endl;
		m_enginePtr->m_startBarrier->wait();
		//cout << "Thread " << boost::this_thread::get_id() << " waiting... started." << endl;

		DEBUG_TIME( Timer timer1 );

		for (unsigned int iter=0; iter<m_enginePtr->m_iterTS; ++iter)
		{
			// pre voltage stuff...
			m_enginePtr->DoPreVoltageUpdates(m_threadID);

			//voltage updates: start line and number of lines of this worker
			m_enginePtr->UpdateVoltages(m_start,m_stop-m_start+1);

			// record time
			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )

			// all workers must have finished their voltage update before the post-update hooks run
			//cout << "Thread " << boost::this_thread::get_id() << " m_barrier1 waiting..." << endl;
			m_enginePtr->m_IterateBarrier->wait();

			// record time
			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )

			//post voltage stuff...
			m_enginePtr->DoPostVoltageUpdates(m_threadID);
			m_enginePtr->Apply2Voltages(m_threadID);

			// MPI builds: only worker 0 performs the voltage exchange, the others wait at the barrier below
#ifdef MPI_SUPPORT
			if (m_threadID==0)
			{
				if (m_enginePtr->m_MPI_Barrier)
					m_enginePtr->m_MPI_Barrier->wait();
				m_enginePtr->SendReceiveVoltages();
			}
			m_enginePtr->m_IterateBarrier->wait();
#endif

			// record time
			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )

			//pre current stuff
			m_enginePtr->DoPreCurrentUpdates(m_threadID);

			//current updates: the range is sized with m_stop_h instead of m_stop
			m_enginePtr->UpdateCurrents(m_start,m_stop_h-m_start+1);

			// record time
			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )
			// all workers must have finished their current update before the post-update hooks run
			m_enginePtr->m_IterateBarrier->wait();

			// record time
			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )

			//post current stuff
			m_enginePtr->DoPostCurrentUpdates(m_threadID);
			m_enginePtr->Apply2Current(m_threadID);

			// MPI builds: only worker 0 performs the current exchange, the others wait at the barrier below
#ifdef MPI_SUPPORT
			if (m_threadID==0)
			{
				if (m_enginePtr->m_MPI_Barrier)
					m_enginePtr->m_MPI_Barrier->wait();
				m_enginePtr->SendReceiveCurrents();
			}
			m_enginePtr->m_IterateBarrier->wait();
#endif

			// NOTE(review): no barrier follows this increment inside the time step loop, so other
			// workers may already be in the next step -- confirm nothing reads numTS concurrently
			if (m_threadID == 0)
				++m_enginePtr->numTS; // only the first thread increments numTS
		}

		// report completion of this pass to the controller
		m_enginePtr->m_stopBarrier->wait();
	}

	//DBG().cout() << "Thread " << m_threadID << " (" << boost::this_thread::get_id() << ") finished." << endl;
}
344
345 } // namespace
346
347