1 /****************************************************************************/ 2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo 3 // Copyright (C) 2004-2019 German Aerospace Center (DLR) and others. 4 // This program and the accompanying materials 5 // are made available under the terms of the Eclipse Public License v2.0 6 // which accompanies this distribution, and is available at 7 // http://www.eclipse.org/legal/epl-v20.html 8 // SPDX-License-Identifier: EPL-2.0 9 /****************************************************************************/ 10 /// @file FXWorkerThread.h 11 /// @author Michael Behrisch 12 /// @date 2014-07-13 13 /// @version $Id$ 14 /// 15 // A thread class together with a pool and a task for parallelized computation 16 /****************************************************************************/ 17 18 #ifndef FXWorkerThread_h 19 #define FXWorkerThread_h 20 21 // #define WORKLOAD_PROFILING 22 // at which interval report maximum workload of the threads, needs WORKLOAD_PROFILING 23 // undefine to use summary report only 24 #define WORKLOAD_INTERVAL 100 25 26 // =========================================================================== 27 // included modules 28 // =========================================================================== 29 #include <config.h> 30 31 #include <list> 32 #include <vector> 33 #include <fx.h> 34 #ifdef WORKLOAD_PROFILING 35 #include <chrono> 36 #include <utils/common/MsgHandler.h> 37 #include <utils/common/ToString.h> 38 #endif 39 #include <utils/common/UtilExceptions.h> 40 41 42 // =========================================================================== 43 // class definitions 44 // =========================================================================== 45 /** 46 * @class FXWorkerThread 47 * @brief A thread repeatingly calculating incoming tasks 48 */ 49 class FXWorkerThread : public FXThread { 50 51 public: 52 /** 53 * @class FXWorkerThread::Task 54 * @brief Abstract superclass of a task to be run with an index to keep track of pending tasks 55 */ 56 class Task { 57 public: 58 /// @brief Desctructor ~Task()59 virtual ~Task() {}; 60 61 /** @brief Abstract method which in subclasses should contain the computations to be performed. 62 * 63 * If there is data to be shared among several tasks (but not among several threads) it can be put in the 64 * a thread class subclassing the FXWorkerThread. the instance of the thread is then made available 65 * via the context parameter. 66 * 67 * @param[in] context The thread which runs the task 68 */ 69 virtual void run(FXWorkerThread* context) = 0; 70 71 /** @brief Sets the running index of this task. 72 * 73 * Every task receive an index which is unique among all pending tasks of the same thread pool. 74 * 75 * @param[in] newIndex the index to assign 76 */ setIndex(const int newIndex)77 void setIndex(const int newIndex) { 78 myIndex = newIndex; 79 } 80 private: 81 /// @brief the index of the task, valid only after the task has been added to the pool 82 int myIndex; 83 }; 84 85 /** 86 * @class FXWorkerThread::Pool 87 * @brief A pool of worker threads which distributes the tasks and collects the results 88 */ 89 class Pool { 90 public: 91 /** @brief Constructor 92 * 93 * May initialize the pool with a given number of workers. 94 * 95 * @param[in] numThreads the number of threads to create 96 */ myPoolMutex(true)97 Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr) 98 #ifdef WORKLOAD_PROFILING 99 , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.) 100 #endif 101 { 102 #ifdef WORKLOAD_PROFILING 103 long long int timeDiff = 0; 104 for (int i = 0; i < 100; i++) { 105 const auto begin = std::chrono::high_resolution_clock::now(); 106 const auto end = std::chrono::high_resolution_clock::now(); 107 timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count(); 108 } 109 //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl; 110 #endif 111 while (numThreads > 0) { 112 new FXWorkerThread(*this); 113 numThreads--; 114 } 115 } 116 117 /** @brief Destructor 118 * 119 * Stopping and deleting all workers by calling clear. 120 */ ~Pool()121 virtual ~Pool() { 122 clear(); 123 } 124 125 /** @brief Stops and deletes all worker threads. 126 */ clear()127 void clear() { 128 for (FXWorkerThread* const worker : myWorkers) { 129 delete worker; 130 } 131 myWorkers.clear(); 132 } 133 134 /** @brief Adds the given thread to the pool. 135 * 136 * @param[in] w the thread to add 137 */ addWorker(FXWorkerThread * const w)138 void addWorker(FXWorkerThread* const w) { 139 myWorkers.push_back(w); 140 } 141 142 /** @brief Gives a number to the given task and assigns it to the worker with the given index. 143 * If the index is negative, assign to the next (round robin) one. 144 * 145 * @param[in] t the task to add 146 * @param[in] index index of the worker thread to use or -1 for an arbitrary one 147 */ 148 void add(Task* const t, int index = -1) { 149 if (index < 0) { 150 index = myRunningIndex % myWorkers.size(); 151 } 152 #ifdef WORKLOAD_PROFILING 153 if (myRunningIndex == 0) { 154 for (FXWorkerThread* const worker : myWorkers) { 155 worker->startProfile(); 156 } 157 myProfileStart = std::chrono::high_resolution_clock::now(); 158 } 159 #endif 160 t->setIndex(myRunningIndex++); 161 myWorkers[index]->add(t); 162 } 163 164 /** @brief Adds the given tasks to the list of finished tasks. 165 * 166 * Locks the internal mutex and appends the finished tasks. This is to be called by the worker thread only. 167 * 168 * @param[in] tasks the tasks to add 169 */ addFinished(std::list<Task * > & tasks)170 void addFinished(std::list<Task*>& tasks) { 171 myMutex.lock(); 172 myFinishedTasks.splice(myFinishedTasks.end(), tasks); 173 myCondition.signal(); 174 myMutex.unlock(); 175 } 176 setException(ProcessError & e)177 void setException(ProcessError& e) { 178 myMutex.lock(); 179 if (myException == nullptr) { 180 myException = new ProcessError(e); 181 } 182 myMutex.unlock(); 183 } 184 185 /// @brief waits for all tasks to be finished 186 void waitAll(const bool deleteFinished = true) { 187 myMutex.lock(); 188 while ((int)myFinishedTasks.size() < myRunningIndex) { 189 myCondition.wait(myMutex); 190 } 191 #ifdef WORKLOAD_PROFILING 192 if (myRunningIndex > 0) { 193 const auto end = std::chrono::high_resolution_clock::now(); 194 const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count(); 195 double minLoad = std::numeric_limits<double>::max(); 196 double maxLoad = 0.; 197 for (FXWorkerThread* const worker : myWorkers) { 198 const double load = worker->endProfile(elapsed); 199 minLoad = MIN2(minLoad, load); 200 maxLoad = MAX2(maxLoad, load); 201 } 202 #ifdef WORKLOAD_INTERVAL 203 myTotalMaxLoad += maxLoad; 204 myTotalSpread += maxLoad / minLoad; 205 myNumBatches++; 206 if (myNumBatches % WORKLOAD_INTERVAL == 0) { 207 WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL)); 208 myTotalMaxLoad = 0.; 209 myTotalSpread = 0.; 210 } 211 #endif 212 } 213 #endif 214 if (deleteFinished) { 215 for (Task* task : myFinishedTasks) { 216 delete task; 217 } 218 } 219 ProcessError* toRaise = myException; 220 myException = nullptr; 221 myFinishedTasks.clear(); 222 myRunningIndex = 0; 223 myMutex.unlock(); 224 if (toRaise != nullptr) { 225 throw* toRaise; 226 } 227 } 228 229 /** @brief Checks whether there are currently more pending tasks than threads. 230 * 231 * This is only a rough estimate because the tasks are already assigned and there could be an idle thread even though the 232 * number of tasks is large. 233 * 234 * @return whether there are enough tasks to let all threads work 235 */ isFull()236 bool isFull() const { 237 return myRunningIndex - (int)myFinishedTasks.size() >= size(); 238 } 239 240 /** @brief Returns the number of threads in the pool. 241 * 242 * @return the number of threads 243 */ size()244 int size() const { 245 return (int)myWorkers.size(); 246 } 247 248 /// @brief locks the pool mutex lock()249 void lock() { 250 myPoolMutex.lock(); 251 } 252 253 /// @brief unlocks the pool mutex unlock()254 void unlock() { 255 myPoolMutex.unlock(); 256 } 257 258 private: 259 /// @brief the current worker threads 260 std::vector<FXWorkerThread*> myWorkers; 261 /// @brief the internal mutex for the task list 262 FXMutex myMutex; 263 /// @brief the pool mutex for external sync 264 FXMutex myPoolMutex; 265 /// @brief the semaphore to wait on for finishing all tasks 266 FXCondition myCondition; 267 /// @brief list of finished tasks 268 std::list<Task*> myFinishedTasks; 269 /// @brief the running index for the next task 270 int myRunningIndex; 271 /// @brief the exception from a child thread 272 ProcessError* myException; 273 #ifdef WORKLOAD_PROFILING 274 /// @brief the number of finished batch runs 275 int myNumBatches; 276 /// @brief the sum over the maximum loads 277 double myTotalMaxLoad; 278 /// @brief the sum over the load spreads 279 double myTotalSpread; 280 /// @brief the time when profiling started 281 std::chrono::high_resolution_clock::time_point myProfileStart; 282 #endif 283 }; 284 285 public: 286 /** @brief Constructor 287 * 288 * Adds the thread to the given pool and starts it. 289 * 290 * @param[in] pool the pool for this thread 291 */ FXWorkerThread(Pool & pool)292 FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false) 293 #ifdef WORKLOAD_PROFILING 294 , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0) 295 #endif 296 { 297 pool.addWorker(this); 298 start(); 299 } 300 301 /** @brief Destructor 302 * 303 * Stops the thread by calling stop. 304 */ ~FXWorkerThread()305 virtual ~FXWorkerThread() { 306 stop(); 307 #ifdef WORKLOAD_PROFILING 308 const double load = 100. * myTotalBusyTime / myTotalTime; 309 WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) + 310 " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) + 311 "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task."); 312 #endif 313 } 314 315 /** @brief Adds the given task to this thread to be calculated 316 * 317 * @param[in] t the task to add 318 */ add(Task * t)319 void add(Task* t) { 320 myMutex.lock(); 321 myTasks.push_back(t); 322 myCondition.signal(); 323 myMutex.unlock(); 324 } 325 326 /** @brief Main execution method of this thread. 327 * 328 * Checks for new tasks, calculates them and puts them in the finished list of the pool until being stopped. 329 * 330 * @return always 0 331 */ run()332 FXint run() { 333 while (!myStopped) { 334 myMutex.lock(); 335 while (!myStopped && myTasks.empty()) { 336 myCondition.wait(myMutex); 337 } 338 if (myStopped) { 339 myMutex.unlock(); 340 break; 341 } 342 myCurrentTasks.splice(myCurrentTasks.end(), myTasks); 343 myMutex.unlock(); 344 try { 345 for (Task* const t : myCurrentTasks) { 346 #ifdef WORKLOAD_PROFILING 347 const auto before = std::chrono::high_resolution_clock::now(); 348 #endif 349 t->run(this); 350 #ifdef WORKLOAD_PROFILING 351 const auto after = std::chrono::high_resolution_clock::now(); 352 myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count(); 353 myCounter++; 354 #endif 355 } 356 } catch (ProcessError& e) { 357 myPool.setException(e); 358 } 359 myPool.addFinished(myCurrentTasks); 360 } 361 return 0; 362 } 363 364 /** @brief Stops the thread 365 * 366 * The currently running task will be finished but all further tasks are discarded. 367 */ stop()368 void stop() { 369 myMutex.lock(); 370 myStopped = true; 371 myCondition.signal(); 372 myMutex.unlock(); 373 join(); 374 } 375 376 #ifdef WORKLOAD_PROFILING startProfile()377 void startProfile() { 378 myBusyTime = 0; 379 } 380 endProfile(const long long int time)381 double endProfile(const long long int time) { 382 myTotalTime += time; 383 myTotalBusyTime += myBusyTime; 384 return time == 0 ? 100. : 100. * myBusyTime / time; 385 } 386 #endif 387 388 private: 389 /// @brief the pool for this thread 390 Pool& myPool; 391 /// @brief the mutex for the task list 392 FXMutex myMutex; 393 /// @brief the semaphore when waiting for new tasks 394 FXCondition myCondition; 395 /// @brief the list of pending tasks 396 std::list<Task*> myTasks; 397 /// @brief the list of tasks which are currently calculated 398 std::list<Task*> myCurrentTasks; 399 /// @brief whether we are still running 400 bool myStopped; 401 #ifdef WORKLOAD_PROFILING 402 /// @brief counting completed tasks 403 int myCounter; 404 /// @brief the time spent in calculations during the current batch 405 long long int myBusyTime; 406 /// @brief the total busy time 407 long long int myTotalBusyTime; 408 /// @brief the total time while anyone had tasks 409 long long int myTotalTime; 410 #endif 411 }; 412 413 414 #endif 415