1 /* MtCoder.h -- Multi-thread Coder
2 2017-06-18 : Igor Pavlov : Public domain */
3 
4 #ifndef __MT_CODER_H
5 #define __MT_CODER_H
6 
7 #include "Threads.h"
8 
9 EXTERN_C_BEGIN
10 
11 /*
12   if (    defined MTCODER__USE_WRITE_THREAD) : main thread writes all data blocks to output stream
13   if (not defined MTCODER__USE_WRITE_THREAD) : any coder thread can write data blocks to output stream
14 */
15 /* #define MTCODER__USE_WRITE_THREAD */
16 
17 #ifndef _7ZIP_ST
18   #define MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads) ((numThreads) + (numThreads) / 8 + 1)
19   #define MTCODER__THREADS_MAX 64
20   #define MTCODER__BLOCKS_MAX (MTCODER__GET_NUM_BLOCKS_FROM_THREADS(MTCODER__THREADS_MAX) + 3)
21 #else
22   #define MTCODER__THREADS_MAX 1
23   #define MTCODER__BLOCKS_MAX 1
24 #endif
25 
26 
27 typedef struct
28 {
29   UInt64 inSize;
30   UInt64 outSize;
31 } CMtProgressSizes;
32 
33 
34 typedef struct
35 {
36   ICompressProgress *progress;
37   SRes res;
38   UInt64 totalInSize;
39   UInt64 totalOutSize;
40   CCriticalSection cs;
41   CMtProgressSizes sizes[MTCODER__THREADS_MAX];
42 } CMtProgress;
43 
44 
45 typedef struct
46 {
47   ICompressProgress vt;
48   CMtProgress *mtProgress;
49   unsigned index;
50 } CMtProgressThunk;
51 
52 void MtProgressThunk_CreateVTable(CMtProgressThunk *p);
53 
54 
55 
56 struct _CMtCoder;
57 
58 
59 typedef struct
60 {
61   struct _CMtCoder *mtCoder;
62   unsigned index;
63   int stop;
64   Byte *inBuf;
65 
66   CAutoResetEvent startEvent;
67   CThread thread;
68 } CMtCoderThread;
69 
70 
71 typedef struct
72 {
73   SRes (*Code)(void *p, unsigned coderIndex, unsigned outBufIndex,
74       const Byte *src, size_t srcSize, int finished);
75   SRes (*Write)(void *p, unsigned outBufIndex);
76 } IMtCoderCallback2;
77 
78 
79 typedef struct
80 {
81   SRes res;
82   unsigned bufIndex;
83   Bool finished;
84 } CMtCoderBlock;
85 
86 
87 typedef struct _CMtCoder
88 {
89   /* input variables */
90 
91   size_t blockSize;        /* size of input block */
92   unsigned numThreadsMax;
93   UInt64 expectedDataSize;
94 
95   ISeqInStream *inStream;
96   const Byte *inData;
97   size_t inDataSize;
98 
99   ICompressProgress *progress;
100   ISzAllocPtr allocBig;
101 
102   IMtCoderCallback2 *mtCallback;
103   void *mtCallbackObject;
104 
105 
106   /* internal variables */
107 
108   size_t allocatedBufsSize;
109 
110   CAutoResetEvent readEvent;
111   CSemaphore blocksSemaphore;
112 
113   Bool stopReading;
114   SRes readRes;
115 
116   #ifdef MTCODER__USE_WRITE_THREAD
117     CAutoResetEvent writeEvents[MTCODER__BLOCKS_MAX];
118   #else
119     CAutoResetEvent finishedEvent;
120     SRes writeRes;
121     unsigned writeIndex;
122     Byte ReadyBlocks[MTCODER__BLOCKS_MAX];
123     LONG numFinishedThreads;
124   #endif
125 
126   unsigned numStartedThreadsLimit;
127   unsigned numStartedThreads;
128 
129   unsigned numBlocksMax;
130   unsigned blockIndex;
131   UInt64 readProcessed;
132 
133   CCriticalSection cs;
134 
135   unsigned freeBlockHead;
136   unsigned freeBlockList[MTCODER__BLOCKS_MAX];
137 
138   CMtProgress mtProgress;
139   CMtCoderBlock blocks[MTCODER__BLOCKS_MAX];
140   CMtCoderThread threads[MTCODER__THREADS_MAX];
141 } CMtCoder;
142 
143 
144 void MtCoder_Construct(CMtCoder *p);
145 void MtCoder_Destruct(CMtCoder *p);
146 SRes MtCoder_Code(CMtCoder *p);
147 
148 
149 EXTERN_C_END
150 
151 #endif
152