1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1
18 #include "tbb/tbb_config.h"
19 #include "../../common/utility/utility.h"
20 
21 #if __TBB_PREVIEW_ASYNC_MSG && __TBB_CPP11_LAMBDAS_PRESENT
22 
23 #include <iostream>
24 #include <fstream>
25 #include <string>
26 #include <memory>
27 #include <queue>
28 #include <thread>
29 
30 #include "bzlib.h"
31 
32 #include "tbb/flow_graph.h"
33 #include "tbb/tick_count.h"
34 #include "tbb/concurrent_queue.h"
35 
36 // TODO: change memory allocation/deallocation to be managed in constructor/destructor
// Non-owning descriptor of a raw byte array. The struct itself never allocates
// or frees memory; BufferMsg::createBufferMsg / destroyBufferMsg do that.
struct Buffer {
    // Size in bytes. Right after allocation this is the array capacity; the read
    // and compress steps later overwrite it with the amount of valid data.
    size_t len = 0;
    // Start of the byte array. Null by default so that a descriptor which never
    // received storage can be copied and passed to delete[] safely.
    char* b = nullptr;
};
41 
42 struct BufferMsg {
43 
BufferMsgBufferMsg44     BufferMsg() {}
BufferMsgBufferMsg45     BufferMsg(Buffer& inputBuffer, Buffer& outputBuffer, size_t seqId, bool isLast = false)
46         : inputBuffer(inputBuffer), outputBuffer(outputBuffer), seqId(seqId), isLast(isLast) {}
47 
createBufferMsgBufferMsg48     static BufferMsg createBufferMsg(size_t seqId, size_t chunkSize) {
49         Buffer inputBuffer;
50         inputBuffer.b = new char[chunkSize];
51         inputBuffer.len = chunkSize;
52 
53         Buffer outputBuffer;
54         size_t compressedChunkSize = chunkSize * 1.01 + 600; // compression overhead
55         outputBuffer.b = new char[compressedChunkSize];
56         outputBuffer.len = compressedChunkSize;
57 
58         return BufferMsg(inputBuffer, outputBuffer, seqId);
59     }
60 
destroyBufferMsgBufferMsg61     static void destroyBufferMsg(const BufferMsg& destroyMsg) {
62         delete[] destroyMsg.inputBuffer.b;
63         delete[] destroyMsg.outputBuffer.b;
64     }
65 
markLastBufferMsg66     void markLast(size_t lastId) {
67         isLast = true;
68         seqId = lastId;
69     }
70 
71     size_t seqId;
72     Buffer inputBuffer;
73     Buffer outputBuffer;
74     bool isLast;
75 };
76 
77 class BufferCompressor {
78 public:
79 
BufferCompressor(int blockSizeIn100KB)80     BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {}
81 
operator ()(BufferMsg buffer) const82     BufferMsg operator()(BufferMsg buffer) const {
83         if (!buffer.isLast) {
84             unsigned int outSize = buffer.outputBuffer.len;
85             BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b, &outSize,
86                 buffer.inputBuffer.b, buffer.inputBuffer.len,
87                 m_blockSize, 0, 30);
88             buffer.outputBuffer.len = outSize;
89         }
90         return buffer;
91     }
92 
93 private:
94     int m_blockSize;
95 };
96 
97 class IOOperations {
98 public:
99 
IOOperations(std::ifstream & inputStream,std::ofstream & outputStream,size_t chunkSize)100     IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, size_t chunkSize)
101         : m_inputStream(inputStream), m_outputStream(outputStream), m_chunkSize(chunkSize), m_chunksRead(0) {}
102 
readChunk(Buffer & buffer)103     void readChunk(Buffer& buffer) {
104         m_inputStream.read(buffer.b, m_chunkSize);
105         buffer.len = static_cast<size_t>(m_inputStream.gcount());
106         m_chunksRead++;
107     }
108 
writeChunk(const Buffer & buffer)109     void writeChunk(const Buffer& buffer) {
110         m_outputStream.write(buffer.b, buffer.len);
111     }
112 
chunksRead() const113     size_t chunksRead() const {
114         return m_chunksRead;
115     }
116 
chunkSize() const117     size_t chunkSize() const {
118         return m_chunkSize;
119     }
120 
hasDataToRead() const121     bool hasDataToRead() const {
122         return m_inputStream.is_open() && !m_inputStream.eof();
123     }
124 
125 private:
126 
127     std::ifstream& m_inputStream;
128     std::ofstream& m_outputStream;
129 
130     size_t m_chunkSize;
131     size_t m_chunksRead;
132 };
133 
134 //-----------------------------------------------------------------------------------------------------------------------
135 //---------------------------------------Compression example based on async_node-----------------------------------------
136 //-----------------------------------------------------------------------------------------------------------------------
137 
138 typedef tbb::flow::async_node< tbb::flow::continue_msg, BufferMsg > async_file_reader_node;
139 typedef tbb::flow::async_node< BufferMsg, tbb::flow::continue_msg > async_file_writer_node;
140 
141 class AsyncNodeActivity {
142 public:
143 
AsyncNodeActivity(IOOperations & io)144     AsyncNodeActivity(IOOperations& io)
145         : m_io(io), m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {}
146 
~AsyncNodeActivity()147     ~AsyncNodeActivity() {
148         m_fileReaderThread.join();
149         m_fileWriterThread.join();
150     }
151 
submitRead(async_file_reader_node::gateway_type & gateway)152     void submitRead(async_file_reader_node::gateway_type& gateway) {
153         gateway.reserve_wait();
154         std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway)).swap(m_fileReaderThread);
155     }
156 
submitWrite(const BufferMsg & bufferMsg)157     void submitWrite(const BufferMsg& bufferMsg) {
158         m_writeQueue.push(bufferMsg);
159     }
160 
161 private:
162 
readingLoop(async_file_reader_node::gateway_type & gateway)163     void readingLoop(async_file_reader_node::gateway_type& gateway) {
164         while (m_io.hasDataToRead()) {
165             BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize());
166             m_io.readChunk(bufferMsg.inputBuffer);
167             gateway.try_put(bufferMsg);
168         }
169         sendLastMessage(gateway);
170         gateway.release_wait();
171     }
172 
writingLoop()173     void writingLoop() {
174         BufferMsg buffer;
175         m_writeQueue.pop(buffer);
176         while (!buffer.isLast) {
177             m_io.writeChunk(buffer.outputBuffer);
178             m_writeQueue.pop(buffer);
179         }
180     }
181 
sendLastMessage(async_file_reader_node::gateway_type & gateway)182     void sendLastMessage(async_file_reader_node::gateway_type& gateway) {
183         BufferMsg lastMsg;
184         lastMsg.markLast(m_io.chunksRead());
185         gateway.try_put(lastMsg);
186     }
187 
188     IOOperations& m_io;
189 
190     tbb::concurrent_bounded_queue< BufferMsg > m_writeQueue;
191 
192     std::thread m_fileReaderThread;
193     std::thread m_fileWriterThread;
194 };
195 
fgCompressionAsyncNode(IOOperations & io,int blockSizeIn100KB)196 void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) {
197     tbb::flow::graph g;
198 
199     AsyncNodeActivity asyncNodeActivity(io);
200 
201     async_file_reader_node file_reader(g, tbb::flow::unlimited, [&asyncNodeActivity](const tbb::flow::continue_msg& msg, async_file_reader_node::gateway_type& gateway) {
202         asyncNodeActivity.submitRead(gateway);
203     });
204 
205     tbb::flow::function_node< BufferMsg, BufferMsg > compressor(g, tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
206 
207     tbb::flow::sequencer_node< BufferMsg > ordering(g, [](const BufferMsg& bufferMsg)->size_t {
208         return bufferMsg.seqId;
209     });
210 
211     // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
212     async_file_writer_node output_writer(g, tbb::flow::serial, [&asyncNodeActivity](const BufferMsg& bufferMsg, async_file_writer_node::gateway_type& gateway) {
213         asyncNodeActivity.submitWrite(bufferMsg);
214     });
215 
216     make_edge(file_reader, compressor);
217     make_edge(compressor, ordering);
218     make_edge(ordering, output_writer);
219 
220     file_reader.try_put(tbb::flow::continue_msg());
221 
222     g.wait_for_all();
223 }
224 
225 //-----------------------------------------------------------------------------------------------------------------------
226 //------------------------------------------Compression example based on async_msg---------------------------------------
227 //-----------------------------------------------------------------------------------------------------------------------
228 
229 typedef tbb::flow::async_msg< BufferMsg > async_msg_type;
230 
231 class AsyncMsgActivity {
232 public:
233 
AsyncMsgActivity(tbb::flow::graph & g,IOOperations & io)234     AsyncMsgActivity(tbb::flow::graph& g, IOOperations& io)
235         : m_io(io), m_graph(g), m_fileReaderThread(&AsyncMsgActivity::readingLoop, this),
236           m_fileWriterThread(&AsyncMsgActivity::writingLoop, this)
237     {
238         // Graph synchronization starts here and ends
239         // when the last buffer was written in "writing thread"
240         m_graph.increment_wait_count();
241     }
242 
~AsyncMsgActivity()243     ~AsyncMsgActivity() {
244         m_fileReaderThread.join();
245         m_fileWriterThread.join();
246 
247         // Lets release resources that async
248         // activity and graph were acquired
249         freeBuffers();
250     }
251 
submitRead(BufferMsg & bufferMsg)252     async_msg_type submitRead(BufferMsg& bufferMsg) {
253         async_msg_type msg;
254         work_type readWork = { bufferMsg, msg };
255         m_readQueue.push(readWork);
256         return msg;
257     }
258 
submitWrite(const BufferMsg & bufferMsg)259     async_msg_type submitWrite(const BufferMsg& bufferMsg) {
260         async_msg_type msg;
261         work_type writeWork = { bufferMsg, msg };
262         m_writeQueue.push(writeWork);
263         return msg;
264     }
265 
266 private:
267 
268     struct work_type {
269         BufferMsg bufferMsg;
270         async_msg_type msg;
271     };
272 
readingLoop()273     void readingLoop() {
274         work_type readWork;
275         m_readQueue.pop(readWork);
276 
277         // Reading thread waits for buffers to be received
278         // (the graph reuses limited number of buffers)
279         // and reads the file while there is something to read
280         while (m_io.hasDataToRead()) {
281             readWork.bufferMsg.seqId = m_io.chunksRead();
282             m_io.readChunk(readWork.bufferMsg.inputBuffer);
283             readWork.msg.set(readWork.bufferMsg);
284             m_readQueue.pop(readWork);
285         }
286 
287         // Pass message with an end flag to the graph
288         sendLastMessage(readWork);
289     }
290 
sendLastMessage(work_type & work)291     void sendLastMessage(work_type& work) {
292         work.bufferMsg.markLast(m_io.chunksRead());
293         work.msg.set(work.bufferMsg);
294     }
295 
writingLoop()296     void writingLoop() {
297         work_type writeWork;
298         m_writeQueue.pop(writeWork);
299 
300         // Writing thread writes all buffers that it gets
301         // and reuses them. At the end all reusing buffers
302         // is stored in read queue
303         while (!writeWork.bufferMsg.isLast) {
304             m_io.writeChunk(writeWork.bufferMsg.outputBuffer);
305             writeWork.msg.set(writeWork.bufferMsg);
306             m_writeQueue.pop(writeWork);
307         }
308 
309         // Store last message to the reading queue to free resources later
310         writeWork.msg.set(writeWork.bufferMsg);
311 
312         // After all buffers have been written
313         // the synchronization ends
314         m_graph.decrement_wait_count();
315     }
316 
freeBuffers()317     void freeBuffers() {
318         int buffersNumber = m_readQueue.size();
319         for (int i = 0; i < buffersNumber; i++) {
320             work_type workToDelete;
321             m_readQueue.pop(workToDelete);
322             BufferMsg::destroyBufferMsg(workToDelete.bufferMsg);
323         }
324     }
325 
326     IOOperations& m_io;
327 
328     tbb::flow::graph& m_graph;
329 
330     tbb::concurrent_bounded_queue< work_type > m_writeQueue;
331     tbb::concurrent_bounded_queue< work_type > m_readQueue;
332 
333     std::thread m_fileReaderThread;
334     std::thread m_fileWriterThread;
335 };
336 
fgCompressionAsyncMsg(IOOperations & io,int blockSizeIn100KB,size_t memoryLimitIn1MB)337 void fgCompressionAsyncMsg(IOOperations& io, int blockSizeIn100KB, size_t memoryLimitIn1MB) {
338     // Memory limit sets the number of buffers that can be reused
339     int buffersNumber = memoryLimitIn1MB * 1000 * 1024 / io.chunkSize();
340 
341     tbb::flow::graph g;
342 
343     AsyncMsgActivity asyncMsgActivity(g, io);
344 
345     tbb::flow::function_node< BufferMsg, async_msg_type > file_reader(g, tbb::flow::unlimited, [&asyncMsgActivity](BufferMsg bufferMsg) -> async_msg_type {
346         return asyncMsgActivity.submitRead(bufferMsg);
347     });
348 
349     tbb::flow::function_node< BufferMsg, BufferMsg > compressor(g, tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
350 
351     tbb::flow::sequencer_node< BufferMsg > ordering(g, [](const BufferMsg& bufferMsg) -> size_t {
352         return bufferMsg.seqId;
353     });
354 
355     // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
356     tbb::flow::function_node< BufferMsg, async_msg_type > output_writer(g, tbb::flow::serial, [&asyncMsgActivity](const BufferMsg& bufferMsg) -> async_msg_type {
357         return asyncMsgActivity.submitWrite(bufferMsg);
358     });
359 
360     make_edge(file_reader, compressor);
361     make_edge(compressor, ordering);
362     make_edge(ordering, output_writer);
363     make_edge(output_writer, file_reader);
364 
365     // Creating buffers to be reused in read/compress/write graph loop
366     for (int i = 0; i < buffersNumber; i++) {
367         BufferMsg reuseBufferMsg = BufferMsg::createBufferMsg(0, io.chunkSize());
368         file_reader.try_put(reuseBufferMsg);
369     }
370 
371     g.wait_for_all();
372 }
373 
374 //-----------------------------------------------------------------------------------------------------------------------
375 //---------------------------------------------Simple compression example------------------------------------------------
376 //-----------------------------------------------------------------------------------------------------------------------
377 
fgCompression(IOOperations & io,int blockSizeIn100KB)378 void fgCompression(IOOperations& io, int blockSizeIn100KB) {
379     tbb::flow::graph g;
380 
381     tbb::flow::input_node< BufferMsg > file_reader(g, [&io](BufferMsg& bufferMsg)->bool {
382         if (io.hasDataToRead()) {
383             bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize());
384             io.readChunk(bufferMsg.inputBuffer);
385             return true;
386         }
387         return false;
388     });
389 
390     tbb::flow::function_node< BufferMsg, BufferMsg > compressor(g, tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
391 
392     tbb::flow::sequencer_node< BufferMsg > ordering(g, [](const BufferMsg& buffer)->size_t {
393         return buffer.seqId;
394     });
395 
396     tbb::flow::function_node< BufferMsg > output_writer(g, tbb::flow::serial, [&io](const BufferMsg& bufferMsg) {
397         io.writeChunk(bufferMsg.outputBuffer);
398         BufferMsg::destroyBufferMsg(bufferMsg);
399     });
400 
401     make_edge(file_reader, compressor);
402     make_edge(compressor, ordering);
403     make_edge(ordering, output_writer);
404 
405     file_reader.activate();
406     g.wait_for_all();
407 }
408 
409 //-----------------------------------------------------------------------------------------------------------------------
410 
// Returns true when `str` ends with `suffix`. An empty suffix matches any string.
bool endsWith(const std::string& str, const std::string& suffix) {
    const std::string::size_type tailLen = suffix.size();
    if (tailLen > str.size()) {
        return false;
    }
    // Compare exactly the last tailLen characters of str with the suffix.
    return str.compare(str.size() - tailLen, tailLen, suffix) == 0;
}
414 
415 //-----------------------------------------------------------------------------------------------------------------------
416 
main(int argc,char * argv[])417 int main(int argc, char* argv[]) {
418     try {
419         tbb::tick_count mainStartTime = tbb::tick_count::now();
420 
421         const std::string archiveExtension = ".bz2";
422         bool verbose = false;
423         std::string asyncType;
424         std::string inputFileName;
425         int blockSizeIn100KB = 1; // block size in 100KB chunks
426         size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity
427 
428         utility::parse_cli_arguments(argc, argv,
429             utility::cli_argument_pack()
430             //"-h" option for displaying help is present implicitly
431             .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]")
432             .arg(verbose, "-v", "verbose mode")
433             .arg(memoryLimitIn1MB, "-l", "used memory limit for compression algorithm in 1MB (minimum) granularity")
434             .arg(asyncType, "-a", "name of the used graph async implementation - can be async_node or async_msg")
435             .positional_arg(inputFileName, "filename", "input file name")
436         );
437 
438         if (inputFileName.empty()) {
439             throw std::invalid_argument("Input file name is not specified. Try 'fgbzip2 -h' for more information.");
440         }
441 
442         if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) {
443             throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information.");
444         }
445 
446         if (memoryLimitIn1MB < 1) {
447             throw std::invalid_argument("Incorrect memory limit size. Try 'fgbzip2 -h' for more information.");
448         }
449 
450         if (verbose) std::cout << "Input file name: " << inputFileName << std::endl;
451         if (endsWith(inputFileName, archiveExtension)) {
452             throw std::invalid_argument("Input file already have " + archiveExtension + " extension.");
453         }
454 
455         std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary);
456         if (!inputStream.is_open()) {
457             throw std::invalid_argument("Cannot open " + inputFileName + " file.");
458         }
459 
460         std::string outputFileName(inputFileName + archiveExtension);
461 
462         std::ofstream outputStream(outputFileName.c_str(), std::ios::out | std::ios::binary | std::ios::trunc);
463         if (!outputStream.is_open()) {
464             throw std::invalid_argument("Cannot open " + outputFileName + " file.");
465         }
466 
467         // General interface to work with I/O buffers operations
468         size_t chunkSize = blockSizeIn100KB * 100 * 1024;
469         IOOperations io(inputStream, outputStream, chunkSize);
470 
471         if (asyncType.empty()) {
472             if (verbose) std::cout << "Running flow graph based compression algorithm." << std::endl;
473             fgCompression(io, blockSizeIn100KB);
474         } else if (asyncType == "async_node") {
475             if (verbose) std::cout << "Running flow graph based compression algorithm with async_node based asynchronous IO operations." << std::endl;
476             fgCompressionAsyncNode(io, blockSizeIn100KB);
477         } else if (asyncType == "async_msg") {
478             if (verbose) std::cout << "Running flow graph based compression algorithm with async_msg based asynchronous IO operations. Using limited memory: " << memoryLimitIn1MB << "MB." << std::endl;
479             fgCompressionAsyncMsg(io, blockSizeIn100KB, memoryLimitIn1MB);
480         }
481 
482         inputStream.close();
483         outputStream.close();
484 
485         utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds());
486 
487         return 0;
488     } catch (std::exception& e) {
489         std::cerr << "Error occurred. Error text is : \"" << e.what() << "\"\n";
490         return -1;
491     }
492 }
493 #else
// Fallback entry point compiled when the preprocessor condition guarding the
// real implementation is false: it only reports that the example was skipped.
int main() {
    utility::report_skipped();
    return 0;
}
#endif /* __TBB_PREVIEW_ASYNC_MSG && __TBB_CPP11_LAMBDAS_PRESENT */
499