1 /* -------------------------------------------------------------------------- *
2 * Simbody(tm): SimTKcommon *
3 * -------------------------------------------------------------------------- *
4 * This is part of the SimTK biosimulation toolkit originating from *
5 * Simbios, the NIH National Center for Physics-Based Simulation of *
6 * Biological Structures at Stanford, funded under the NIH Roadmap for *
7 * Medical Research, grant U54 GM072970. See https://simtk.org/home/simbody. *
8 * *
9 * Portions copyright (c) 2008-12 Stanford University and the Authors. *
10 * Authors: Peter Eastman *
11 * Contributors: *
12 * *
13 * Licensed under the Apache License, Version 2.0 (the "License"); you may *
14 * not use this file except in compliance with the License. You may obtain a *
15 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0. *
16 * *
17 * Unless required by applicable law or agreed to in writing, software *
18 * distributed under the License is distributed on an "AS IS" BASIS, *
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
20 * See the License for the specific language governing permissions and *
21 * limitations under the License. *
22 * -------------------------------------------------------------------------- */
23
24 #include "ParallelExecutorImpl.h"
25 #include "SimTKcommon/internal/ParallelExecutor.h"
26
27 #include <iostream>
28 #include <string>
29 #include <algorithm>
30 #include <mutex>
31
32 #if defined(__FreeBSD__) || defined(__DragonFly__)
33 #include <sys/types.h>
34 #include <sys/sysctl.h>
35 #endif
36
37 using namespace std;
38
39 namespace SimTK {
40
41 static void threadBody(ThreadInfo& info);
42
ParallelExecutorImpl()43 ParallelExecutorImpl::ParallelExecutorImpl() : finished(false) {
44
45 //By default, we use the total number of processors available of the
46 //computer (including hyperthreads)
47 numMaxThreads = ParallelExecutor::getNumProcessors();
48 if(numMaxThreads <= 0)
49 numMaxThreads = 1;
50 }
ParallelExecutorImpl(int numThreads)51 ParallelExecutorImpl::ParallelExecutorImpl(int numThreads) : finished(false) {
52
53 // Set the maximum number of threads that we can use
54 SimTK_APIARGCHECK_ALWAYS(numThreads > 0, "ParallelExecutorImpl",
55 "ParallelExecutorImpl", "Number of threads must be positive.");
56 numMaxThreads = numThreads;
57 }
~ParallelExecutorImpl()58 ParallelExecutorImpl::~ParallelExecutorImpl() {
59
60 // Notify the threads that they should exit.
61
62 std::unique_lock<std::mutex> lock(runMutex);
63 finished = true;
64 for (int i = 0; i < (int) threads.size(); ++i)
65 threadInfo[i].running = true;
66 runCondition.notify_all();
67 lock.unlock();
68
69 // Wait until all the threads have finished.
70
71 for (int i = 0; i < (int) threads.size(); ++i)
72 threads[i].join();
73 }
clone() const74 ParallelExecutorImpl* ParallelExecutorImpl::clone() const {
75 return new ParallelExecutorImpl(numMaxThreads);
76 }
execute(ParallelExecutor::Task & task,int times)77 void ParallelExecutorImpl::execute(ParallelExecutor::Task& task, int times) {
78 if (min(times, numMaxThreads) == 1) {
79 //(1) NON-PARALLEL CASE:
80 // Nothing is actually going to get done in parallel, so we might as well
81 // just execute the task directly and save the threading overhead.
82 task.initialize();
83 for (int i = 0; i < times; ++i)
84 task.execute(i);
85 task.finish();
86 return;
87 }
88
89 //(2) PARALLEL CASE:
90 // We launch the maximum number of threads and save them for later use
91 if(threads.size() < (size_t)numMaxThreads)
92 {
93 // We do not support numMaxThreads changing for a given instance of
94 // ParallelExecutor.
95 assert(threads.size() == 0);
96 threadInfo.reserve(numMaxThreads);
97 threads.resize(numMaxThreads);
98 for (int i = 0; i < numMaxThreads; ++i) {
99 threadInfo.emplace_back(i, this);
100 threads[i] = std::thread(threadBody, std::ref(threadInfo[i]));
101 }
102 }
103
104 // Initialize fields to execute the new task.
105 std::unique_lock<std::mutex> lock(runMutex);
106 currentTask = &task;
107 currentTaskCount = times;
108 waitingThreadCount = 0;
109 for (int i = 0; i < (int)threadInfo.size(); ++i) {
110 threadInfo[i].running = true;
111 }
112
113 // Wake up the worker threads and wait until they finish.
114 runCondition.notify_all();
115 waitCondition.wait(lock,
116 [&] { return waitingThreadCount == (int)threads.size(); });
117 lock.unlock();
118 }
incrementWaitingThreads()119 void ParallelExecutorImpl::incrementWaitingThreads() {
120 std::lock_guard<std::mutex> lock(runMutex);
121 getCurrentTask().finish();
122 waitingThreadCount++;
123 if (waitingThreadCount == (int)threads.size()) {
124 waitCondition.notify_one();
125 }
126 }
127
128 thread_local bool ParallelExecutorImpl::isWorker(false);
129
130 /**
131 * This function contains the code executed by the worker threads.
132 */
133
threadBody(ThreadInfo & info)134 void threadBody(ThreadInfo& info) {
135 ParallelExecutorImpl::isWorker = true;
136 ParallelExecutorImpl& executor = *info.executor;
137 int threadCount = executor.getThreadCount();
138 while (!executor.isFinished()) {
139
140 // Wait for a Task to come in.
141
142 std::unique_lock<std::mutex> lock(executor.getMutex());
143 executor.getCondition().wait(lock, [&] { return info.running; });
144 lock.unlock();
145 if (!executor.isFinished()) {
146
147 // Execute the task for all the indices belonging to this thread.
148
149 int count = executor.getCurrentTaskCount();
150 ParallelExecutor::Task& task = executor.getCurrentTask();
151 task.initialize();
152 int index = info.index;
153
154 try {
155 while (index < count) {
156 task.execute(index);
157 index += threadCount;
158 }
159 }
160 catch (const std::exception& ex) {
161 std::cerr <<"The parallel task threw an unhandled exception:"<< std::endl;
162 std::cerr <<ex.what()<< std::endl;
163 }
164 catch (...) {
165 std::cerr <<"The parallel task threw an error."<< std::endl;
166 }
167 info.running = false;
168 executor.incrementWaitingThreads();
169 }
170 }
171 }
172
ParallelExecutor()173 ParallelExecutor::ParallelExecutor() : HandleBase(new ParallelExecutorImpl()) {
174 }
175
ParallelExecutor(int numThreads)176 ParallelExecutor::ParallelExecutor(int numThreads) : HandleBase(new ParallelExecutorImpl(numThreads)) {
177 }
178
clone() const179 ParallelExecutor* ParallelExecutor::clone() const{
180 return new ParallelExecutor(getMaxThreads());
181 }
182
execute(Task & task,int times)183 void ParallelExecutor::execute(Task& task, int times) {
184 updImpl().execute(task, times);
185 }
186
187 #ifdef __APPLE__
188 #include <sys/sysctl.h>
189 #include <dlfcn.h>
190 #elif _WIN32
191 #include <windows.h>
192 #elif __linux__
193 #include <dlfcn.h>
194 #include <unistd.h>
195 #elif __FreeBSD__ || __DragonFly__
196 #include <dlfcn.h>
197 #include <unistd.h>
198 #else
199 #error "Architecture unsupported"
200 #endif
201
getNumProcessors()202 int ParallelExecutor::getNumProcessors() {
203 #ifdef __APPLE__
204 int ncpu,retval;
205 size_t len = 4;
206
207 retval = sysctlbyname( "hw.physicalcpu", &ncpu, &len, NULL, 0 );
208 if( retval == 0 ) {
209 return (ncpu );
210 } else {
211 return(1);
212 }
213 #else
214 #ifdef __linux__
215 long nProcessorsOnline = sysconf(_SC_NPROCESSORS_ONLN);
216 if( nProcessorsOnline == -1 ) {
217 return(1);
218 } else {
219 return( (int)nProcessorsOnline );
220 }
221 #else
222 #if defined(__FreeBSD__) || defined(__DragonFly__)
223 int ncpu,retval;
224 size_t len = 4;
225
226 retval = sysctlbyname( "hw.ncpu", &ncpu, &len, NULL, 0 );
227 if( retval == 0 ) {
228 return (ncpu );
229 } else {
230 return(1);
231 }
232 #else
233 // Windows
234
235 SYSTEM_INFO siSysInfo;
236 int ncpu;
237
238 // Copy the hardware information to the SYSTEM_INFO structure.
239
240 GetSystemInfo(&siSysInfo);
241
242 // Display the contents of the SYSTEM_INFO structure.
243
244 ncpu = siSysInfo.dwNumberOfProcessors;
245 if( ncpu < 1 ) ncpu = 1;
246 return(ncpu);
247 #endif
248 #endif
249 #endif
250 }
251
isWorkerThread()252 bool ParallelExecutor::isWorkerThread() {
253 return ParallelExecutorImpl::isWorker;
254 }
getMaxThreads() const255 int ParallelExecutor::getMaxThreads() const{
256 return getImpl().getMaxThreads();
257 }
258
259 } // namespace SimTK
260