1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1
18 #include "tbb/tbb_config.h"
19 #include "../../common/utility/utility.h"
20
21 #if __TBB_PREVIEW_ASYNC_MSG && __TBB_CPP11_LAMBDAS_PRESENT
22
23 #include <iostream>
24 #include <fstream>
25 #include <string>
26 #include <memory>
27 #include <queue>
28 #include <thread>
29
30 #include "bzlib.h"
31
32 #include "tbb/flow_graph.h"
33 #include "tbb/tick_count.h"
34 #include "tbb/concurrent_queue.h"
35
36 // TODO: change memory allocation/deallocation to be managed in constructor/destructor
// A plain byte buffer: `b` points at heap storage and `len` holds either the
// storage capacity or the number of meaningful bytes, depending on the pipeline
// stage. Buffer does not free its storage; BufferMsg::createBufferMsg() allocates
// it and BufferMsg::destroyBufferMsg() releases it.
// Members are default-initialized so that a default-constructed Buffer never
// carries an indeterminate pointer or length.
struct Buffer {
    size_t len = 0;    // capacity, or used length after a read/compress step
    char* b = nullptr; // storage, or nullptr when no storage is attached
};
41
42 struct BufferMsg {
43
BufferMsgBufferMsg44 BufferMsg() {}
BufferMsgBufferMsg45 BufferMsg(Buffer& inputBuffer, Buffer& outputBuffer, size_t seqId, bool isLast = false)
46 : inputBuffer(inputBuffer), outputBuffer(outputBuffer), seqId(seqId), isLast(isLast) {}
47
createBufferMsgBufferMsg48 static BufferMsg createBufferMsg(size_t seqId, size_t chunkSize) {
49 Buffer inputBuffer;
50 inputBuffer.b = new char[chunkSize];
51 inputBuffer.len = chunkSize;
52
53 Buffer outputBuffer;
54 size_t compressedChunkSize = chunkSize * 1.01 + 600; // compression overhead
55 outputBuffer.b = new char[compressedChunkSize];
56 outputBuffer.len = compressedChunkSize;
57
58 return BufferMsg(inputBuffer, outputBuffer, seqId);
59 }
60
destroyBufferMsgBufferMsg61 static void destroyBufferMsg(const BufferMsg& destroyMsg) {
62 delete[] destroyMsg.inputBuffer.b;
63 delete[] destroyMsg.outputBuffer.b;
64 }
65
markLastBufferMsg66 void markLast(size_t lastId) {
67 isLast = true;
68 seqId = lastId;
69 }
70
71 size_t seqId;
72 Buffer inputBuffer;
73 Buffer outputBuffer;
74 bool isLast;
75 };
76
77 class BufferCompressor {
78 public:
79
BufferCompressor(int blockSizeIn100KB)80 BufferCompressor(int blockSizeIn100KB) : m_blockSize(blockSizeIn100KB) {}
81
operator ()(BufferMsg buffer) const82 BufferMsg operator()(BufferMsg buffer) const {
83 if (!buffer.isLast) {
84 unsigned int outSize = buffer.outputBuffer.len;
85 BZ2_bzBuffToBuffCompress(buffer.outputBuffer.b, &outSize,
86 buffer.inputBuffer.b, buffer.inputBuffer.len,
87 m_blockSize, 0, 30);
88 buffer.outputBuffer.len = outSize;
89 }
90 return buffer;
91 }
92
93 private:
94 int m_blockSize;
95 };
96
97 class IOOperations {
98 public:
99
IOOperations(std::ifstream & inputStream,std::ofstream & outputStream,size_t chunkSize)100 IOOperations(std::ifstream& inputStream, std::ofstream& outputStream, size_t chunkSize)
101 : m_inputStream(inputStream), m_outputStream(outputStream), m_chunkSize(chunkSize), m_chunksRead(0) {}
102
readChunk(Buffer & buffer)103 void readChunk(Buffer& buffer) {
104 m_inputStream.read(buffer.b, m_chunkSize);
105 buffer.len = static_cast<size_t>(m_inputStream.gcount());
106 m_chunksRead++;
107 }
108
writeChunk(const Buffer & buffer)109 void writeChunk(const Buffer& buffer) {
110 m_outputStream.write(buffer.b, buffer.len);
111 }
112
chunksRead() const113 size_t chunksRead() const {
114 return m_chunksRead;
115 }
116
chunkSize() const117 size_t chunkSize() const {
118 return m_chunkSize;
119 }
120
hasDataToRead() const121 bool hasDataToRead() const {
122 return m_inputStream.is_open() && !m_inputStream.eof();
123 }
124
125 private:
126
127 std::ifstream& m_inputStream;
128 std::ofstream& m_outputStream;
129
130 size_t m_chunkSize;
131 size_t m_chunksRead;
132 };
133
134 //-----------------------------------------------------------------------------------------------------------------------
135 //---------------------------------------Compression example based on async_node-----------------------------------------
136 //-----------------------------------------------------------------------------------------------------------------------
137
138 typedef tbb::flow::async_node< tbb::flow::continue_msg, BufferMsg > async_file_reader_node;
139 typedef tbb::flow::async_node< BufferMsg, tbb::flow::continue_msg > async_file_writer_node;
140
141 class AsyncNodeActivity {
142 public:
143
AsyncNodeActivity(IOOperations & io)144 AsyncNodeActivity(IOOperations& io)
145 : m_io(io), m_fileWriterThread(&AsyncNodeActivity::writingLoop, this) {}
146
~AsyncNodeActivity()147 ~AsyncNodeActivity() {
148 m_fileReaderThread.join();
149 m_fileWriterThread.join();
150 }
151
submitRead(async_file_reader_node::gateway_type & gateway)152 void submitRead(async_file_reader_node::gateway_type& gateway) {
153 gateway.reserve_wait();
154 std::thread(&AsyncNodeActivity::readingLoop, this, std::ref(gateway)).swap(m_fileReaderThread);
155 }
156
submitWrite(const BufferMsg & bufferMsg)157 void submitWrite(const BufferMsg& bufferMsg) {
158 m_writeQueue.push(bufferMsg);
159 }
160
161 private:
162
readingLoop(async_file_reader_node::gateway_type & gateway)163 void readingLoop(async_file_reader_node::gateway_type& gateway) {
164 while (m_io.hasDataToRead()) {
165 BufferMsg bufferMsg = BufferMsg::createBufferMsg(m_io.chunksRead(), m_io.chunkSize());
166 m_io.readChunk(bufferMsg.inputBuffer);
167 gateway.try_put(bufferMsg);
168 }
169 sendLastMessage(gateway);
170 gateway.release_wait();
171 }
172
writingLoop()173 void writingLoop() {
174 BufferMsg buffer;
175 m_writeQueue.pop(buffer);
176 while (!buffer.isLast) {
177 m_io.writeChunk(buffer.outputBuffer);
178 m_writeQueue.pop(buffer);
179 }
180 }
181
sendLastMessage(async_file_reader_node::gateway_type & gateway)182 void sendLastMessage(async_file_reader_node::gateway_type& gateway) {
183 BufferMsg lastMsg;
184 lastMsg.markLast(m_io.chunksRead());
185 gateway.try_put(lastMsg);
186 }
187
188 IOOperations& m_io;
189
190 tbb::concurrent_bounded_queue< BufferMsg > m_writeQueue;
191
192 std::thread m_fileReaderThread;
193 std::thread m_fileWriterThread;
194 };
195
fgCompressionAsyncNode(IOOperations & io,int blockSizeIn100KB)196 void fgCompressionAsyncNode(IOOperations& io, int blockSizeIn100KB) {
197 tbb::flow::graph g;
198
199 AsyncNodeActivity asyncNodeActivity(io);
200
201 async_file_reader_node file_reader(g, tbb::flow::unlimited, [&asyncNodeActivity](const tbb::flow::continue_msg& msg, async_file_reader_node::gateway_type& gateway) {
202 asyncNodeActivity.submitRead(gateway);
203 });
204
205 tbb::flow::function_node< BufferMsg, BufferMsg > compressor(g, tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
206
207 tbb::flow::sequencer_node< BufferMsg > ordering(g, [](const BufferMsg& bufferMsg)->size_t {
208 return bufferMsg.seqId;
209 });
210
211 // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
212 async_file_writer_node output_writer(g, tbb::flow::serial, [&asyncNodeActivity](const BufferMsg& bufferMsg, async_file_writer_node::gateway_type& gateway) {
213 asyncNodeActivity.submitWrite(bufferMsg);
214 });
215
216 make_edge(file_reader, compressor);
217 make_edge(compressor, ordering);
218 make_edge(ordering, output_writer);
219
220 file_reader.try_put(tbb::flow::continue_msg());
221
222 g.wait_for_all();
223 }
224
225 //-----------------------------------------------------------------------------------------------------------------------
226 //------------------------------------------Compression example based on async_msg---------------------------------------
227 //-----------------------------------------------------------------------------------------------------------------------
228
229 typedef tbb::flow::async_msg< BufferMsg > async_msg_type;
230
231 class AsyncMsgActivity {
232 public:
233
AsyncMsgActivity(tbb::flow::graph & g,IOOperations & io)234 AsyncMsgActivity(tbb::flow::graph& g, IOOperations& io)
235 : m_io(io), m_graph(g), m_fileReaderThread(&AsyncMsgActivity::readingLoop, this),
236 m_fileWriterThread(&AsyncMsgActivity::writingLoop, this)
237 {
238 // Graph synchronization starts here and ends
239 // when the last buffer was written in "writing thread"
240 m_graph.increment_wait_count();
241 }
242
~AsyncMsgActivity()243 ~AsyncMsgActivity() {
244 m_fileReaderThread.join();
245 m_fileWriterThread.join();
246
247 // Lets release resources that async
248 // activity and graph were acquired
249 freeBuffers();
250 }
251
submitRead(BufferMsg & bufferMsg)252 async_msg_type submitRead(BufferMsg& bufferMsg) {
253 async_msg_type msg;
254 work_type readWork = { bufferMsg, msg };
255 m_readQueue.push(readWork);
256 return msg;
257 }
258
submitWrite(const BufferMsg & bufferMsg)259 async_msg_type submitWrite(const BufferMsg& bufferMsg) {
260 async_msg_type msg;
261 work_type writeWork = { bufferMsg, msg };
262 m_writeQueue.push(writeWork);
263 return msg;
264 }
265
266 private:
267
268 struct work_type {
269 BufferMsg bufferMsg;
270 async_msg_type msg;
271 };
272
readingLoop()273 void readingLoop() {
274 work_type readWork;
275 m_readQueue.pop(readWork);
276
277 // Reading thread waits for buffers to be received
278 // (the graph reuses limited number of buffers)
279 // and reads the file while there is something to read
280 while (m_io.hasDataToRead()) {
281 readWork.bufferMsg.seqId = m_io.chunksRead();
282 m_io.readChunk(readWork.bufferMsg.inputBuffer);
283 readWork.msg.set(readWork.bufferMsg);
284 m_readQueue.pop(readWork);
285 }
286
287 // Pass message with an end flag to the graph
288 sendLastMessage(readWork);
289 }
290
sendLastMessage(work_type & work)291 void sendLastMessage(work_type& work) {
292 work.bufferMsg.markLast(m_io.chunksRead());
293 work.msg.set(work.bufferMsg);
294 }
295
writingLoop()296 void writingLoop() {
297 work_type writeWork;
298 m_writeQueue.pop(writeWork);
299
300 // Writing thread writes all buffers that it gets
301 // and reuses them. At the end all reusing buffers
302 // is stored in read queue
303 while (!writeWork.bufferMsg.isLast) {
304 m_io.writeChunk(writeWork.bufferMsg.outputBuffer);
305 writeWork.msg.set(writeWork.bufferMsg);
306 m_writeQueue.pop(writeWork);
307 }
308
309 // Store last message to the reading queue to free resources later
310 writeWork.msg.set(writeWork.bufferMsg);
311
312 // After all buffers have been written
313 // the synchronization ends
314 m_graph.decrement_wait_count();
315 }
316
freeBuffers()317 void freeBuffers() {
318 int buffersNumber = m_readQueue.size();
319 for (int i = 0; i < buffersNumber; i++) {
320 work_type workToDelete;
321 m_readQueue.pop(workToDelete);
322 BufferMsg::destroyBufferMsg(workToDelete.bufferMsg);
323 }
324 }
325
326 IOOperations& m_io;
327
328 tbb::flow::graph& m_graph;
329
330 tbb::concurrent_bounded_queue< work_type > m_writeQueue;
331 tbb::concurrent_bounded_queue< work_type > m_readQueue;
332
333 std::thread m_fileReaderThread;
334 std::thread m_fileWriterThread;
335 };
336
fgCompressionAsyncMsg(IOOperations & io,int blockSizeIn100KB,size_t memoryLimitIn1MB)337 void fgCompressionAsyncMsg(IOOperations& io, int blockSizeIn100KB, size_t memoryLimitIn1MB) {
338 // Memory limit sets the number of buffers that can be reused
339 int buffersNumber = memoryLimitIn1MB * 1000 * 1024 / io.chunkSize();
340
341 tbb::flow::graph g;
342
343 AsyncMsgActivity asyncMsgActivity(g, io);
344
345 tbb::flow::function_node< BufferMsg, async_msg_type > file_reader(g, tbb::flow::unlimited, [&asyncMsgActivity](BufferMsg bufferMsg) -> async_msg_type {
346 return asyncMsgActivity.submitRead(bufferMsg);
347 });
348
349 tbb::flow::function_node< BufferMsg, BufferMsg > compressor(g, tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
350
351 tbb::flow::sequencer_node< BufferMsg > ordering(g, [](const BufferMsg& bufferMsg) -> size_t {
352 return bufferMsg.seqId;
353 });
354
355 // The node is serial to preserve the right order of buffers set by the preceding sequencer_node
356 tbb::flow::function_node< BufferMsg, async_msg_type > output_writer(g, tbb::flow::serial, [&asyncMsgActivity](const BufferMsg& bufferMsg) -> async_msg_type {
357 return asyncMsgActivity.submitWrite(bufferMsg);
358 });
359
360 make_edge(file_reader, compressor);
361 make_edge(compressor, ordering);
362 make_edge(ordering, output_writer);
363 make_edge(output_writer, file_reader);
364
365 // Creating buffers to be reused in read/compress/write graph loop
366 for (int i = 0; i < buffersNumber; i++) {
367 BufferMsg reuseBufferMsg = BufferMsg::createBufferMsg(0, io.chunkSize());
368 file_reader.try_put(reuseBufferMsg);
369 }
370
371 g.wait_for_all();
372 }
373
374 //-----------------------------------------------------------------------------------------------------------------------
375 //---------------------------------------------Simple compression example------------------------------------------------
376 //-----------------------------------------------------------------------------------------------------------------------
377
fgCompression(IOOperations & io,int blockSizeIn100KB)378 void fgCompression(IOOperations& io, int blockSizeIn100KB) {
379 tbb::flow::graph g;
380
381 tbb::flow::input_node< BufferMsg > file_reader(g, [&io](BufferMsg& bufferMsg)->bool {
382 if (io.hasDataToRead()) {
383 bufferMsg = BufferMsg::createBufferMsg(io.chunksRead(), io.chunkSize());
384 io.readChunk(bufferMsg.inputBuffer);
385 return true;
386 }
387 return false;
388 });
389
390 tbb::flow::function_node< BufferMsg, BufferMsg > compressor(g, tbb::flow::unlimited, BufferCompressor(blockSizeIn100KB));
391
392 tbb::flow::sequencer_node< BufferMsg > ordering(g, [](const BufferMsg& buffer)->size_t {
393 return buffer.seqId;
394 });
395
396 tbb::flow::function_node< BufferMsg > output_writer(g, tbb::flow::serial, [&io](const BufferMsg& bufferMsg) {
397 io.writeChunk(bufferMsg.outputBuffer);
398 BufferMsg::destroyBufferMsg(bufferMsg);
399 });
400
401 make_edge(file_reader, compressor);
402 make_edge(compressor, ordering);
403 make_edge(ordering, output_writer);
404
405 file_reader.activate();
406 g.wait_for_all();
407 }
408
409 //-----------------------------------------------------------------------------------------------------------------------
410
// Returns true when `str` ends with `suffix`; an empty suffix always matches.
bool endsWith(const std::string& str, const std::string& suffix) {
    if (suffix.size() > str.size()) {
        return false; // a longer suffix can never fit
    }
    const std::string::size_type tailStart = str.size() - suffix.size();
    return str.compare(tailStart, suffix.size(), suffix) == 0;
}
414
415 //-----------------------------------------------------------------------------------------------------------------------
416
main(int argc,char * argv[])417 int main(int argc, char* argv[]) {
418 try {
419 tbb::tick_count mainStartTime = tbb::tick_count::now();
420
421 const std::string archiveExtension = ".bz2";
422 bool verbose = false;
423 std::string asyncType;
424 std::string inputFileName;
425 int blockSizeIn100KB = 1; // block size in 100KB chunks
426 size_t memoryLimitIn1MB = 1; // memory limit for compression in megabytes granularity
427
428 utility::parse_cli_arguments(argc, argv,
429 utility::cli_argument_pack()
430 //"-h" option for displaying help is present implicitly
431 .arg(blockSizeIn100KB, "-b", "\t block size in 100KB chunks, [1 .. 9]")
432 .arg(verbose, "-v", "verbose mode")
433 .arg(memoryLimitIn1MB, "-l", "used memory limit for compression algorithm in 1MB (minimum) granularity")
434 .arg(asyncType, "-a", "name of the used graph async implementation - can be async_node or async_msg")
435 .positional_arg(inputFileName, "filename", "input file name")
436 );
437
438 if (inputFileName.empty()) {
439 throw std::invalid_argument("Input file name is not specified. Try 'fgbzip2 -h' for more information.");
440 }
441
442 if (blockSizeIn100KB < 1 || blockSizeIn100KB > 9) {
443 throw std::invalid_argument("Incorrect block size. Try 'fgbzip2 -h' for more information.");
444 }
445
446 if (memoryLimitIn1MB < 1) {
447 throw std::invalid_argument("Incorrect memory limit size. Try 'fgbzip2 -h' for more information.");
448 }
449
450 if (verbose) std::cout << "Input file name: " << inputFileName << std::endl;
451 if (endsWith(inputFileName, archiveExtension)) {
452 throw std::invalid_argument("Input file already have " + archiveExtension + " extension.");
453 }
454
455 std::ifstream inputStream(inputFileName.c_str(), std::ios::in | std::ios::binary);
456 if (!inputStream.is_open()) {
457 throw std::invalid_argument("Cannot open " + inputFileName + " file.");
458 }
459
460 std::string outputFileName(inputFileName + archiveExtension);
461
462 std::ofstream outputStream(outputFileName.c_str(), std::ios::out | std::ios::binary | std::ios::trunc);
463 if (!outputStream.is_open()) {
464 throw std::invalid_argument("Cannot open " + outputFileName + " file.");
465 }
466
467 // General interface to work with I/O buffers operations
468 size_t chunkSize = blockSizeIn100KB * 100 * 1024;
469 IOOperations io(inputStream, outputStream, chunkSize);
470
471 if (asyncType.empty()) {
472 if (verbose) std::cout << "Running flow graph based compression algorithm." << std::endl;
473 fgCompression(io, blockSizeIn100KB);
474 } else if (asyncType == "async_node") {
475 if (verbose) std::cout << "Running flow graph based compression algorithm with async_node based asynchronous IO operations." << std::endl;
476 fgCompressionAsyncNode(io, blockSizeIn100KB);
477 } else if (asyncType == "async_msg") {
478 if (verbose) std::cout << "Running flow graph based compression algorithm with async_msg based asynchronous IO operations. Using limited memory: " << memoryLimitIn1MB << "MB." << std::endl;
479 fgCompressionAsyncMsg(io, blockSizeIn100KB, memoryLimitIn1MB);
480 }
481
482 inputStream.close();
483 outputStream.close();
484
485 utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds());
486
487 return 0;
488 } catch (std::exception& e) {
489 std::cerr << "Error occurred. Error text is : \"" << e.what() << "\"\n";
490 return -1;
491 }
492 }
493 #else
main()494 int main() {
495 utility::report_skipped();
496 return 0;
497 }
#endif /* __TBB_PREVIEW_ASYNC_MSG && __TBB_CPP11_LAMBDAS_PRESENT */
499