/****************************************************************************/
// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
// Copyright (C) 2004-2019 German Aerospace Center (DLR) and others.
// This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v2.0
// which accompanies this distribution, and is available at
// http://www.eclipse.org/legal/epl-v20.html
// SPDX-License-Identifier: EPL-2.0
/****************************************************************************/
/// @file    FXWorkerThread.h
/// @author  Michael Behrisch
/// @date    2014-07-13
/// @version $Id$
///
// A thread class together with a pool and a task for parallelized computation
/****************************************************************************/

#ifndef FXWorkerThread_h
#define FXWorkerThread_h

// #define WORKLOAD_PROFILING
// the interval (in batches) at which the maximum workload of the threads is reported, needs WORKLOAD_PROFILING
// undefine to use the summary report only
#define WORKLOAD_INTERVAL 100

// ===========================================================================
// included modules
// ===========================================================================
#include <config.h>

#include <list>
#include <vector>
#include <fx.h>
#ifdef WORKLOAD_PROFILING
#include <chrono>
#include <utils/common/MsgHandler.h>
#include <utils/common/ToString.h>
#endif
#include <utils/common/UtilExceptions.h>


// ===========================================================================
// class definitions
// ===========================================================================
/**
 * @class FXWorkerThread
 * @brief A thread repeatedly calculating incoming tasks
 */
class FXWorkerThread : public FXThread {

public:
    /**
     * @class FXWorkerThread::Task
     * @brief Abstract superclass of a task to be run with an index to keep track of pending tasks
     */
    class Task {
    public:
        /// @brief Destructor
        virtual ~Task() {};

        /** @brief Abstract method which in subclasses should contain the computations to be performed.
         *
         * If there is data to be shared among several tasks (but not among several threads) it can be put
         *  into a thread class subclassing FXWorkerThread. The instance of the thread is then made available
         *  via the context parameter.
         *
         * @param[in] context The thread which runs the task
         */
        virtual void run(FXWorkerThread* context) = 0;

        /** @brief Sets the running index of this task.
         *
         * Every task receives an index which is unique among all pending tasks of the same thread pool.
         *
         * @param[in] newIndex the index to assign
         */
        void setIndex(const int newIndex) {
            myIndex = newIndex;
        }
    private:
        /// @brief the index of the task, valid only after the task has been added to the pool
        int myIndex;
    };
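
    /* Usage sketch (illustration only, not part of the original header):
     * a hypothetical Task subclass squaring a number. The names SquareTask,
     * myInput and myResult are invented for this example; a real task would
     * perform its expensive computation inside run().
     *
     * @code
     * class SquareTask : public FXWorkerThread::Task {
     * public:
     *     SquareTask(const double input) : myInput(input), myResult(0.) {}
     *     void run(FXWorkerThread* context) {
     *         // context gives access to thread-local shared data
     *         // if FXWorkerThread itself is subclassed
     *         myResult = myInput * myInput;
     *     }
     *     double getResult() const {
     *         return myResult;
     *     }
     * private:
     *     double myInput;
     *     double myResult;
     * };
     * @endcode
     */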

    /**
     * @class FXWorkerThread::Pool
     * @brief A pool of worker threads which distributes the tasks and collects the results
     */
    class Pool {
    public:
        /** @brief Constructor
         *
         * May initialize the pool with a given number of workers.
         *
         * @param[in] numThreads the number of threads to create
         */
        Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
#ifdef WORKLOAD_PROFILING
            , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
#endif
        {
#ifdef WORKLOAD_PROFILING
            long long int timeDiff = 0;
            for (int i = 0; i < 100; i++) {
                const auto begin = std::chrono::high_resolution_clock::now();
                const auto end = std::chrono::high_resolution_clock::now();
                timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
            }
            //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
#endif
            while (numThreads > 0) {
                new FXWorkerThread(*this);
                numThreads--;
            }
        }

        /** @brief Destructor
         *
         * Stops and deletes all workers by calling clear.
         */
        virtual ~Pool() {
            clear();
        }

        /** @brief Stops and deletes all worker threads.
         */
        void clear() {
            for (FXWorkerThread* const worker : myWorkers) {
                delete worker;
            }
            myWorkers.clear();
        }

        /** @brief Adds the given thread to the pool.
         *
         * @param[in] w the thread to add
         */
        void addWorker(FXWorkerThread* const w) {
            myWorkers.push_back(w);
        }

        /** @brief Gives a number to the given task and assigns it to the worker with the given index.
         * If the index is negative, the task is assigned to the next worker (round robin).
         *
         * @param[in] t the task to add
         * @param[in] index index of the worker thread to use or -1 for an arbitrary one
         */
        void add(Task* const t, int index = -1) {
            if (index < 0) {
                index = myRunningIndex % myWorkers.size();
            }
#ifdef WORKLOAD_PROFILING
            if (myRunningIndex == 0) {
                for (FXWorkerThread* const worker : myWorkers) {
                    worker->startProfile();
                }
                myProfileStart = std::chrono::high_resolution_clock::now();
            }
#endif
            t->setIndex(myRunningIndex++);
            myWorkers[index]->add(t);
        }

        /** @brief Adds the given tasks to the list of finished tasks.
         *
         * Locks the internal mutex and appends the finished tasks. This is to be called by the worker thread only.
         *
         * @param[in] tasks the tasks to add
         */
        void addFinished(std::list<Task*>& tasks) {
            myMutex.lock();
            myFinishedTasks.splice(myFinishedTasks.end(), tasks);
            myCondition.signal();
            myMutex.unlock();
        }

        /// @brief stores the first exception thrown by a worker thread, to be rethrown by waitAll
        void setException(ProcessError& e) {
            myMutex.lock();
            if (myException == nullptr) {
                myException = new ProcessError(e);
            }
            myMutex.unlock();
        }

        /// @brief waits for all tasks to be finished
        void waitAll(const bool deleteFinished = true) {
            myMutex.lock();
            while ((int)myFinishedTasks.size() < myRunningIndex) {
                myCondition.wait(myMutex);
            }
#ifdef WORKLOAD_PROFILING
            if (myRunningIndex > 0) {
                const auto end = std::chrono::high_resolution_clock::now();
                const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
                double minLoad = std::numeric_limits<double>::max();
                double maxLoad = 0.;
                for (FXWorkerThread* const worker : myWorkers) {
                    const double load = worker->endProfile(elapsed);
                    minLoad = MIN2(minLoad, load);
                    maxLoad = MAX2(maxLoad, load);
                }
#ifdef WORKLOAD_INTERVAL
                myTotalMaxLoad += maxLoad;
                myTotalSpread += maxLoad / minLoad;
                myNumBatches++;
                if (myNumBatches % WORKLOAD_INTERVAL == 0) {
                    WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
                    myTotalMaxLoad = 0.;
                    myTotalSpread = 0.;
                }
#endif
            }
#endif
            if (deleteFinished) {
                for (Task* task : myFinishedTasks) {
                    delete task;
                }
            }
            ProcessError* toRaise = myException;
            myException = nullptr;
            myFinishedTasks.clear();
            myRunningIndex = 0;
            myMutex.unlock();
            if (toRaise != nullptr) {
                throw *toRaise;
            }
        }
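
        /* Error handling sketch (illustration only): a ProcessError thrown
         * inside Task::run is caught by the worker thread, stored via
         * setException and rethrown here, so callers can handle it on the
         * main thread. "pool" stands for an assumed Pool instance.
         *
         * @code
         * try {
         *     pool.waitAll();
         * } catch (ProcessError& e) {
         *     // react to the first error raised by any of the tasks
         * }
         * @endcode
         */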

        /** @brief Checks whether there are currently more pending tasks than threads.
         *
         * This is only a rough estimate because the tasks are already assigned and there could be an idle thread even though the
         *  number of tasks is large.
         *
         * @return whether there are enough tasks to let all threads work
         */
        bool isFull() const {
            return myRunningIndex - (int)myFinishedTasks.size() >= size();
        }

        /** @brief Returns the number of threads in the pool.
         *
         * @return the number of threads
         */
        int size() const {
            return (int)myWorkers.size();
        }

        /// @brief locks the pool mutex
        void lock() {
            myPoolMutex.lock();
        }

        /// @brief unlocks the pool mutex
        void unlock() {
            myPoolMutex.unlock();
        }

    private:
        /// @brief the current worker threads
        std::vector<FXWorkerThread*> myWorkers;
        /// @brief the internal mutex for the task list
        FXMutex myMutex;
        /// @brief the pool mutex for external sync
        FXMutex myPoolMutex;
        /// @brief the semaphore to wait on for finishing all tasks
        FXCondition myCondition;
        /// @brief list of finished tasks
        std::list<Task*> myFinishedTasks;
        /// @brief the running index for the next task
        int myRunningIndex;
        /// @brief the exception from a child thread
        ProcessError* myException;
#ifdef WORKLOAD_PROFILING
        /// @brief the number of finished batch runs
        int myNumBatches;
        /// @brief the sum over the maximum loads
        double myTotalMaxLoad;
        /// @brief the sum over the load spreads
        double myTotalSpread;
        /// @brief the time when profiling started
        std::chrono::high_resolution_clock::time_point myProfileStart;
#endif
    };
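
    /* Usage sketch for the pool (illustration only; SquareTask is the
     * hypothetical task sketched above):
     *
     * @code
     * FXWorkerThread::Pool pool(4);              // spawns four worker threads
     * std::vector<SquareTask*> tasks;
     * for (int i = 0; i < 100; i++) {
     *     tasks.push_back(new SquareTask((double)i));
     *     pool.add(tasks.back());                // round robin assignment
     * }
     * pool.waitAll(false);                       // keep the tasks to read the results
     * for (SquareTask* const t : tasks) {
     *     // ... use t->getResult() ...
     *     delete t;
     * }
     * @endcode
     */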

public:
    /** @brief Constructor
     *
     * Adds the thread to the given pool and starts it.
     *
     * @param[in] pool the pool for this thread
     */
    FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
#ifdef WORKLOAD_PROFILING
        , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
#endif
    {
        pool.addWorker(this);
        start();
    }

    /** @brief Destructor
     *
     * Stops the thread by calling stop.
     */
    virtual ~FXWorkerThread() {
        stop();
#ifdef WORKLOAD_PROFILING
        const double load = 100. * myTotalBusyTime / myTotalTime;
        WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
                      " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
                      "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
#endif
    }

    /** @brief Adds the given task to this thread to be calculated
     *
     * @param[in] t the task to add
     */
    void add(Task* t) {
        myMutex.lock();
        myTasks.push_back(t);
        myCondition.signal();
        myMutex.unlock();
    }

    /** @brief Main execution method of this thread.
     *
     * Checks for new tasks, calculates them and puts them in the finished list of the pool until being stopped.
     *
     * @return always 0
     */
    FXint run() {
        while (!myStopped) {
            myMutex.lock();
            while (!myStopped && myTasks.empty()) {
                myCondition.wait(myMutex);
            }
            if (myStopped) {
                myMutex.unlock();
                break;
            }
            myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
            myMutex.unlock();
            try {
                for (Task* const t : myCurrentTasks) {
#ifdef WORKLOAD_PROFILING
                    const auto before = std::chrono::high_resolution_clock::now();
#endif
                    t->run(this);
#ifdef WORKLOAD_PROFILING
                    const auto after = std::chrono::high_resolution_clock::now();
                    myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
                    myCounter++;
#endif
                }
            } catch (ProcessError& e) {
                myPool.setException(e);
            }
            myPool.addFinished(myCurrentTasks);
        }
        return 0;
    }

    /** @brief Stops the thread
     *
     * The currently running task will be finished but all further tasks are discarded.
     */
    void stop() {
        myMutex.lock();
        myStopped = true;
        myCondition.signal();
        myMutex.unlock();
        join();
    }

#ifdef WORKLOAD_PROFILING
    /// @brief resets the busy time at the start of a profiled batch
    void startProfile() {
        myBusyTime = 0;
    }

    /// @brief returns the load (in percent) for the given elapsed time and updates the totals
    double endProfile(const long long int time) {
        myTotalTime += time;
        myTotalBusyTime += myBusyTime;
        return time == 0 ? 100. : 100. * myBusyTime / time;
    }
#endif

private:
    /// @brief the pool for this thread
    Pool& myPool;
    /// @brief the mutex for the task list
    FXMutex myMutex;
    /// @brief the semaphore when waiting for new tasks
    FXCondition myCondition;
    /// @brief the list of pending tasks
    std::list<Task*> myTasks;
    /// @brief the list of tasks which are currently calculated
    std::list<Task*> myCurrentTasks;
    /// @brief whether we are still running
    bool myStopped;
#ifdef WORKLOAD_PROFILING
    /// @brief counting completed tasks
    int myCounter;
    /// @brief the time spent in calculations during the current batch
    long long int myBusyTime;
    /// @brief the total busy time
    long long int myTotalBusyTime;
    /// @brief the total time while anyone had tasks
    long long int myTotalTime;
#endif
};


#endif