1 /* MtDec.c -- Multi-thread Decoder
2 2018-03-02 : Igor Pavlov : Public domain */
3
4 #include "Precomp.h"
5
6 // #define SHOW_DEBUG_INFO
7
8 // #include <stdio.h>
9
10 #ifdef SHOW_DEBUG_INFO
11 #include <stdio.h>
12 #endif
13
14 #ifdef SHOW_DEBUG_INFO
15 #define PRF(x) x
16 #else
17 #define PRF(x)
18 #endif
19
20 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
21
22 #include "MtDec.h"
23
24 #ifndef _7ZIP_ST
25
MtProgress_Init(CMtProgress * p,ICompressProgress * progress)26 void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
27 {
28 p->progress = progress;
29 p->res = SZ_OK;
30 p->totalInSize = 0;
31 p->totalOutSize = 0;
32 }
33
34
MtProgress_Progress_ST(CMtProgress * p)35 SRes MtProgress_Progress_ST(CMtProgress *p)
36 {
37 if (p->res == SZ_OK && p->progress)
38 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
39 p->res = SZ_ERROR_PROGRESS;
40 return p->res;
41 }
42
43
MtProgress_ProgressAdd(CMtProgress * p,UInt64 inSize,UInt64 outSize)44 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
45 {
46 SRes res;
47 CriticalSection_Enter(&p->cs);
48
49 p->totalInSize += inSize;
50 p->totalOutSize += outSize;
51 if (p->res == SZ_OK && p->progress)
52 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
53 p->res = SZ_ERROR_PROGRESS;
54 res = p->res;
55
56 CriticalSection_Leave(&p->cs);
57 return res;
58 }
59
60
MtProgress_GetError(CMtProgress * p)61 SRes MtProgress_GetError(CMtProgress *p)
62 {
63 SRes res;
64 CriticalSection_Enter(&p->cs);
65 res = p->res;
66 CriticalSection_Leave(&p->cs);
67 return res;
68 }
69
70
MtProgress_SetError(CMtProgress * p,SRes res)71 void MtProgress_SetError(CMtProgress *p, SRes res)
72 {
73 CriticalSection_Enter(&p->cs);
74 if (p->res == SZ_OK)
75 p->res = res;
76 CriticalSection_Leave(&p->cs);
77 }
78
79
80 #define RINOK_THREAD(x) RINOK(x)
81
82
ArEvent_OptCreate_And_Reset(CEvent * p)83 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
84 {
85 if (Event_IsCreated(p))
86 return Event_Reset(p);
87 return AutoResetEvent_CreateNotSignaled(p);
88 }
89
90
91
92 typedef struct
93 {
94 void *next;
95 void *pad[3];
96 } CMtDecBufLink;
97
98 #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
99 #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
100
101
102
103 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
104
105
MtDecThread_CreateEvents(CMtDecThread * t)106 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
107 {
108 WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
109 if (wres == 0)
110 {
111 wres = ArEvent_OptCreate_And_Reset(&t->canRead);
112 if (wres == 0)
113 return SZ_OK;
114 }
115 return wres;
116 }
117
118
MtDecThread_CreateAndStart(CMtDecThread * t)119 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
120 {
121 WRes wres = MtDecThread_CreateEvents(t);
122 // wres = 17; // for test
123 if (wres == 0)
124 {
125 if (Thread_WasCreated(&t->thread))
126 return SZ_OK;
127 wres = Thread_Create(&t->thread, ThreadFunc, t);
128 if (wres == 0)
129 return SZ_OK;
130 }
131 return MY_SRes_HRESULT_FROM_WRes(wres);
132 }
133
134
MtDecThread_FreeInBufs(CMtDecThread * t)135 void MtDecThread_FreeInBufs(CMtDecThread *t)
136 {
137 if (t->inBuf)
138 {
139 void *link = t->inBuf;
140 t->inBuf = NULL;
141 do
142 {
143 void *next = ((CMtDecBufLink *)link)->next;
144 ISzAlloc_Free(t->mtDec->alloc, link);
145 link = next;
146 }
147 while (link);
148 }
149 }
150
151
MtDecThread_CloseThread(CMtDecThread * t)152 static void MtDecThread_CloseThread(CMtDecThread *t)
153 {
154 if (Thread_WasCreated(&t->thread))
155 {
156 Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
157 Event_Set(&t->canRead);
158 Thread_Wait(&t->thread);
159 Thread_Close(&t->thread);
160 }
161
162 Event_Close(&t->canRead);
163 Event_Close(&t->canWrite);
164 }
165
MtDec_CloseThreads(CMtDec * p)166 static void MtDec_CloseThreads(CMtDec *p)
167 {
168 unsigned i;
169 for (i = 0; i < MTDEC__THREADS_MAX; i++)
170 MtDecThread_CloseThread(&p->threads[i]);
171 }
172
MtDecThread_Destruct(CMtDecThread * t)173 static void MtDecThread_Destruct(CMtDecThread *t)
174 {
175 MtDecThread_CloseThread(t);
176 MtDecThread_FreeInBufs(t);
177 }
178
179
180
FullRead(ISeqInStream * stream,Byte * data,size_t * processedSize)181 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
182 {
183 size_t size = *processedSize;
184 *processedSize = 0;
185 while (size != 0)
186 {
187 size_t cur = size;
188 SRes res = ISeqInStream_Read(stream, data, &cur);
189 *processedSize += cur;
190 data += cur;
191 size -= cur;
192 RINOK(res);
193 if (cur == 0)
194 return SZ_OK;
195 }
196 return SZ_OK;
197 }
198
199
MtDec_GetError_Spec(CMtDec * p,UInt64 interruptIndex,Bool * wasInterrupted)200 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, Bool *wasInterrupted)
201 {
202 SRes res;
203 CriticalSection_Enter(&p->mtProgress.cs);
204 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
205 res = p->mtProgress.res;
206 CriticalSection_Leave(&p->mtProgress.cs);
207 return res;
208 }
209
MtDec_Progress_GetError_Spec(CMtDec * p,UInt64 inSize,UInt64 outSize,UInt64 interruptIndex,Bool * wasInterrupted)210 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, Bool *wasInterrupted)
211 {
212 SRes res;
213 CriticalSection_Enter(&p->mtProgress.cs);
214
215 p->mtProgress.totalInSize += inSize;
216 p->mtProgress.totalOutSize += outSize;
217 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
218 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
219 p->mtProgress.res = SZ_ERROR_PROGRESS;
220
221 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
222 res = p->mtProgress.res;
223
224 CriticalSection_Leave(&p->mtProgress.cs);
225
226 return res;
227 }
228
MtDec_Interrupt(CMtDec * p,UInt64 interruptIndex)229 static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
230 {
231 CriticalSection_Enter(&p->mtProgress.cs);
232 if (!p->needInterrupt || interruptIndex < p->interruptIndex)
233 {
234 p->interruptIndex = interruptIndex;
235 p->needInterrupt = True;
236 }
237 CriticalSection_Leave(&p->mtProgress.cs);
238 }
239
MtDec_GetCrossBuff(CMtDec * p)240 Byte *MtDec_GetCrossBuff(CMtDec *p)
241 {
242 Byte *cr = p->crossBlock;
243 if (!cr)
244 {
245 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
246 if (!cr)
247 return NULL;
248 p->crossBlock = cr;
249 }
250 return MTDEC__DATA_PTR_FROM_LINK(cr);
251 }
252
253
254 /*
255 ThreadFunc2() returns:
256 0 - in all normal cases (even for stream error or memory allocation error)
257 (!= 0) - WRes error return by system threading function
258 */
259
260 // #define MTDEC_ProgessStep (1 << 22)
261 #define MTDEC_ProgessStep (1 << 0)
262
ThreadFunc2(CMtDecThread * t)263 static WRes ThreadFunc2(CMtDecThread *t)
264 {
265 CMtDec *p = t->mtDec;
266
267 PRF_STR_INT("ThreadFunc2", t->index);
268
269 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
270
271 for (;;)
272 {
273 SRes res, codeRes;
274 Bool wasInterrupted, isAllocError, overflow, finish;
275 SRes threadingErrorSRes;
276 Bool needCode, needWrite, needContinue;
277
278 size_t inDataSize_Start;
279 UInt64 inDataSize;
280 // UInt64 inDataSize_Full;
281
282 UInt64 blockIndex;
283
284 UInt64 inPrev = 0;
285 UInt64 outPrev = 0;
286 UInt64 inCodePos;
287 UInt64 outCodePos;
288
289 Byte *afterEndData = NULL;
290 size_t afterEndData_Size = 0;
291
292 Bool canCreateNewThread = False;
293 // CMtDecCallbackInfo parse;
294 CMtDecThread *nextThread;
295
296 PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
297
298 RINOK_THREAD(Event_Wait(&t->canRead));
299 if (p->exitThread)
300 return 0;
301
302 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
303
304 // if (t->index == 3) return 19; // for test
305
306 blockIndex = p->blockIndex++;
307
308 // PRF(printf("\ncanRead\n"))
309
310 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
311
312 finish = p->readWasFinished;
313 needCode = False;
314 needWrite = False;
315 isAllocError = False;
316 overflow = False;
317
318 inDataSize_Start = 0;
319 inDataSize = 0;
320 // inDataSize_Full = 0;
321
322 if (res == SZ_OK && !wasInterrupted)
323 {
324 // if (p->inStream)
325 {
326 CMtDecBufLink *prev = NULL;
327 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
328 size_t crossSize = p->crossEnd - p->crossStart;
329
330 PRF(printf("\ncrossSize = %d\n", crossSize));
331
332 for (;;)
333 {
334 if (!link)
335 {
336 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
337 if (!link)
338 {
339 finish = True;
340 // p->allocError_for_Read_BlockIndex = blockIndex;
341 isAllocError = True;
342 break;
343 }
344 link->next = NULL;
345 if (prev)
346 {
347 // static unsigned g_num = 0;
348 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
349 prev->next = link;
350 }
351 else
352 t->inBuf = (void *)link;
353 }
354
355 {
356 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
357 Byte *parseData = data;
358 size_t size;
359
360 if (crossSize != 0)
361 {
362 inDataSize = crossSize;
363 // inDataSize_Full = inDataSize;
364 inDataSize_Start = crossSize;
365 size = crossSize;
366 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
367 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
368 (int)p->crossStart, (int)p->crossEnd, (int)finish));
369 }
370 else
371 {
372 size = p->inBufSize;
373
374 res = FullRead(p->inStream, data, &size);
375
376 // size = 10; // test
377
378 inDataSize += size;
379 // inDataSize_Full = inDataSize;
380 if (!prev)
381 inDataSize_Start = size;
382
383 p->readProcessed += size;
384 finish = (size != p->inBufSize);
385 if (finish)
386 p->readWasFinished = True;
387
388 // res = E_INVALIDARG; // test
389
390 if (res != SZ_OK)
391 {
392 // PRF(printf("\nRead error = %d\n", res))
393 // we want to decode all data before error
394 p->readRes = res;
395 // p->readError_BlockIndex = blockIndex;
396 p->readWasFinished = True;
397 finish = True;
398 res = SZ_OK;
399 // break;
400 }
401
402 if (inDataSize - inPrev >= MTDEC_ProgessStep)
403 {
404 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
405 if (res != SZ_OK || wasInterrupted)
406 break;
407 inPrev = inDataSize;
408 }
409 }
410
411 {
412 CMtDecCallbackInfo parse;
413
414 parse.startCall = (prev == NULL);
415 parse.src = parseData;
416 parse.srcSize = size;
417 parse.srcFinished = finish;
418 parse.canCreateNewThread = True;
419
420 // PRF(printf("\nParse size = %d\n", (unsigned)size))
421
422 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
423
424 needWrite = True;
425 canCreateNewThread = parse.canCreateNewThread;
426
427 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
428
429 if (
430 // parseRes != SZ_OK ||
431 // inDataSize - (size - parse.srcSize) > p->inBlockMax
432 // ||
433 parse.state == MTDEC_PARSE_OVERFLOW
434 // || wasInterrupted
435 )
436 {
437 // Overflow or Parse error - switch from MT decoding to ST decoding
438 finish = True;
439 overflow = True;
440
441 {
442 PRF(printf("\n Overflow"));
443 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
444 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
445 }
446
447 if (crossSize != 0)
448 memcpy(data, parseData, size);
449 p->crossStart = 0;
450 p->crossEnd = 0;
451 break;
452 }
453
454 if (crossSize != 0)
455 {
456 memcpy(data, parseData, parse.srcSize);
457 p->crossStart += parse.srcSize;
458 }
459
460 if (parse.state != MTDEC_PARSE_CONTINUE || finish)
461 {
462 // we don't need to parse in current thread anymore
463
464 if (parse.state == MTDEC_PARSE_END)
465 finish = True;
466
467 needCode = True;
468 // p->crossFinished = finish;
469
470 if (parse.srcSize == size)
471 {
472 // full parsed - no cross transfer
473 p->crossStart = 0;
474 p->crossEnd = 0;
475 break;
476 }
477
478 if (parse.state == MTDEC_PARSE_END)
479 {
480 p->crossStart = 0;
481 p->crossEnd = 0;
482
483 if (crossSize != 0)
484 memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
485 afterEndData_Size = size - parse.srcSize;
486 afterEndData = parseData + parse.srcSize;
487
488 // we reduce data size to required bytes (parsed only)
489 inDataSize -= (size - parse.srcSize);
490 if (!prev)
491 inDataSize_Start = parse.srcSize;
492 break;
493 }
494
495 {
496 // partial parsed - need cross transfer
497 if (crossSize != 0)
498 inDataSize = parse.srcSize; // it's only parsed now
499 else
500 {
501 // partial parsed - is not in initial cross block - we need to copy new data to cross block
502 Byte *cr = MtDec_GetCrossBuff(p);
503 if (!cr)
504 {
505 {
506 PRF(printf("\ncross alloc error error\n"));
507 // res = SZ_ERROR_MEM;
508 finish = True;
509 // p->allocError_for_Read_BlockIndex = blockIndex;
510 isAllocError = True;
511 break;
512 }
513 }
514
515 {
516 size_t crSize = size - parse.srcSize;
517 inDataSize -= crSize;
518 p->crossEnd = crSize;
519 p->crossStart = 0;
520 memcpy(cr, parseData + parse.srcSize, crSize);
521 }
522 }
523
524 // inDataSize_Full = inDataSize;
525 if (!prev)
526 inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
527
528 finish = False;
529 break;
530 }
531 }
532
533 if (parse.srcSize != size)
534 {
535 res = SZ_ERROR_FAIL;
536 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
537 break;
538 }
539 }
540 }
541
542 prev = link;
543 link = link->next;
544
545 if (crossSize != 0)
546 {
547 crossSize = 0;
548 p->crossStart = 0;
549 p->crossEnd = 0;
550 }
551 }
552 }
553
554 if (res == SZ_OK)
555 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
556 }
557
558 codeRes = SZ_OK;
559
560 if (res == SZ_OK && needCode && !wasInterrupted)
561 {
562 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
563 if (codeRes != SZ_OK)
564 {
565 needCode = False;
566 finish = True;
567 // SZ_ERROR_MEM is expected error here.
568 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
569 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
570 }
571 }
572
573 if (res != SZ_OK || wasInterrupted)
574 finish = True;
575
576 nextThread = NULL;
577 threadingErrorSRes = SZ_OK;
578
579 if (!finish)
580 {
581 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
582 {
583 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
584 if (res2 == SZ_OK)
585 {
586 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
587 p->numStartedThreads++;
588 }
589 else
590 {
591 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
592 if (p->numStartedThreads == 1)
593 {
594 // if only one thread is possible, we leave muti-threading code
595 finish = True;
596 needCode = False;
597 threadingErrorSRes = res2;
598 }
599 else
600 p->numStartedThreads_Limit = p->numStartedThreads;
601 }
602 }
603
604 if (!finish)
605 {
606 unsigned nextIndex = t->index + 1;
607 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
608 RINOK_THREAD(Event_Set(&nextThread->canRead))
609 // We have started executing for new iteration (with next thread)
610 // And that next thread now is responsible for possible exit from decoding (threading_code)
611 }
612 }
613
614 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
615 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
616 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
617 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
618 // - otherwise we stop decoding and exit from ThreadFunc2()
619
620 // Don't change (finish) variable in the further code
621
622
623 // ---------- CODE ----------
624
625 inPrev = 0;
626 outPrev = 0;
627 inCodePos = 0;
628 outCodePos = 0;
629
630 if (res == SZ_OK && needCode && codeRes == SZ_OK)
631 {
632 Bool isStartBlock = True;
633 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
634
635 for (;;)
636 {
637 size_t inSize;
638 int stop;
639
640 if (isStartBlock)
641 inSize = inDataSize_Start;
642 else
643 {
644 UInt64 rem = inDataSize - inCodePos;
645 inSize = p->inBufSize;
646 if (inSize > rem)
647 inSize = (size_t)rem;
648 }
649
650 inCodePos += inSize;
651 stop = True;
652
653 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
654 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
655 (inCodePos == inDataSize), // srcFinished
656 &inCodePos, &outCodePos, &stop);
657
658 if (codeRes != SZ_OK)
659 {
660 PRF(printf("\nCode Interrupt error = %x\n", codeRes));
661 // we interrupt only later blocks
662 MtDec_Interrupt(p, blockIndex);
663 break;
664 }
665
666 if (stop || inCodePos == inDataSize)
667 break;
668
669 {
670 const UInt64 inDelta = inCodePos - inPrev;
671 const UInt64 outDelta = outCodePos - outPrev;
672 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
673 {
674 // Sleep(1);
675 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
676 if (res != SZ_OK || wasInterrupted)
677 break;
678 inPrev = inCodePos;
679 outPrev = outCodePos;
680 }
681 }
682
683 link = link->next;
684 isStartBlock = False;
685 }
686 }
687
688
689 // ---------- WRITE ----------
690
691 RINOK_THREAD(Event_Wait(&t->canWrite));
692
693 {
694 Bool isErrorMode = False;
695 Bool canRecode = True;
696 Bool needWriteToStream = needWrite;
697
698 if (p->exitThread) return 0; // it's never executed in normal cases
699
700 if (p->wasInterrupted)
701 wasInterrupted = True;
702 else
703 {
704 if (codeRes != SZ_OK) // || !needCode // check it !!!
705 {
706 p->wasInterrupted = True;
707 p->codeRes = codeRes;
708 if (codeRes == SZ_ERROR_MEM)
709 isAllocError = True;
710 }
711
712 if (threadingErrorSRes)
713 {
714 p->wasInterrupted = True;
715 p->threadingErrorSRes = threadingErrorSRes;
716 needWriteToStream = False;
717 }
718 if (isAllocError)
719 {
720 p->wasInterrupted = True;
721 p->isAllocError = True;
722 needWriteToStream = False;
723 }
724 if (overflow)
725 {
726 p->wasInterrupted = True;
727 p->overflow = True;
728 needWriteToStream = False;
729 }
730 }
731
732 if (needCode)
733 {
734 if (wasInterrupted)
735 {
736 inCodePos = 0;
737 outCodePos = 0;
738 }
739 {
740 const UInt64 inDelta = inCodePos - inPrev;
741 const UInt64 outDelta = outCodePos - outPrev;
742 // if (inDelta != 0 || outDelta != 0)
743 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
744 }
745 }
746
747 needContinue = (!finish);
748
749 // if (res == SZ_OK && needWrite && !wasInterrupted)
750 if (needWrite)
751 {
752 // p->inProcessed += inCodePos;
753
754 res = p->mtCallback->Write(p->mtCallbackObject, t->index,
755 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
756 afterEndData, afterEndData_Size,
757 &needContinue,
758 &canRecode);
759
760 // res= E_INVALIDARG; // for test
761
762 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
763 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
764
765 if (res != SZ_OK)
766 {
767 PRF(printf("\nWrite error = %d\n", res));
768 isErrorMode = True;
769 p->wasInterrupted = True;
770 }
771 if (res != SZ_OK
772 || (!needContinue && !finish))
773 {
774 PRF(printf("\nWrite Interrupt error = %x\n", res));
775 MtDec_Interrupt(p, blockIndex);
776 }
777 }
778
779 if (canRecode)
780 if (!needCode
781 || res != SZ_OK
782 || p->wasInterrupted
783 || codeRes != SZ_OK
784 || wasInterrupted
785 || p->numFilledThreads != 0
786 || isErrorMode)
787 {
788 if (p->numFilledThreads == 0)
789 p->filledThreadStart = t->index;
790 if (inDataSize != 0 || !finish)
791 {
792 t->inDataSize_Start = inDataSize_Start;
793 t->inDataSize = inDataSize;
794 p->numFilledThreads++;
795 }
796 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
797 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
798 }
799
800 if (!finish)
801 {
802 RINOK_THREAD(Event_Set(&nextThread->canWrite));
803 }
804 else
805 {
806 if (needContinue)
807 {
808 // we restore decoding with new iteration
809 RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
810 }
811 else
812 {
813 // we exit from decoding
814 if (t->index == 0)
815 return SZ_OK;
816 p->exitThread = True;
817 }
818 RINOK_THREAD(Event_Set(&p->threads[0].canRead));
819 }
820 }
821 }
822 }
823
824 #ifdef _WIN32
825 #define USE_ALLOCA
826 #endif
827
828 #ifdef USE_ALLOCA
829 #ifdef _WIN32
830 #include <malloc.h>
831 #else
832 #include <stdlib.h>
833 #endif
834 #endif
835
836
ThreadFunc1(void * pp)837 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
838 {
839 WRes res;
840
841 CMtDecThread *t = (CMtDecThread *)pp;
842 CMtDec *p;
843
844 // fprintf(stdout, "\n%d = %p\n", t->index, &t);
845
846 res = ThreadFunc2(t);
847 p = t->mtDec;
848 if (res == 0)
849 return p->exitThreadWRes;
850 {
851 // it's unexpected situation for some threading function error
852 if (p->exitThreadWRes == 0)
853 p->exitThreadWRes = res;
854 PRF(printf("\nthread exit error = %d\n", res));
855 p->exitThread = True;
856 Event_Set(&p->threads[0].canRead);
857 Event_Set(&p->threads[0].canWrite);
858 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
859 }
860 return res;
861 }
862
ThreadFunc(void * pp)863 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
864 {
865 CMtDecThread *t = (CMtDecThread *)pp;
866
867 // fprintf(stderr, "\n%d = %p - before", t->index, &t);
868 #ifdef USE_ALLOCA
869 t->allocaPtr = alloca(t->index * 128);
870 #endif
871 return ThreadFunc1(pp);
872 }
873
874
MtDec_PrepareRead(CMtDec * p)875 int MtDec_PrepareRead(CMtDec *p)
876 {
877 if (p->crossBlock && p->crossStart == p->crossEnd)
878 {
879 ISzAlloc_Free(p->alloc, p->crossBlock);
880 p->crossBlock = NULL;
881 }
882
883 {
884 unsigned i;
885 for (i = 0; i < MTDEC__THREADS_MAX; i++)
886 if (i > p->numStartedThreads
887 || p->numFilledThreads <=
888 (i >= p->filledThreadStart ?
889 i - p->filledThreadStart :
890 i + p->numStartedThreads - p->filledThreadStart))
891 MtDecThread_FreeInBufs(&p->threads[i]);
892 }
893
894 return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
895 }
896
897
MtDec_Read(CMtDec * p,size_t * inLim)898 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
899 {
900 while (p->numFilledThreads != 0)
901 {
902 CMtDecThread *t = &p->threads[p->filledThreadStart];
903
904 if (*inLim != 0)
905 {
906 {
907 void *link = t->inBuf;
908 void *next = ((CMtDecBufLink *)link)->next;
909 ISzAlloc_Free(p->alloc, link);
910 t->inBuf = next;
911 }
912
913 if (t->inDataSize == 0)
914 {
915 MtDecThread_FreeInBufs(t);
916 if (--p->numFilledThreads == 0)
917 break;
918 if (++p->filledThreadStart == p->numStartedThreads)
919 p->filledThreadStart = 0;
920 t = &p->threads[p->filledThreadStart];
921 }
922 }
923
924 {
925 size_t lim = t->inDataSize_Start;
926 if (lim != 0)
927 t->inDataSize_Start = 0;
928 else
929 {
930 UInt64 rem = t->inDataSize;
931 lim = p->inBufSize;
932 if (lim > rem)
933 lim = (size_t)rem;
934 }
935 t->inDataSize -= lim;
936 *inLim = lim;
937 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
938 }
939 }
940
941 {
942 size_t crossSize = p->crossEnd - p->crossStart;
943 if (crossSize != 0)
944 {
945 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
946 *inLim = crossSize;
947 p->crossStart = 0;
948 p->crossEnd = 0;
949 return data;
950 }
951 *inLim = 0;
952 if (p->crossBlock)
953 {
954 ISzAlloc_Free(p->alloc, p->crossBlock);
955 p->crossBlock = NULL;
956 }
957 return NULL;
958 }
959 }
960
961
MtDec_Construct(CMtDec * p)962 void MtDec_Construct(CMtDec *p)
963 {
964 unsigned i;
965
966 p->inBufSize = (size_t)1 << 18;
967
968 p->numThreadsMax = 0;
969
970 p->inStream = NULL;
971
972 // p->inData = NULL;
973 // p->inDataSize = 0;
974
975 p->crossBlock = NULL;
976 p->crossStart = 0;
977 p->crossEnd = 0;
978
979 p->numFilledThreads = 0;
980
981 p->progress = NULL;
982 p->alloc = NULL;
983
984 p->mtCallback = NULL;
985 p->mtCallbackObject = NULL;
986
987 p->allocatedBufsSize = 0;
988
989 for (i = 0; i < MTDEC__THREADS_MAX; i++)
990 {
991 CMtDecThread *t = &p->threads[i];
992 t->mtDec = p;
993 t->index = i;
994 t->inBuf = NULL;
995 Event_Construct(&t->canRead);
996 Event_Construct(&t->canWrite);
997 Thread_Construct(&t->thread);
998 }
999
1000 // Event_Construct(&p->finishedEvent);
1001
1002 CriticalSection_Init(&p->mtProgress.cs);
1003 }
1004
1005
MtDec_Free(CMtDec * p)1006 static void MtDec_Free(CMtDec *p)
1007 {
1008 unsigned i;
1009
1010 p->exitThread = True;
1011
1012 for (i = 0; i < MTDEC__THREADS_MAX; i++)
1013 MtDecThread_Destruct(&p->threads[i]);
1014
1015 // Event_Close(&p->finishedEvent);
1016
1017 if (p->crossBlock)
1018 {
1019 ISzAlloc_Free(p->alloc, p->crossBlock);
1020 p->crossBlock = NULL;
1021 }
1022 }
1023
1024
MtDec_Destruct(CMtDec * p)1025 void MtDec_Destruct(CMtDec *p)
1026 {
1027 MtDec_Free(p);
1028
1029 CriticalSection_Delete(&p->mtProgress.cs);
1030 }
1031
1032
MtDec_Code(CMtDec * p)1033 SRes MtDec_Code(CMtDec *p)
1034 {
1035 unsigned i;
1036
1037 p->inProcessed = 0;
1038
1039 p->blockIndex = 1; // it must be larger than not_defined index (0)
1040 p->isAllocError = False;
1041 p->overflow = False;
1042 p->threadingErrorSRes = SZ_OK;
1043
1044 p->needContinue = True;
1045
1046 p->readWasFinished = False;
1047 p->needInterrupt = False;
1048 p->interruptIndex = (UInt64)(Int64)-1;
1049
1050 p->readProcessed = 0;
1051 p->readRes = SZ_OK;
1052 p->codeRes = SZ_OK;
1053 p->wasInterrupted = False;
1054
1055 p->crossStart = 0;
1056 p->crossEnd = 0;
1057
1058 p->filledThreadStart = 0;
1059 p->numFilledThreads = 0;
1060
1061 {
1062 unsigned numThreads = p->numThreadsMax;
1063 if (numThreads > MTDEC__THREADS_MAX)
1064 numThreads = MTDEC__THREADS_MAX;
1065 p->numStartedThreads_Limit = numThreads;
1066 p->numStartedThreads = 0;
1067 }
1068
1069 if (p->inBufSize != p->allocatedBufsSize)
1070 {
1071 for (i = 0; i < MTDEC__THREADS_MAX; i++)
1072 {
1073 CMtDecThread *t = &p->threads[i];
1074 if (t->inBuf)
1075 MtDecThread_FreeInBufs(t);
1076 }
1077 if (p->crossBlock)
1078 {
1079 ISzAlloc_Free(p->alloc, p->crossBlock);
1080 p->crossBlock = NULL;
1081 }
1082
1083 p->allocatedBufsSize = p->inBufSize;
1084 }
1085
1086 MtProgress_Init(&p->mtProgress, p->progress);
1087
1088 // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1089 p->exitThread = False;
1090 p->exitThreadWRes = 0;
1091
1092 {
1093 WRes wres;
1094 WRes sres;
1095 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1096 // wres = MtDecThread_CreateAndStart(nextThread);
1097 wres = MtDecThread_CreateEvents(nextThread);
1098 if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1099 if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1100 if (wres == 0) { wres = ThreadFunc(nextThread);
1101 if (wres != 0)
1102 {
1103 p->needContinue = False;
1104 MtDec_CloseThreads(p);
1105 }}}}
1106
1107 // wres = 17; // for test
1108 // wres = Event_Wait(&p->finishedEvent);
1109
1110 sres = MY_SRes_HRESULT_FROM_WRes(wres);
1111
1112 if (sres != 0)
1113 p->threadingErrorSRes = sres;
1114
1115 if (
1116 // wres == 0
1117 // wres != 0
1118 // || p->mtc.codeRes == SZ_ERROR_MEM
1119 p->isAllocError
1120 || p->threadingErrorSRes != SZ_OK
1121 || p->overflow)
1122 {
1123 // p->needContinue = True;
1124 }
1125 else
1126 p->needContinue = False;
1127
1128 if (p->needContinue)
1129 return SZ_OK;
1130
1131 // if (sres != SZ_OK)
1132 return sres;
1133 // return E_FAIL;
1134 }
1135 }
1136
1137 #endif
1138