1 #include "rar.hpp"
2
3 #ifdef RAR_SMP
4 #include "threadmisc.cpp"
5
6 #ifdef _WIN_ALL
// Priority applied to pool threads in Windows. CreateThreads() passes it
// to SetThreadPriority for every new thread, but only when it differs
// from THREAD_PRIORITY_NORMAL, which is the initial value set here.
7 int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
8 #endif
9
ThreadPool(uint MaxThreads)10 ThreadPool::ThreadPool(uint MaxThreads)
11 {
12 MaxAllowedThreads = MaxThreads;
13 if (MaxAllowedThreads>MaxPoolThreads)
14 MaxAllowedThreads=MaxPoolThreads;
15 if (MaxAllowedThreads==0)
16 MaxAllowedThreads=1;
17
18 ThreadsCreatedCount=0;
19
20 // If we have more threads than queue size, we'll hang on pool destroying,
21 // not releasing all waiting threads.
22 if (MaxAllowedThreads>ASIZE(TaskQueue))
23 MaxAllowedThreads=ASIZE(TaskQueue);
24
25 Closing=false;
26
27 bool Success = CriticalSectionCreate(&CritSection);
28 #ifdef _WIN_ALL
29 QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
30 NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
31 Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
32 #elif defined(_UNIX)
33 AnyActive = false;
34 QueuedTasksCnt = 0;
35 Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
36 pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
37 pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
38 pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
39 #endif
40 if (!Success)
41 {
42 ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
43 ErrHandler.Exit(RARX_FATAL);
44 }
45
46 QueueTop = 0;
47 QueueBottom = 0;
48 ActiveThreads = 0;
49 }
50
51
~ThreadPool()52 ThreadPool::~ThreadPool()
53 {
54 WaitDone();
55 Closing=true;
56
57 #ifdef _WIN_ALL
58 ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
59 #elif defined(_UNIX)
60 // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
61 // so lock is required. We would occassionally hang without it.
62 pthread_mutex_lock(&QueuedTasksCntMutex);
63 QueuedTasksCnt+=ASIZE(TaskQueue);
64 pthread_mutex_unlock(&QueuedTasksCntMutex);
65
66 pthread_cond_broadcast(&QueuedTasksCntCond);
67 #endif
68
69 for(uint I=0;I<ThreadsCreatedCount;I++)
70 {
71 #ifdef _WIN_ALL
72 // Waiting until the thread terminates.
73 CWaitForSingleObject(ThreadHandles[I]);
74 #endif
75 // Close the thread handle. In Unix it results in pthread_join call,
76 // which also waits for thread termination.
77 ThreadClose(ThreadHandles[I]);
78 }
79
80 CriticalSectionDelete(&CritSection);
81 #ifdef _WIN_ALL
82 CloseHandle(QueuedTasksCnt);
83 CloseHandle(NoneActive);
84 #elif defined(_UNIX)
85 pthread_cond_destroy(&AnyActiveCond);
86 pthread_mutex_destroy(&AnyActiveMutex);
87 pthread_cond_destroy(&QueuedTasksCntCond);
88 pthread_mutex_destroy(&QueuedTasksCntMutex);
89 #endif
90 }
91
92
CreateThreads()93 void ThreadPool::CreateThreads()
94 {
95 for(uint I=0;I<MaxAllowedThreads;I++)
96 {
97 ThreadHandles[I] = ThreadCreate(PoolThread, this);
98 ThreadsCreatedCount++;
99 #ifdef _WIN_ALL
100 if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
101 SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
102 #endif
103 }
104 }
105
106
PoolThread(void * Param)107 NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
108 {
109 ((ThreadPool*)Param)->PoolThreadLoop();
110 return 0;
111 }
112
113
PoolThreadLoop()114 void ThreadPool::PoolThreadLoop()
115 {
116 QueueEntry Task;
117 while (GetQueuedTask(&Task))
118 {
119 Task.Proc(Task.Param);
120
121 CriticalSectionStart(&CritSection);
122 if (--ActiveThreads == 0)
123 {
124 #ifdef _WIN_ALL
125 SetEvent(NoneActive);
126 #elif defined(_UNIX)
127 pthread_mutex_lock(&AnyActiveMutex);
128 AnyActive=false;
129 pthread_cond_signal(&AnyActiveCond);
130 pthread_mutex_unlock(&AnyActiveMutex);
131 #endif
132 }
133 CriticalSectionEnd(&CritSection);
134 }
135 }
136
137
GetQueuedTask(QueueEntry * Task)138 bool ThreadPool::GetQueuedTask(QueueEntry *Task)
139 {
140 #ifdef _WIN_ALL
141 CWaitForSingleObject(QueuedTasksCnt);
142 #elif defined(_UNIX)
143 pthread_mutex_lock(&QueuedTasksCntMutex);
144 while (QueuedTasksCnt==0)
145 cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
146 QueuedTasksCnt--;
147 pthread_mutex_unlock(&QueuedTasksCntMutex);
148 #endif
149
150 if (Closing)
151 return false;
152
153 CriticalSectionStart(&CritSection);
154
155 *Task = TaskQueue[QueueBottom];
156 QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
157
158 CriticalSectionEnd(&CritSection);
159
160 return true;
161 }
162
163
164 // Add task to queue. We assume that it is always called from main thread,
165 // it allows to avoid any locks here. We process collected tasks only
166 // when WaitDone is called.
AddTask(PTHREAD_PROC Proc,void * Data)167 void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
168 {
169 if (ThreadsCreatedCount == 0)
170 CreateThreads();
171
172 // If queue is full, wait until it is empty.
173 if (ActiveThreads>=ASIZE(TaskQueue))
174 WaitDone();
175
176 TaskQueue[QueueTop].Proc = Proc;
177 TaskQueue[QueueTop].Param = Data;
178 QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
179 ActiveThreads++;
180 }
181
182
183 // Start queued tasks and wait until all threads are inactive.
184 // We assume that it is always called from main thread, when pool threads
185 // are sleeping yet.
WaitDone()186 void ThreadPool::WaitDone()
187 {
188 if (ActiveThreads==0)
189 return;
190 #ifdef _WIN_ALL
191 ResetEvent(NoneActive);
192 ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
193 CWaitForSingleObject(NoneActive);
194 #elif defined(_UNIX)
195 AnyActive=true;
196
197 // Threads reset AnyActive before accessing QueuedTasksCnt and even
198 // preceding WaitDone() call does not guarantee that some slow thread
199 // is not accessing QueuedTasksCnt now. So lock is necessary.
200 pthread_mutex_lock(&QueuedTasksCntMutex);
201 QueuedTasksCnt+=ActiveThreads;
202 pthread_mutex_unlock(&QueuedTasksCntMutex);
203
204 pthread_cond_broadcast(&QueuedTasksCntCond);
205
206 pthread_mutex_lock(&AnyActiveMutex);
207 while (AnyActive)
208 cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
209 pthread_mutex_unlock(&AnyActiveMutex);
210 #endif
211 }
212 #endif // RAR_SMP
213