1 /*
2 *	Copyright (C) 2010 Sebastian Held (sebastian.held@gmx.de)
3 *
4 *	This program is free software: you can redistribute it and/or modify
5 *	it under the terms of the GNU General Public License as published by
6 *	the Free Software Foundation, either version 3 of the License, or
7 *	(at your option) any later version.
8 *
9 *	This program is distributed in the hope that it will be useful,
10 *	but WITHOUT ANY WARRANTY; without even the implied warranty of
11 *	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 *	GNU General Public License for more details.
13 *
14 *	You should have received a copy of the GNU General Public License
15 *	along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17 
// Uncomment to record per-thread timing samples inside the worker loop
// (the samples are dumped by the Engine_Multithread destructor).
//#define ENABLE_DEBUG_TIME

// DEBUG_TIME(stmt) expands to "stmt;" when timing is enabled and to an empty
// statement otherwise. It deliberately does not open a scope, because it is also
// used to declare the timer object that later DEBUG_TIME statements refer to.
#if defined(ENABLE_DEBUG_TIME)
#	define DEBUG_TIME(x) x;
#else
#	define DEBUG_TIME(x) ;
#endif
25 
26 
27 
28 #include "engine_multithread.h"
29 #include "extensions/engine_extension.h"
30 #include "tools/array_ops.h"
31 
32 #include "boost/date_time/posix_time/posix_time.hpp"
33 #include "boost/date_time/gregorian/gregorian.hpp"
34 #include <iomanip>
35 
36 #ifndef SSE_CORRECT_DENORMALS
37 #include <xmmintrin.h>
38 #endif
39 
40 //! \brief construct an Engine_Multithread instance
41 //! it's the responsibility of the caller to free the returned pointer
New(const Operator_Multithread * op,unsigned int numThreads)42 Engine_Multithread* Engine_Multithread::New(const Operator_Multithread* op, unsigned int numThreads)
43 {
44 	cout << "Create FDTD engine (compressed SSE + multi-threading)" << endl;
45 	Engine_Multithread* e = new Engine_Multithread(op);
46 	e->setNumThreads( numThreads );
47 	e->Init();
48 	return e;
49 }
50 
Engine_Multithread(const Operator_Multithread * op)51 Engine_Multithread::Engine_Multithread(const Operator_Multithread* op) : ENGINE_MULTITHREAD_BASE(op)
52 {
53 	m_Op_MT = op;
54 	m_type = SSE;
55 	m_IterateBarrier = 0;
56 	m_startBarrier = 0;
57 	m_stopBarrier = 0;
58 
59 #ifdef ENABLE_DEBUG_TIME
60 	m_MPI_Barrier = 0;
61 #endif
62 }
63 
~Engine_Multithread()64 Engine_Multithread::~Engine_Multithread()
65 {
66 #ifdef ENABLE_DEBUG_TIME
67 	NS_Engine_Multithread::DBG().cout() << "Engine_Multithread::~Engine_Multithread()" << endl;
68 	std::map<boost::thread::id, std::vector<double> >::iterator it;
69 	for (it=m_timer_list.begin(); it!=m_timer_list.end(); it++)
70 	{
71 		NS_Engine_Multithread::DBG().cout() << "*** DEBUG Thread: " << it->first << std::endl;
72 		std::vector<double>::iterator it2;
73 		for (it2=it->second.begin(); it2<it->second.end();)
74 		{
75 			NS_Engine_Multithread::DBG().cout() << "after voltage update, before barrier1: " << fixed << setprecision(6) << *(it2++) << std::endl;
76 			NS_Engine_Multithread::DBG().cout() << "after barrier1, before barrier2: "       << fixed << setprecision(6) << *(it2++) << std::endl;
77 			NS_Engine_Multithread::DBG().cout() << "after barrier2, before current update: " << fixed << setprecision(6) << *(it2++) << std::endl;
78 			NS_Engine_Multithread::DBG().cout() << "after current update, before barrier3: " << fixed << setprecision(6) << *(it2++) << std::endl;
79 			NS_Engine_Multithread::DBG().cout() << "after barrier3: "                        << fixed << setprecision(6) << *(it2++) << std::endl;
80 		}
81 	}
82 #endif
83 
84 	Reset();
85 }
86 
setNumThreads(unsigned int numThreads)87 void Engine_Multithread::setNumThreads( unsigned int numThreads )
88 {
89 	m_numThreads = numThreads;
90 }
91 
Init()92 void Engine_Multithread::Init()
93 {
94 	m_stopThreads = true;
95 	ENGINE_MULTITHREAD_BASE::Init();
96 
97 	// initialize threads
98 	m_stopThreads = false;
99 	if (m_numThreads == 0)
100 		m_numThreads = boost::thread::hardware_concurrency();
101 
102 	vector<unsigned int> m_Start_Lines;
103 	vector<unsigned int> m_Stop_Lines;
104 	m_Op_MT->CalcStartStopLines( m_numThreads, m_Start_Lines, m_Stop_Lines );
105 
106 	if (g_settings.GetVerboseLevel()>0)
107 		cout << "Multithreaded engine using " << m_numThreads << " threads. Utilization: (";
108 	m_IterateBarrier = new boost::barrier(m_numThreads); // numThread workers
109 
110 	m_startBarrier = new boost::barrier(m_numThreads+1); // numThread workers + 1 controller
111 	m_stopBarrier = new boost::barrier(m_numThreads+1); // numThread workers + 1 controller
112 #ifdef MPI_SUPPORT
113 	m_MPI_Barrier = 0;
114 #endif
115 
116 	for (unsigned int n=0; n<m_numThreads; n++)
117 	{
118 		unsigned int start = m_Start_Lines.at(n);
119 		unsigned int stop = m_Stop_Lines.at(n);
120 		unsigned int stop_h = stop;
121 		if (n == m_numThreads-1)
122 		{
123 			// last thread
124 			stop_h = stop-1;
125 			if (g_settings.GetVerboseLevel()>0)
126 				cout << stop-start+1 << ")" << endl;
127 		}
128 		else
129 			if (g_settings.GetVerboseLevel()>0)
130 				cout << stop-start+1 << ";";
131 //		NS_Engine_Multithread::DBG().cout() << "###DEBUG## Thread " << n << ": start=" << start << " stop=" << stop  << " stop_h=" << stop_h << std::endl;
132 		boost::thread *t = new boost::thread( NS_Engine_Multithread::thread(this,start,stop,stop_h,n) );
133 		m_thread_group.add_thread( t );
134 	}
135 
136 	for (size_t n=0; n<m_Eng_exts.size(); ++n)
137 		m_Eng_exts.at(n)->SetNumberOfThreads(m_numThreads);
138 }
139 
Reset()140 void Engine_Multithread::Reset()
141 {
142 	if (!m_stopThreads) // prevent multiple invocations
143 	{
144 		ClearExtensions(); //prevent extensions from interfering with thread reset...
145 
146 		// stop the threads
147 		//NS_Engine_Multithread::DBG().cout() << "stopping all threads" << endl;
148 		m_iterTS = 1;
149 		m_startBarrier->wait(); // start the threads
150 		m_stopThreads = true;
151 		m_stopBarrier->wait(); // wait for the threads to finish
152 		m_thread_group.join_all(); // wait for termination
153 		delete m_IterateBarrier;
154 		m_IterateBarrier = 0;
155 		delete m_startBarrier;
156 		m_startBarrier = 0;
157 		delete m_stopBarrier;
158 		m_stopBarrier = 0;
159 	}
160 
161 	ENGINE_MULTITHREAD_BASE::Reset();
162 }
163 
IterateTS(unsigned int iterTS)164 bool Engine_Multithread::IterateTS(unsigned int iterTS)
165 {
166 	m_iterTS = iterTS;
167 
168 	//cout << "bool Engine_Multithread::IterateTS(): starting threads ...";
169 	m_startBarrier->wait(); // start the threads
170 
171 	//cout << "... threads started";
172 
173 	m_stopBarrier->wait(); // wait for the threads to finish <iterTS> time steps
174 	return true;
175 }
176 
DoPreVoltageUpdates(int threadID)177 void Engine_Multithread::DoPreVoltageUpdates(int threadID)
178 {
179 	//execute extensions in reverse order -> highest priority gets access to the voltages last
180 	for (int n=m_Eng_exts.size()-1; n>=0; --n)
181 	{
182 		m_Eng_exts.at(n)->DoPreVoltageUpdates(threadID);
183 		m_IterateBarrier->wait();
184 	}
185 
186 }
187 
DoPostVoltageUpdates(int threadID)188 void Engine_Multithread::DoPostVoltageUpdates(int threadID)
189 {
190 	//execute extensions in normal order -> highest priority gets access to the voltages first
191 	for (size_t n=0; n<m_Eng_exts.size(); ++n)
192 	{
193 		m_Eng_exts.at(n)->DoPostVoltageUpdates(threadID);
194 		m_IterateBarrier->wait();
195 	}
196 }
197 
Apply2Voltages(int threadID)198 void Engine_Multithread::Apply2Voltages(int threadID)
199 {
200 	//execute extensions in normal order -> highest priority gets access to the voltages first
201 	for (size_t n=0; n<m_Eng_exts.size(); ++n)
202 	{
203 		m_Eng_exts.at(n)->Apply2Voltages(threadID);
204 		m_IterateBarrier->wait();
205 	}
206 }
207 
DoPreCurrentUpdates(int threadID)208 void Engine_Multithread::DoPreCurrentUpdates(int threadID)
209 {
210 	//execute extensions in reverse order -> highest priority gets access to the currents last
211 	for (int n=m_Eng_exts.size()-1; n>=0; --n)
212 	{
213 		m_Eng_exts.at(n)->DoPreCurrentUpdates(threadID);
214 		m_IterateBarrier->wait();
215 	}
216 }
217 
DoPostCurrentUpdates(int threadID)218 void Engine_Multithread::DoPostCurrentUpdates(int threadID)
219 {
220 	//execute extensions in normal order -> highest priority gets access to the currents first
221 	for (size_t n=0; n<m_Eng_exts.size(); ++n)
222 	{
223 		m_Eng_exts.at(n)->DoPostCurrentUpdates(threadID);
224 		m_IterateBarrier->wait();
225 	}
226 }
227 
Apply2Current(int threadID)228 void Engine_Multithread::Apply2Current(int threadID)
229 {
230 	//execute extensions in normal order -> highest priority gets access to the currents first
231 	for (size_t n=0; n<m_Eng_exts.size(); ++n)
232 	{
233 		m_Eng_exts.at(n)->Apply2Current(threadID);
234 		m_IterateBarrier->wait();
235 	}
236 }
237 
238 //
239 // *************************************************************************************************************************
240 //
241 namespace NS_Engine_Multithread
242 {
243 
thread(Engine_Multithread * ptr,unsigned int start,unsigned int stop,unsigned int stop_h,unsigned int threadID)244 thread::thread( Engine_Multithread* ptr, unsigned int start, unsigned int stop, unsigned int stop_h, unsigned int threadID )
245 {
246 	m_enginePtr = ptr;
247 	m_start = start;
248 	m_stop = stop;
249 	m_stop_h = stop_h;
250 	m_threadID = threadID;
251 }
252 
operator ()()253 void thread::operator()()
254 {
255 	//std::cout << "thread::operator() Parameters: " << m_start << " " << m_stop << std::endl;
256 	//DBG().cout() << "Thread " << m_threadID << " (" << boost::this_thread::get_id() << ") started." << endl;
257 
258 	// speed up the calculation of denormal floating point values (flush-to-zero)
259 #ifndef SSE_CORRECT_DENORMALS
260 	unsigned int oldMXCSR = _mm_getcsr(); //read the old MXCSR setting
261 	unsigned int newMXCSR = oldMXCSR | 0x8040; // set DAZ and FZ bits
262 	_mm_setcsr( newMXCSR ); //write the new MXCSR setting to the MXCSR
263 #endif
264 
265 	while (!m_enginePtr->m_stopThreads)
266 	{
267 		// wait for start
268 		//DBG().cout() << "Thread " << m_threadID << " (" << boost::this_thread::get_id() << ") waiting..." << endl;
269 		m_enginePtr->m_startBarrier->wait();
270 		//cout << "Thread " << boost::this_thread::get_id() << " waiting... started." << endl;
271 
272 		DEBUG_TIME( Timer timer1 );
273 
274 		for (unsigned int iter=0; iter<m_enginePtr->m_iterTS; ++iter)
275 		{
276 			// pre voltage stuff...
277 			m_enginePtr->DoPreVoltageUpdates(m_threadID);
278 
279 			//voltage updates
280 			m_enginePtr->UpdateVoltages(m_start,m_stop-m_start+1);
281 
282 			// record time
283 			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )
284 
285 			//cout << "Thread " << boost::this_thread::get_id() << " m_barrier1 waiting..." << endl;
286 			m_enginePtr->m_IterateBarrier->wait();
287 
288 			// record time
289 			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )
290 
291 			//post voltage stuff...
292 			m_enginePtr->DoPostVoltageUpdates(m_threadID);
293 			m_enginePtr->Apply2Voltages(m_threadID);
294 
295 #ifdef MPI_SUPPORT
296 			if (m_threadID==0)
297 			{
298 				if (m_enginePtr->m_MPI_Barrier)
299 					m_enginePtr->m_MPI_Barrier->wait();
300 				m_enginePtr->SendReceiveVoltages();
301 			}
302 			m_enginePtr->m_IterateBarrier->wait();
303 #endif
304 
305 			// record time
306 			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )
307 
308 			//pre current stuff
309 			m_enginePtr->DoPreCurrentUpdates(m_threadID);
310 
311 			//current updates
312 			m_enginePtr->UpdateCurrents(m_start,m_stop_h-m_start+1);
313 
314 			// record time
315 			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )
316 			m_enginePtr->m_IterateBarrier->wait();
317 
318 			// record time
319 			DEBUG_TIME( m_enginePtr->m_timer_list[boost::this_thread::get_id()].push_back( timer1.elapsed() ); )
320 
321 			//post current stuff
322 			m_enginePtr->DoPostCurrentUpdates(m_threadID);
323 			m_enginePtr->Apply2Current(m_threadID);
324 
325 #ifdef MPI_SUPPORT
326 			if (m_threadID==0)
327 			{
328 				if (m_enginePtr->m_MPI_Barrier)
329 					m_enginePtr->m_MPI_Barrier->wait();
330 				m_enginePtr->SendReceiveCurrents();
331 			}
332 			m_enginePtr->m_IterateBarrier->wait();
333 #endif
334 
335 			if (m_threadID == 0)
336 				++m_enginePtr->numTS; // only the first thread increments numTS
337 		}
338 
339 		m_enginePtr->m_stopBarrier->wait();
340 	}
341 
342 	//DBG().cout() << "Thread " << m_threadID << " (" << boost::this_thread::get_id() << ") finished." << endl;
343 }
344 
345 } // namespace
346 
347