1 /* MtCoder.c -- Multi-thread Coder
2 2015-10-13 : Igor Pavlov : Public domain */
3 
4 #include "Precomp.h"
5 
6 #include "MtCoder.h"
7 
LoopThread_Construct(CLoopThread * p)8 void LoopThread_Construct(CLoopThread *p)
9 {
10   Thread_Construct(&p->thread);
11   Event_Construct(&p->startEvent);
12   Event_Construct(&p->finishedEvent);
13 }
14 
LoopThread_Close(CLoopThread * p)15 void LoopThread_Close(CLoopThread *p)
16 {
17   Thread_Close(&p->thread);
18   Event_Close(&p->startEvent);
19   Event_Close(&p->finishedEvent);
20 }
21 
LoopThreadFunc(void * pp)22 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
23 {
24   CLoopThread *p = (CLoopThread *)pp;
25   for (;;)
26   {
27     if (Event_Wait(&p->startEvent) != 0)
28       return SZ_ERROR_THREAD;
29     if (p->stop)
30       return 0;
31     p->res = p->func(p->param);
32     if (Event_Set(&p->finishedEvent) != 0)
33       return SZ_ERROR_THREAD;
34   }
35 }
36 
LoopThread_Create(CLoopThread * p)37 WRes LoopThread_Create(CLoopThread *p)
38 {
39   p->stop = 0;
40   RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
41   RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
42   return Thread_Create(&p->thread, LoopThreadFunc, p);
43 }
44 
LoopThread_StopAndWait(CLoopThread * p)45 WRes LoopThread_StopAndWait(CLoopThread *p)
46 {
47   p->stop = 1;
48   if (Event_Set(&p->startEvent) != 0)
49     return SZ_ERROR_THREAD;
50   return Thread_Wait(&p->thread);
51 }
52 
LoopThread_StartSubThread(CLoopThread * p)53 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
LoopThread_WaitSubThread(CLoopThread * p)54 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
55 
Progress(ICompressProgress * p,UInt64 inSize,UInt64 outSize)56 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
57 {
58   return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
59 }
60 
MtProgress_Init(CMtProgress * p,ICompressProgress * progress)61 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
62 {
63   unsigned i;
64   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
65     p->inSizes[i] = p->outSizes[i] = 0;
66   p->totalInSize = p->totalOutSize = 0;
67   p->progress = progress;
68   p->res = SZ_OK;
69 }
70 
MtProgress_Reinit(CMtProgress * p,unsigned index)71 static void MtProgress_Reinit(CMtProgress *p, unsigned index)
72 {
73   p->inSizes[index] = 0;
74   p->outSizes[index] = 0;
75 }
76 
77 #define UPDATE_PROGRESS(size, prev, total) \
78   if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
79 
MtProgress_Set(CMtProgress * p,unsigned index,UInt64 inSize,UInt64 outSize)80 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
81 {
82   SRes res;
83   CriticalSection_Enter(&p->cs);
84   UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
85   UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
86   if (p->res == SZ_OK)
87     p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
88   res = p->res;
89   CriticalSection_Leave(&p->cs);
90   return res;
91 }
92 
MtProgress_SetError(CMtProgress * p,SRes res)93 static void MtProgress_SetError(CMtProgress *p, SRes res)
94 {
95   CriticalSection_Enter(&p->cs);
96   if (p->res == SZ_OK)
97     p->res = res;
98   CriticalSection_Leave(&p->cs);
99 }
100 
MtCoder_SetError(CMtCoder * p,SRes res)101 static void MtCoder_SetError(CMtCoder* p, SRes res)
102 {
103   CriticalSection_Enter(&p->cs);
104   if (p->res == SZ_OK)
105     p->res = res;
106   CriticalSection_Leave(&p->cs);
107 }
108 
109 /* ---------- MtThread ---------- */
110 
CMtThread_Construct(CMtThread * p,CMtCoder * mtCoder)111 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
112 {
113   p->mtCoder = mtCoder;
114   p->outBuf = 0;
115   p->inBuf = 0;
116   Event_Construct(&p->canRead);
117   Event_Construct(&p->canWrite);
118   LoopThread_Construct(&p->thread);
119 }
120 
121 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
122 
CMtThread_CloseEvents(CMtThread * p)123 static void CMtThread_CloseEvents(CMtThread *p)
124 {
125   Event_Close(&p->canRead);
126   Event_Close(&p->canWrite);
127 }
128 
CMtThread_Destruct(CMtThread * p)129 static void CMtThread_Destruct(CMtThread *p)
130 {
131   CMtThread_CloseEvents(p);
132 
133   if (Thread_WasCreated(&p->thread.thread))
134   {
135     LoopThread_StopAndWait(&p->thread);
136     LoopThread_Close(&p->thread);
137   }
138 
139   if (p->mtCoder->alloc)
140     IAlloc_Free(p->mtCoder->alloc, p->outBuf);
141   p->outBuf = 0;
142 
143   if (p->mtCoder->alloc)
144     IAlloc_Free(p->mtCoder->alloc, p->inBuf);
145   p->inBuf = 0;
146 }
147 
148 #define MY_BUF_ALLOC(buf, size, newSize) \
149   if (buf == 0 || size != newSize) \
150   { IAlloc_Free(p->mtCoder->alloc, buf); \
151     size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
152     if (buf == 0) return SZ_ERROR_MEM; }
153 
CMtThread_Prepare(CMtThread * p)154 static SRes CMtThread_Prepare(CMtThread *p)
155 {
156   MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
157   MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
158 
159   p->stopReading = False;
160   p->stopWriting = False;
161   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
162   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
163 
164   return SZ_OK;
165 }
166 
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)167 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
168 {
169   size_t size = *processedSize;
170   *processedSize = 0;
171   while (size != 0)
172   {
173     size_t curSize = size;
174     SRes res = stream->Read(stream, data, &curSize);
175     *processedSize += curSize;
176     data += curSize;
177     size -= curSize;
178     RINOK(res);
179     if (curSize == 0)
180       return SZ_OK;
181   }
182   return SZ_OK;
183 }
184 
185 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads  - 1 ? 0 : p->index + 1]
186 
MtThread_Process(CMtThread * p,Bool * stop)187 static SRes MtThread_Process(CMtThread *p, Bool *stop)
188 {
189   CMtThread *next;
190   *stop = True;
191   if (Event_Wait(&p->canRead) != 0)
192     return SZ_ERROR_THREAD;
193 
194   next = GET_NEXT_THREAD(p);
195 
196   if (p->stopReading)
197   {
198     next->stopReading = True;
199     return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
200   }
201 
202   {
203     size_t size = p->mtCoder->blockSize;
204     size_t destSize = p->outBufSize;
205 
206     RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
207     next->stopReading = *stop = (size != p->mtCoder->blockSize);
208     if (Event_Set(&next->canRead) != 0)
209       return SZ_ERROR_THREAD;
210 
211     RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
212         p->outBuf, &destSize, p->inBuf, size, *stop));
213 
214     MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
215 
216     if (Event_Wait(&p->canWrite) != 0)
217       return SZ_ERROR_THREAD;
218     if (p->stopWriting)
219       return SZ_ERROR_FAIL;
220     if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
221       return SZ_ERROR_WRITE;
222     return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
223   }
224 }
225 
ThreadFunc(void * pp)226 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
227 {
228   CMtThread *p = (CMtThread *)pp;
229   for (;;)
230   {
231     Bool stop;
232     CMtThread *next = GET_NEXT_THREAD(p);
233     SRes res = MtThread_Process(p, &stop);
234     if (res != SZ_OK)
235     {
236       MtCoder_SetError(p->mtCoder, res);
237       MtProgress_SetError(&p->mtCoder->mtProgress, res);
238       next->stopReading = True;
239       next->stopWriting = True;
240       Event_Set(&next->canRead);
241       Event_Set(&next->canWrite);
242       return res;
243     }
244     if (stop)
245       return 0;
246   }
247 }
248 
MtCoder_Construct(CMtCoder * p)249 void MtCoder_Construct(CMtCoder* p)
250 {
251   unsigned i;
252   p->alloc = 0;
253   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
254   {
255     CMtThread *t = &p->threads[i];
256     t->index = i;
257     CMtThread_Construct(t, p);
258   }
259   CriticalSection_Init(&p->cs);
260   CriticalSection_Init(&p->mtProgress.cs);
261 }
262 
MtCoder_Destruct(CMtCoder * p)263 void MtCoder_Destruct(CMtCoder* p)
264 {
265   unsigned i;
266   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
267     CMtThread_Destruct(&p->threads[i]);
268   CriticalSection_Delete(&p->cs);
269   CriticalSection_Delete(&p->mtProgress.cs);
270 }
271 
MtCoder_Code(CMtCoder * p)272 SRes MtCoder_Code(CMtCoder *p)
273 {
274   unsigned i, numThreads = p->numThreads;
275   SRes res = SZ_OK;
276   p->res = SZ_OK;
277 
278   MtProgress_Init(&p->mtProgress, p->progress);
279 
280   for (i = 0; i < numThreads; i++)
281   {
282     RINOK(CMtThread_Prepare(&p->threads[i]));
283   }
284 
285   for (i = 0; i < numThreads; i++)
286   {
287     CMtThread *t = &p->threads[i];
288     CLoopThread *lt = &t->thread;
289 
290     if (!Thread_WasCreated(&lt->thread))
291     {
292       lt->func = ThreadFunc;
293       lt->param = t;
294 
295       if (LoopThread_Create(lt) != SZ_OK)
296       {
297         res = SZ_ERROR_THREAD;
298         break;
299       }
300     }
301   }
302 
303   if (res == SZ_OK)
304   {
305     unsigned j;
306     for (i = 0; i < numThreads; i++)
307     {
308       CMtThread *t = &p->threads[i];
309       if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
310       {
311         res = SZ_ERROR_THREAD;
312         p->threads[0].stopReading = True;
313         break;
314       }
315     }
316 
317     Event_Set(&p->threads[0].canWrite);
318     Event_Set(&p->threads[0].canRead);
319 
320     for (j = 0; j < i; j++)
321       LoopThread_WaitSubThread(&p->threads[j].thread);
322   }
323 
324   for (i = 0; i < numThreads; i++)
325     CMtThread_CloseEvents(&p->threads[i]);
326   return (res == SZ_OK) ? p->res : res;
327 }
328