1 /* -------------------------------------------------------------------------- *
2  *                       Simbody(tm): SimTKcommon                             *
3  * -------------------------------------------------------------------------- *
4  * This is part of the SimTK biosimulation toolkit originating from           *
5  * Simbios, the NIH National Center for Physics-Based Simulation of           *
6  * Biological Structures at Stanford, funded under the NIH Roadmap for        *
7  * Medical Research, grant U54 GM072970. See https://simtk.org/home/simbody.  *
8  *                                                                            *
9  * Portions copyright (c) 2008-12 Stanford University and the Authors.        *
10  * Authors: Peter Eastman                                                     *
11  * Contributors:                                                              *
12  *                                                                            *
13  * Licensed under the Apache License, Version 2.0 (the "License"); you may    *
14  * not use this file except in compliance with the License. You may obtain a  *
15  * copy of the License at http://www.apache.org/licenses/LICENSE-2.0.         *
16  *                                                                            *
17  * Unless required by applicable law or agreed to in writing, software        *
18  * distributed under the License is distributed on an "AS IS" BASIS,          *
19  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   *
20  * See the License for the specific language governing permissions and        *
21  * limitations under the License.                                             *
22  * -------------------------------------------------------------------------- */
23 
24 #include "ParallelExecutorImpl.h"
25 #include "SimTKcommon/internal/ParallelExecutor.h"
26 
27 #include <iostream>
28 #include <string>
29 #include <algorithm>
30 #include <mutex>
31 
32 #if defined(__FreeBSD__) || defined(__DragonFly__)
33 #include <sys/types.h>
34 #include <sys/sysctl.h>
35 #endif
36 
37 using namespace std;
38 
39 namespace SimTK {
40 
41 static void threadBody(ThreadInfo& info);
42 
ParallelExecutorImpl()43 ParallelExecutorImpl::ParallelExecutorImpl() : finished(false) {
44 
45     //By default, we use the total number of processors available of the
46     //computer (including hyperthreads)
47     numMaxThreads = ParallelExecutor::getNumProcessors();
48     if(numMaxThreads <= 0)
49       numMaxThreads = 1;
50 }
ParallelExecutorImpl(int numThreads)51 ParallelExecutorImpl::ParallelExecutorImpl(int numThreads) : finished(false) {
52 
53     // Set the maximum number of threads that we can use
54     SimTK_APIARGCHECK_ALWAYS(numThreads > 0, "ParallelExecutorImpl",
55                  "ParallelExecutorImpl", "Number of threads must be positive.");
56     numMaxThreads = numThreads;
57 }
~ParallelExecutorImpl()58 ParallelExecutorImpl::~ParallelExecutorImpl() {
59 
60     // Notify the threads that they should exit.
61 
62     std::unique_lock<std::mutex> lock(runMutex);
63     finished = true;
64     for (int i = 0; i < (int) threads.size(); ++i)
65         threadInfo[i].running = true;
66     runCondition.notify_all();
67     lock.unlock();
68 
69     // Wait until all the threads have finished.
70 
71     for (int i = 0; i < (int) threads.size(); ++i)
72         threads[i].join();
73 }
clone() const74 ParallelExecutorImpl* ParallelExecutorImpl::clone() const {
75     return new ParallelExecutorImpl(numMaxThreads);
76 }
execute(ParallelExecutor::Task & task,int times)77 void ParallelExecutorImpl::execute(ParallelExecutor::Task& task, int times) {
78   if (min(times, numMaxThreads) == 1) {
79       //(1) NON-PARALLEL CASE:
80       // Nothing is actually going to get done in parallel, so we might as well
81       // just execute the task directly and save the threading overhead.
82       task.initialize();
83       for (int i = 0; i < times; ++i)
84           task.execute(i);
85       task.finish();
86       return;
87     }
88 
89     //(2) PARALLEL CASE:
90     // We launch the maximum number of threads and save them for later use
91     if(threads.size() < (size_t)numMaxThreads)
92     {
93       // We do not support numMaxThreads changing for a given instance of
94       // ParallelExecutor.
95       assert(threads.size() == 0);
96       threadInfo.reserve(numMaxThreads);
97       threads.resize(numMaxThreads);
98       for (int i = 0; i < numMaxThreads; ++i) {
99           threadInfo.emplace_back(i, this);
100           threads[i] = std::thread(threadBody, std::ref(threadInfo[i]));
101       }
102     }
103 
104     // Initialize fields to execute the new task.
105     std::unique_lock<std::mutex> lock(runMutex);
106     currentTask = &task;
107     currentTaskCount = times;
108     waitingThreadCount = 0;
109     for (int i = 0; i < (int)threadInfo.size(); ++i) {
110         threadInfo[i].running = true;
111     }
112 
113     // Wake up the worker threads and wait until they finish.
114     runCondition.notify_all();
115     waitCondition.wait(lock,
116         [&] { return waitingThreadCount == (int)threads.size(); });
117     lock.unlock();
118 }
incrementWaitingThreads()119 void ParallelExecutorImpl::incrementWaitingThreads() {
120     std::lock_guard<std::mutex> lock(runMutex);
121     getCurrentTask().finish();
122     waitingThreadCount++;
123     if (waitingThreadCount == (int)threads.size()) {
124         waitCondition.notify_one();
125     }
126 }
127 
128 thread_local bool ParallelExecutorImpl::isWorker(false);
129 
130 /**
131  * This function contains the code executed by the worker threads.
132  */
133 
threadBody(ThreadInfo & info)134 void threadBody(ThreadInfo& info) {
135     ParallelExecutorImpl::isWorker = true;
136     ParallelExecutorImpl& executor = *info.executor;
137     int threadCount = executor.getThreadCount();
138     while (!executor.isFinished()) {
139 
140         // Wait for a Task to come in.
141 
142         std::unique_lock<std::mutex> lock(executor.getMutex());
143         executor.getCondition().wait(lock, [&] { return info.running; });
144         lock.unlock();
145         if (!executor.isFinished()) {
146 
147             // Execute the task for all the indices belonging to this thread.
148 
149             int count = executor.getCurrentTaskCount();
150             ParallelExecutor::Task& task = executor.getCurrentTask();
151             task.initialize();
152             int index = info.index;
153 
154             try {
155                 while (index < count) {
156                     task.execute(index);
157                     index += threadCount;
158                 }
159             }
160             catch (const std::exception& ex) {
161                 std::cerr <<"The parallel task threw an unhandled exception:"<< std::endl;
162                 std::cerr <<ex.what()<< std::endl;
163             }
164             catch (...) {
165                 std::cerr <<"The parallel task threw an error."<< std::endl;
166             }
167             info.running = false;
168             executor.incrementWaitingThreads();
169         }
170     }
171 }
172 
ParallelExecutor()173 ParallelExecutor::ParallelExecutor() : HandleBase(new ParallelExecutorImpl()) {
174 }
175 
ParallelExecutor(int numThreads)176 ParallelExecutor::ParallelExecutor(int numThreads) : HandleBase(new ParallelExecutorImpl(numThreads)) {
177 }
178 
clone() const179 ParallelExecutor* ParallelExecutor::clone() const{
180     return new ParallelExecutor(getMaxThreads());
181 }
182 
execute(Task & task,int times)183 void ParallelExecutor::execute(Task& task, int times) {
184     updImpl().execute(task, times);
185 }
186 
187 #ifdef __APPLE__
188    #include <sys/sysctl.h>
189    #include <dlfcn.h>
190 #elif _WIN32
191    #include <windows.h>
192 #elif __linux__
193    #include <dlfcn.h>
194    #include <unistd.h>
195 #elif __FreeBSD__ || __DragonFly__
196    #include <dlfcn.h>
197    #include <unistd.h>
198 #else
199   #error "Architecture unsupported"
200 #endif
201 
getNumProcessors()202 int ParallelExecutor::getNumProcessors() {
203 #ifdef __APPLE__
204     int ncpu,retval;
205     size_t len = 4;
206 
207     retval = sysctlbyname( "hw.physicalcpu", &ncpu, &len, NULL, 0 );
208     if( retval == 0 ) {
209        return (ncpu );
210     } else {
211        return(1);
212     }
213 #else
214 #ifdef __linux__
215     long nProcessorsOnline     = sysconf(_SC_NPROCESSORS_ONLN);
216     if( nProcessorsOnline == -1 )  {
217         return(1);
218     } else {
219         return( (int)nProcessorsOnline );
220     }
221 #else
222 #if defined(__FreeBSD__) || defined(__DragonFly__)
223     int ncpu,retval;
224     size_t len = 4;
225 
226     retval = sysctlbyname( "hw.ncpu", &ncpu, &len, NULL, 0 );
227     if( retval == 0 ) {
228        return (ncpu );
229     } else {
230        return(1);
231     }
232 #else
233     // Windows
234 
235     SYSTEM_INFO siSysInfo;
236     int ncpu;
237 
238     // Copy the hardware information to the SYSTEM_INFO structure.
239 
240     GetSystemInfo(&siSysInfo);
241 
242     // Display the contents of the SYSTEM_INFO structure.
243 
244     ncpu =  siSysInfo.dwNumberOfProcessors;
245     if( ncpu < 1 ) ncpu = 1;
246     return(ncpu);
247 #endif
248 #endif
249 #endif
250 }
251 
isWorkerThread()252 bool ParallelExecutor::isWorkerThread() {
253     return ParallelExecutorImpl::isWorker;
254 }
getMaxThreads() const255 int ParallelExecutor::getMaxThreads() const{
256     return getImpl().getMaxThreads();
257 }
258 
259 } // namespace SimTK
260