1 #include "rar.hpp"
2 
3 #ifdef RAR_SMP
4 #include "threadmisc.cpp"
5 
6 #ifdef _WIN_ALL
// Windows only: priority given to pool threads when they are created.
// CreateThreads calls SetThreadPriority for each new thread only if this
// value differs from THREAD_PRIORITY_NORMAL.
int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
8 #endif
9 
ThreadPool(uint MaxThreads)10 ThreadPool::ThreadPool(uint MaxThreads)
11 {
12   MaxAllowedThreads = MaxThreads;
13   if (MaxAllowedThreads>MaxPoolThreads)
14     MaxAllowedThreads=MaxPoolThreads;
15   if (MaxAllowedThreads==0)
16     MaxAllowedThreads=1;
17 
18   ThreadsCreatedCount=0;
19 
20   // If we have more threads than queue size, we'll hang on pool destroying,
21   // not releasing all waiting threads.
22   if (MaxAllowedThreads>ASIZE(TaskQueue))
23     MaxAllowedThreads=ASIZE(TaskQueue);
24 
25   Closing=false;
26 
27   bool Success = CriticalSectionCreate(&CritSection);
28 #ifdef _WIN_ALL
29   QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
30   NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
31   Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
32 #elif defined(_UNIX)
33   AnyActive = false;
34   QueuedTasksCnt = 0;
35   Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
36           pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
37           pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
38           pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
39 #endif
40   if (!Success)
41   {
42     ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
43     ErrHandler.Exit(RARX_FATAL);
44   }
45 
46   QueueTop = 0;
47   QueueBottom = 0;
48   ActiveThreads = 0;
49 }
50 
51 
~ThreadPool()52 ThreadPool::~ThreadPool()
53 {
54   WaitDone();
55   Closing=true;
56 
57 #ifdef _WIN_ALL
58   ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
59 #elif defined(_UNIX)
60   // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
61   // so lock is required. We would occassionally hang without it.
62   pthread_mutex_lock(&QueuedTasksCntMutex);
63   QueuedTasksCnt+=ASIZE(TaskQueue);
64   pthread_mutex_unlock(&QueuedTasksCntMutex);
65 
66   pthread_cond_broadcast(&QueuedTasksCntCond);
67 #endif
68 
69   for(uint I=0;I<ThreadsCreatedCount;I++)
70   {
71 #ifdef _WIN_ALL
72     // Waiting until the thread terminates.
73     CWaitForSingleObject(ThreadHandles[I]);
74 #endif
75     // Close the thread handle. In Unix it results in pthread_join call,
76     // which also waits for thread termination.
77     ThreadClose(ThreadHandles[I]);
78   }
79 
80   CriticalSectionDelete(&CritSection);
81 #ifdef _WIN_ALL
82   CloseHandle(QueuedTasksCnt);
83   CloseHandle(NoneActive);
84 #elif defined(_UNIX)
85   pthread_cond_destroy(&AnyActiveCond);
86   pthread_mutex_destroy(&AnyActiveMutex);
87   pthread_cond_destroy(&QueuedTasksCntCond);
88   pthread_mutex_destroy(&QueuedTasksCntMutex);
89 #endif
90 }
91 
92 
CreateThreads()93 void ThreadPool::CreateThreads()
94 {
95   for(uint I=0;I<MaxAllowedThreads;I++)
96   {
97     ThreadHandles[I] = ThreadCreate(PoolThread, this);
98     ThreadsCreatedCount++;
99 #ifdef _WIN_ALL
100     if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
101       SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
102 #endif
103   }
104 }
105 
106 
PoolThread(void * Param)107 NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
108 {
109   ((ThreadPool*)Param)->PoolThreadLoop();
110   return 0;
111 }
112 
113 
PoolThreadLoop()114 void ThreadPool::PoolThreadLoop()
115 {
116   QueueEntry Task;
117   while (GetQueuedTask(&Task))
118   {
119     Task.Proc(Task.Param);
120 
121     CriticalSectionStart(&CritSection);
122     if (--ActiveThreads == 0)
123     {
124 #ifdef _WIN_ALL
125       SetEvent(NoneActive);
126 #elif defined(_UNIX)
127       pthread_mutex_lock(&AnyActiveMutex);
128       AnyActive=false;
129       pthread_cond_signal(&AnyActiveCond);
130       pthread_mutex_unlock(&AnyActiveMutex);
131 #endif
132     }
133     CriticalSectionEnd(&CritSection);
134   }
135 }
136 
137 
GetQueuedTask(QueueEntry * Task)138 bool ThreadPool::GetQueuedTask(QueueEntry *Task)
139 {
140 #ifdef _WIN_ALL
141   CWaitForSingleObject(QueuedTasksCnt);
142 #elif defined(_UNIX)
143   pthread_mutex_lock(&QueuedTasksCntMutex);
144   while (QueuedTasksCnt==0)
145     cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
146   QueuedTasksCnt--;
147   pthread_mutex_unlock(&QueuedTasksCntMutex);
148 #endif
149 
150   if (Closing)
151     return false;
152 
153   CriticalSectionStart(&CritSection);
154 
155   *Task = TaskQueue[QueueBottom];
156   QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
157 
158   CriticalSectionEnd(&CritSection);
159 
160   return true;
161 }
162 
163 
164 // Add task to queue. We assume that it is always called from main thread,
165 // it allows to avoid any locks here. We process collected tasks only
166 // when WaitDone is called.
AddTask(PTHREAD_PROC Proc,void * Data)167 void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
168 {
169   if (ThreadsCreatedCount == 0)
170     CreateThreads();
171 
172   // If queue is full, wait until it is empty.
173   if (ActiveThreads>=ASIZE(TaskQueue))
174     WaitDone();
175 
176   TaskQueue[QueueTop].Proc = Proc;
177   TaskQueue[QueueTop].Param = Data;
178   QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
179   ActiveThreads++;
180 }
181 
182 
183 // Start queued tasks and wait until all threads are inactive.
184 // We assume that it is always called from main thread, when pool threads
185 // are sleeping yet.
WaitDone()186 void ThreadPool::WaitDone()
187 {
188   if (ActiveThreads==0)
189     return;
190 #ifdef _WIN_ALL
191   ResetEvent(NoneActive);
192   ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
193   CWaitForSingleObject(NoneActive);
194 #elif defined(_UNIX)
195   AnyActive=true;
196 
197   // Threads reset AnyActive before accessing QueuedTasksCnt and even
198   // preceding WaitDone() call does not guarantee that some slow thread
199   // is not accessing QueuedTasksCnt now. So lock is necessary.
200   pthread_mutex_lock(&QueuedTasksCntMutex);
201   QueuedTasksCnt+=ActiveThreads;
202   pthread_mutex_unlock(&QueuedTasksCntMutex);
203 
204   pthread_cond_broadcast(&QueuedTasksCntCond);
205 
206   pthread_mutex_lock(&AnyActiveMutex);
207   while (AnyActive)
208     cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
209   pthread_mutex_unlock(&AnyActiveMutex);
210 #endif
211 }
212 #endif // RAR_SMP
213