/** @file threadpool.c  Thread pool.
2 
3 @authors Copyright (c) 2017 Jaakko Keränen <jaakko.keranen@iki.fi>
4 
5 @par License
6 
7 Redistribution and use in source and binary forms, with or without
8 modification, are permitted provided that the following conditions are met:
9 
10 1. Redistributions of source code must retain the above copyright notice, this
11    list of conditions and the following disclaimer.
12 2. Redistributions in binary form must reproduce the above copyright notice,
13    this list of conditions and the following disclaimer in the documentation
14    and/or other materials provided with the distribution.
15 
16 <small>THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
20 ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.</small>
26 */
27 
28 #include "the_Foundation/threadpool.h"
29 
/* Defined in thread.c. Called by yield_ThreadPool() right after a job's run
   function has returned. */
void finish_Thread_(iThread *); // thread.c

/* Declares the PooledThread class; its instances are the pool's worker threads.
   NOTE(review): the exact expansion comes from the_Foundation's class macros. */
iDeclareClass(PooledThread)
33 
/* A worker thread that services the job queue of one thread pool. */
struct Impl_PooledThread {
    iThread thread;     /* Embedded thread. Kept as the first member because
                           run_PooledThread_() casts its iThread * argument
                           to iPooledThread *. */
    iThreadPool *pool;  /* Pool passed to yield_ThreadPool() by this worker. */
};
38 
run_PooledThread_(iThread * thread)39 static iThreadResult run_PooledThread_(iThread *thread) {
40     iPooledThread *d = (iAny *) thread;
41     while (yield_ThreadPool(d->pool, 0.0)) { /* Keep going. */ }
42     return 0;
43 }
44 
/* Initializes a pooled worker: remembers the owning pool, sets up the embedded
 * thread with run_PooledThread_() as its run function and names it
 * "PooledThread". The thread is not started here. */
static void init_PooledThread(iPooledThread *d, iThreadPool *pool) {
    iThread *base = &d->thread;
    d->pool = pool;
    init_Thread(base, run_PooledThread_);
    setName_Thread(base, "PooledThread");
}
50 
/* Deinitializer for PooledThread. This function itself releases nothing; the
 * argument is only marked as unused. */
static void deinit_PooledThread(iPooledThread *d)
{
    iUnused(d);
}
54 
/* Class glue for PooledThread: defines it as a subclass of Thread and generates
   an object constructor that takes the owning pool (used below as
   new_PooledThread(pool)).
   NOTE(review): the exact expansion comes from the_Foundation's class macros. */
iDefineSubclass(PooledThread, Thread)
iDefineObjectConstructionArgs(PooledThread, (iThreadPool *pool), pool)
57 
58 iLocalDef void start_PooledThread(iPooledThread *d) { start_Thread(&d->thread); }
join_PooledThread(iPooledThread * d)59 iLocalDef void join_PooledThread (iPooledThread *d) { join_Thread(&d->thread); }
60 
61 /*-------------------------------------------------------------------------------------*/
62 
/* Class glue for ThreadPool.
   NOTE(review): presumably this generates the default object constructor that
   calls init_ThreadPool() -- confirm against the_Foundation's class macros. */
iDefineClass(ThreadPool)
iDefineObjectConstruction(ThreadPool)
65 
66 static void startThreads_ThreadPool_(iThreadPool *d, int minThreads, int reservedCores) {
67     const int count = iMaxi(iMaxi(1, minThreads), idealConcurrentCount_Thread() - reservedCores);
68     for (int i = 0; i < count; ++i) {
69         iPooledThread *pt = new_PooledThread(d);
70         pushBack_ObjectList(d->threads, pt);
71         start_PooledThread(pt);
72         iRelease(pt);
73     }
74 }
75 
/* Stops all worker threads of the pool.
 *
 * First enqueues the pool pointer itself once per worker; yield_ThreadPool()
 * treats a dequeued item equal to the pool as the signal to stop. Then each
 * worker is joined and removed from d->threads. */
static void stopThreads_ThreadPool_(iThreadPool *d) {
    size_t pending = size_ObjectList(d->threads);
    while (pending) {
        put_Queue(&d->queue, d);
        pending--;
    }
    iForEach(ObjectList, it, d->threads) {
        iPooledThread *worker = (iPooledThread *) it.value->object;
        join_PooledThread(worker);
        remove_ObjectListIterator(&it);
    }
}
85 
newLimits_ThreadPool(int minThreads,int reservedCores)86 iThreadPool *newLimits_ThreadPool(int minThreads, int reservedCores) {
87     iThreadPool *d = iNew(ThreadPool);
88     initLimits_ThreadPool(d, minThreads, reservedCores);
89     return d;
90 }
91 
/* Initializes the pool with default limits: minThreads = 0 and
 * reservedCores = 0. */
void init_ThreadPool(iThreadPool *d)
{
    initLimits_ThreadPool(d, 0, 0);
}
95 
/* Initializes the pool: sets up the job queue, creates the (empty) list of
 * worker threads and then starts the workers.
 *
 * minThreads and reservedCores are passed on to startThreads_ThreadPool_(),
 * which derives the worker count from them. */
void initLimits_ThreadPool(iThreadPool *d, int minThreads, int reservedCores)
{
    /* The queue and the list must exist before any worker is started. */
    init_Queue(&d->queue);
    d->threads = new_ObjectList();

    startThreads_ThreadPool_(d, minThreads, reservedCores);
}
101 
/* Deinitializes the pool: stops and joins every worker thread first, then
 * releases the thread list and deinitializes the job queue. */
void deinit_ThreadPool(iThreadPool *d)
{
    /* Workers use the queue, so they are stopped before it is torn down. */
    stopThreads_ThreadPool_(d);

    iRelease(d->threads);
    deinit_Queue(&d->queue);
}
107 
/* Submits a thread object as a job by putting it in the pool's queue.
 *
 * A null `thread` is ignored. The `thread` argument is returned unchanged in
 * either case. */
iThread *run_ThreadPool(iThreadPool *d, iThread *thread) {
    if (!thread) {
        return thread;
    }
    put_Queue(&d->queue, thread);
    return thread;
}
114 
yield_ThreadPool(iThreadPool * d,double timeoutSeconds)115 iBool yield_ThreadPool(iThreadPool *d, double timeoutSeconds) {
116     iThread *job = NULL;
117     if (timeoutSeconds <= 0.0) {
118         job = (iAny *) take_Queue(&d->queue);
119     }
120     else {
121         job = (iAny *) takeTimeout_Queue(&d->queue, timeoutSeconds);
122     }
123     if (job == NULL || job == (void *) d) {
124         /* Terminated. */
125         return iFalse;
126     }
127     /* Run in the calling thread. */
128     iAssert(job->state == created_ThreadState);
129     iGuardMutex(&job->mutex, job->state = running_ThreadState);
130     job->result = job->run(job);
131     finish_Thread_(job);
132     iRelease(job);
133     return iTrue;
134 }
135