1 /*
2  * Copyright (c) 2016-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9 #include "platform.h"   /* Large Files support, SET_BINARY_MODE */
10 #include "Pzstd.h"
11 #include "SkippableFrame.h"
12 #include "utils/FileSystem.h"
13 #include "utils/Range.h"
14 #include "utils/ScopeGuard.h"
15 #include "utils/ThreadPool.h"
16 #include "utils/WorkQueue.h"
17 
18 #include <chrono>
19 #include <cinttypes>
20 #include <cstddef>
21 #include <cstdio>
22 #include <memory>
23 #include <string>
24 
25 
26 namespace pzstd {
27 
28 namespace {
29 #ifdef _WIN32
30 const std::string nullOutput = "nul";
31 #else
32 const std::string nullOutput = "/dev/null";
33 #endif
34 }
35 
36 using std::size_t;
37 
fileSizeOrZero(const std::string & file)38 static std::uintmax_t fileSizeOrZero(const std::string &file) {
39   if (file == "-") {
40     return 0;
41   }
42   std::error_code ec;
43   auto size = file_size(file, ec);
44   if (ec) {
45     size = 0;
46   }
47   return size;
48 }
49 
handleOneInput(const Options & options,const std::string & inputFile,FILE * inputFd,const std::string & outputFile,FILE * outputFd,SharedState & state)50 static std::uint64_t handleOneInput(const Options &options,
51                              const std::string &inputFile,
52                              FILE* inputFd,
53                              const std::string &outputFile,
54                              FILE* outputFd,
55                              SharedState& state) {
56   auto inputSize = fileSizeOrZero(inputFile);
57   // WorkQueue outlives ThreadPool so in the case of error we are certain
58   // we don't accidentally try to call push() on it after it is destroyed
59   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
60   std::uint64_t bytesRead;
61   std::uint64_t bytesWritten;
62   {
63     // Initialize the (de)compression thread pool with numThreads
64     ThreadPool executor(options.numThreads);
65     // Run the reader thread on an extra thread
66     ThreadPool readExecutor(1);
67     if (!options.decompress) {
68       // Add a job that reads the input and starts all the compression jobs
69       readExecutor.add(
70           [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
71             bytesRead = asyncCompressChunks(
72                 state,
73                 outs,
74                 executor,
75                 inputFd,
76                 inputSize,
77                 options.numThreads,
78                 options.determineParameters());
79           });
80       // Start writing
81       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
82     } else {
83       // Add a job that reads the input and starts all the decompression jobs
84       readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
85         bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
86       });
87       // Start writing
88       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
89     }
90   }
91   if (!state.errorHolder.hasError()) {
92     std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
93     std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
94     if (!options.decompress) {
95       double ratio = static_cast<double>(bytesWritten) /
96                      static_cast<double>(bytesRead + !bytesRead);
97       state.log(INFO, "%-20s :%6.2f%%   (%6" PRIu64 " => %6" PRIu64
98                    " bytes, %s)\n",
99                    inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
100                    outputFileName.c_str());
101     } else {
102       state.log(INFO, "%-20s: %" PRIu64 " bytes \n",
103                    inputFileName.c_str(),bytesWritten);
104     }
105   }
106   return bytesWritten;
107 }
108 
openInputFile(const std::string & inputFile,ErrorHolder & errorHolder)109 static FILE *openInputFile(const std::string &inputFile,
110                            ErrorHolder &errorHolder) {
111   if (inputFile == "-") {
112     SET_BINARY_MODE(stdin);
113     return stdin;
114   }
115   // Check if input file is a directory
116   {
117     std::error_code ec;
118     if (is_directory(inputFile, ec)) {
119       errorHolder.setError("Output file is a directory -- ignored");
120       return nullptr;
121     }
122   }
123   auto inputFd = std::fopen(inputFile.c_str(), "rb");
124   if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
125     return nullptr;
126   }
127   return inputFd;
128 }
129 
openOutputFile(const Options & options,const std::string & outputFile,SharedState & state)130 static FILE *openOutputFile(const Options &options,
131                             const std::string &outputFile,
132                             SharedState& state) {
133   if (outputFile == "-") {
134     SET_BINARY_MODE(stdout);
135     return stdout;
136   }
137   // Check if the output file exists and then open it
138   if (!options.overwrite && outputFile != nullOutput) {
139     auto outputFd = std::fopen(outputFile.c_str(), "rb");
140     if (outputFd != nullptr) {
141       std::fclose(outputFd);
142       if (!state.log.logsAt(INFO)) {
143         state.errorHolder.setError("Output file exists");
144         return nullptr;
145       }
146       state.log(
147           INFO,
148           "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
149           outputFile.c_str());
150       int c = getchar();
151       if (c != 'y' && c != 'Y') {
152         state.errorHolder.setError("Not overwritten");
153         return nullptr;
154       }
155     }
156   }
157   auto outputFd = std::fopen(outputFile.c_str(), "wb");
158   if (!state.errorHolder.check(
159           outputFd != nullptr, "Failed to open output file")) {
160     return nullptr;
161   }
162   return outputFd;
163 }
164 
pzstdMain(const Options & options)165 int pzstdMain(const Options &options) {
166   int returnCode = 0;
167   SharedState state(options);
168   for (const auto& input : options.inputFiles) {
169     // Setup the shared state
170     auto printErrorGuard = makeScopeGuard([&] {
171       if (state.errorHolder.hasError()) {
172         returnCode = 1;
173         state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(),
174                   state.errorHolder.getError().c_str());
175       }
176     });
177     // Open the input file
178     auto inputFd = openInputFile(input, state.errorHolder);
179     if (inputFd == nullptr) {
180       continue;
181     }
182     auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
183     // Open the output file
184     auto outputFile = options.getOutputFile(input);
185     if (!state.errorHolder.check(outputFile != "",
186                            "Input file does not have extension .zst")) {
187       continue;
188     }
189     auto outputFd = openOutputFile(options, outputFile, state);
190     if (outputFd == nullptr) {
191       continue;
192     }
193     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
194     // (de)compress the file
195     handleOneInput(options, input, inputFd, outputFile, outputFd, state);
196     if (state.errorHolder.hasError()) {
197       continue;
198     }
199     // Delete the input file if necessary
200     if (!options.keepSource) {
201       // Be sure that we are done and have written everything before we delete
202       if (!state.errorHolder.check(std::fclose(inputFd) == 0,
203                              "Failed to close input file")) {
204         continue;
205       }
206       closeInputGuard.dismiss();
207       if (!state.errorHolder.check(std::fclose(outputFd) == 0,
208                              "Failed to close output file")) {
209         continue;
210       }
211       closeOutputGuard.dismiss();
212       if (std::remove(input.c_str()) != 0) {
213         state.errorHolder.setError("Failed to remove input file");
214         continue;
215       }
216     }
217   }
218   // Returns 1 if any of the files failed to (de)compress.
219   return returnCode;
220 }
221 
222 /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
makeZstdInBuffer(const Buffer & buffer)223 static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
224   return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
225 }
226 
227 /**
228  * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
229  * `inBuffer.pos`.
230  */
advance(Buffer & buffer,ZSTD_inBuffer & inBuffer)231 void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
232   auto pos = inBuffer.pos;
233   inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
234   inBuffer.size -= pos;
235   inBuffer.pos = 0;
236   return buffer.advance(pos);
237 }
238 
239 /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
makeZstdOutBuffer(Buffer & buffer)240 static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
241   return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
242 }
243 
244 /**
245  * Split `buffer` and advance `outBuffer` by the amount of data written, as
246  * indicated by `outBuffer.pos`.
247  */
split(Buffer & buffer,ZSTD_outBuffer & outBuffer)248 Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
249   auto pos = outBuffer.pos;
250   outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
251   outBuffer.size -= pos;
252   outBuffer.pos = 0;
253   return buffer.splitAt(pos);
254 }
255 
256 /**
257  * Stream chunks of input from `in`, compress it, and stream it out to `out`.
258  *
259  * @param state        The shared state
260  * @param in           Queue that we `pop()` input buffers from
261  * @param out          Queue that we `push()` compressed output buffers to
262  * @param maxInputSize An upper bound on the size of the input
263  */
compress(SharedState & state,std::shared_ptr<BufferWorkQueue> in,std::shared_ptr<BufferWorkQueue> out,size_t maxInputSize)264 static void compress(
265     SharedState& state,
266     std::shared_ptr<BufferWorkQueue> in,
267     std::shared_ptr<BufferWorkQueue> out,
268     size_t maxInputSize) {
269   auto& errorHolder = state.errorHolder;
270   auto guard = makeScopeGuard([&] { out->finish(); });
271   // Initialize the CCtx
272   auto ctx = state.cStreamPool->get();
273   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
274     return;
275   }
276   {
277     auto err = ZSTD_resetCStream(ctx.get(), 0);
278     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
279       return;
280     }
281   }
282 
283   // Allocate space for the result
284   auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
285   auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
286   {
287     Buffer inBuffer;
288     // Read a buffer in from the input queue
289     while (in->pop(inBuffer) && !errorHolder.hasError()) {
290       auto zstdInBuffer = makeZstdInBuffer(inBuffer);
291       // Compress the whole buffer and send it to the output queue
292       while (!inBuffer.empty() && !errorHolder.hasError()) {
293         if (!errorHolder.check(
294                 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
295           return;
296         }
297         // Compress
298         auto err =
299             ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
300         if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
301           return;
302         }
303         // Split the compressed data off outBuffer and pass to the output queue
304         out->push(split(outBuffer, zstdOutBuffer));
305         // Forget about the data we already compressed
306         advance(inBuffer, zstdInBuffer);
307       }
308     }
309   }
310   // Write the epilog
311   size_t bytesLeft;
312   do {
313     if (!errorHolder.check(
314             !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
315       return;
316     }
317     bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
318     if (!errorHolder.check(
319             !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
320       return;
321     }
322     out->push(split(outBuffer, zstdOutBuffer));
323   } while (bytesLeft != 0 && !errorHolder.hasError());
324 }
325 
326 /**
327  * Calculates how large each independently compressed frame should be.
328  *
329  * @param size       The size of the source if known, 0 otherwise
330  * @param numThreads The number of threads available to run compression jobs on
331  * @param params     The zstd parameters to be used for compression
332  */
calculateStep(std::uintmax_t size,size_t numThreads,const ZSTD_parameters & params)333 static size_t calculateStep(
334     std::uintmax_t size,
335     size_t numThreads,
336     const ZSTD_parameters &params) {
337   (void)size;
338   (void)numThreads;
339   return size_t{1} << (params.cParams.windowLog + 2);
340 }
341 
342 namespace {
343 enum class FileStatus { Continue, Done, Error };
344 /// Determines the status of the file descriptor `fd`.
fileStatus(FILE * fd)345 FileStatus fileStatus(FILE* fd) {
346   if (std::feof(fd)) {
347     return FileStatus::Done;
348   } else if (std::ferror(fd)) {
349     return FileStatus::Error;
350   }
351   return FileStatus::Continue;
352 }
353 } // anonymous namespace
354 
355 /**
356  * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
357  * Will read less if an error or EOF occurs.
358  * Returns the status of the file after all of the reads have occurred.
359  */
360 static FileStatus
readData(BufferWorkQueue & queue,size_t chunkSize,size_t size,FILE * fd,std::uint64_t * totalBytesRead)361 readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
362          std::uint64_t *totalBytesRead) {
363   Buffer buffer(size);
364   while (!buffer.empty()) {
365     auto bytesRead =
366         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
367     *totalBytesRead += bytesRead;
368     queue.push(buffer.splitAt(bytesRead));
369     auto status = fileStatus(fd);
370     if (status != FileStatus::Continue) {
371       return status;
372     }
373   }
374   return FileStatus::Continue;
375 }
376 
asyncCompressChunks(SharedState & state,WorkQueue<std::shared_ptr<BufferWorkQueue>> & chunks,ThreadPool & executor,FILE * fd,std::uintmax_t size,size_t numThreads,ZSTD_parameters params)377 std::uint64_t asyncCompressChunks(
378     SharedState& state,
379     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
380     ThreadPool& executor,
381     FILE* fd,
382     std::uintmax_t size,
383     size_t numThreads,
384     ZSTD_parameters params) {
385   auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
386   std::uint64_t bytesRead = 0;
387 
388   // Break the input up into chunks of size `step` and compress each chunk
389   // independently.
390   size_t step = calculateStep(size, numThreads, params);
391   state.log(DEBUG, "Chosen frame size: %zu\n", step);
392   auto status = FileStatus::Continue;
393   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
394     // Make a new input queue that we will put the chunk's input data into.
395     auto in = std::make_shared<BufferWorkQueue>();
396     auto inGuard = makeScopeGuard([&] { in->finish(); });
397     // Make a new output queue that compress will put the compressed data into.
398     auto out = std::make_shared<BufferWorkQueue>();
399     // Start compression in the thread pool
400     executor.add([&state, in, out, step] {
401       return compress(
402           state, std::move(in), std::move(out), step);
403     });
404     // Pass the output queue to the writer thread.
405     chunks.push(std::move(out));
406     state.log(VERBOSE, "%s\n", "Starting a new frame");
407     // Fill the input queue for the compression job we just started
408     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
409   }
410   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
411   return bytesRead;
412 }
413 
414 /**
415  * Decompress a frame, whose data is streamed into `in`, and stream the output
416  * to `out`.
417  *
418  * @param state        The shared state
419  * @param in           Queue that we `pop()` input buffers from. It contains
420  *                      exactly one compressed frame.
421  * @param out          Queue that we `push()` decompressed output buffers to
422  */
decompress(SharedState & state,std::shared_ptr<BufferWorkQueue> in,std::shared_ptr<BufferWorkQueue> out)423 static void decompress(
424     SharedState& state,
425     std::shared_ptr<BufferWorkQueue> in,
426     std::shared_ptr<BufferWorkQueue> out) {
427   auto& errorHolder = state.errorHolder;
428   auto guard = makeScopeGuard([&] { out->finish(); });
429   // Initialize the DCtx
430   auto ctx = state.dStreamPool->get();
431   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
432     return;
433   }
434   {
435     auto err = ZSTD_resetDStream(ctx.get());
436     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
437       return;
438     }
439   }
440 
441   const size_t outSize = ZSTD_DStreamOutSize();
442   Buffer inBuffer;
443   size_t returnCode = 0;
444   // Read a buffer in from the input queue
445   while (in->pop(inBuffer) && !errorHolder.hasError()) {
446     auto zstdInBuffer = makeZstdInBuffer(inBuffer);
447     // Decompress the whole buffer and send it to the output queue
448     while (!inBuffer.empty() && !errorHolder.hasError()) {
449       // Allocate a buffer with at least outSize bytes.
450       Buffer outBuffer(outSize);
451       auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
452       // Decompress
453       returnCode =
454           ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
455       if (!errorHolder.check(
456               !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
457         return;
458       }
459       // Pass the buffer with the decompressed data to the output queue
460       out->push(split(outBuffer, zstdOutBuffer));
461       // Advance past the input we already read
462       advance(inBuffer, zstdInBuffer);
463       if (returnCode == 0) {
464         // The frame is over, prepare to (maybe) start a new frame
465         ZSTD_initDStream(ctx.get());
466       }
467     }
468   }
469   if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
470     return;
471   }
472   // We've given ZSTD_decompressStream all of our data, but there may still
473   // be data to read.
474   while (returnCode == 1) {
475     // Allocate a buffer with at least outSize bytes.
476     Buffer outBuffer(outSize);
477     auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
478     // Pass in no input.
479     ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
480     // Decompress
481     returnCode =
482         ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
483     if (!errorHolder.check(
484             !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
485       return;
486     }
487     // Pass the buffer with the decompressed data to the output queue
488     out->push(split(outBuffer, zstdOutBuffer));
489   }
490 }
491 
asyncDecompressFrames(SharedState & state,WorkQueue<std::shared_ptr<BufferWorkQueue>> & frames,ThreadPool & executor,FILE * fd)492 std::uint64_t asyncDecompressFrames(
493     SharedState& state,
494     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
495     ThreadPool& executor,
496     FILE* fd) {
497   auto framesGuard = makeScopeGuard([&] { frames.finish(); });
498   std::uint64_t totalBytesRead = 0;
499 
500   // Split the source up into its component frames.
501   // If we find our recognized skippable frame we know the next frames size
502   // which means that we can decompress each standard frame in independently.
503   // Otherwise, we will decompress using only one decompression task.
504   const size_t chunkSize = ZSTD_DStreamInSize();
505   auto status = FileStatus::Continue;
506   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
507     // Make a new input queue that we will put the frames's bytes into.
508     auto in = std::make_shared<BufferWorkQueue>();
509     auto inGuard = makeScopeGuard([&] { in->finish(); });
510     // Make a output queue that decompress will put the decompressed data into
511     auto out = std::make_shared<BufferWorkQueue>();
512 
513     size_t frameSize;
514     {
515       // Calculate the size of the next frame.
516       // frameSize is 0 if the frame info can't be decoded.
517       Buffer buffer(SkippableFrame::kSize);
518       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
519       totalBytesRead += bytesRead;
520       status = fileStatus(fd);
521       if (bytesRead == 0 && status != FileStatus::Continue) {
522         break;
523       }
524       buffer.subtract(buffer.size() - bytesRead);
525       frameSize = SkippableFrame::tryRead(buffer.range());
526       in->push(std::move(buffer));
527     }
528     if (frameSize == 0) {
529       // We hit a non SkippableFrame, so this will be the last job.
530       // Make sure that we don't use too much memory
531       in->setMaxSize(64);
532       out->setMaxSize(64);
533     }
534     // Start decompression in the thread pool
535     executor.add([&state, in, out] {
536       return decompress(state, std::move(in), std::move(out));
537     });
538     // Pass the output queue to the writer thread
539     frames.push(std::move(out));
540     if (frameSize == 0) {
541       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
542       // Pass the rest of the source to this decompression task
543       state.log(VERBOSE, "%s\n",
544           "Input not in pzstd format, falling back to serial decompression");
545       while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
546         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
547       }
548       break;
549     }
550     state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize);
551     // Fill the input queue for the decompression job we just started
552     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
553   }
554   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
555   return totalBytesRead;
556 }
557 
558 /// Write `data` to `fd`, returns true iff success.
writeData(ByteRange data,FILE * fd)559 static bool writeData(ByteRange data, FILE* fd) {
560   while (!data.empty()) {
561     data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
562     if (std::ferror(fd)) {
563       return false;
564     }
565   }
566   return true;
567 }
568 
writeFile(SharedState & state,WorkQueue<std::shared_ptr<BufferWorkQueue>> & outs,FILE * outputFd,bool decompress)569 std::uint64_t writeFile(
570     SharedState& state,
571     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
572     FILE* outputFd,
573     bool decompress) {
574   auto& errorHolder = state.errorHolder;
575   auto lineClearGuard = makeScopeGuard([&state] {
576     state.log.clear(INFO);
577   });
578   std::uint64_t bytesWritten = 0;
579   std::shared_ptr<BufferWorkQueue> out;
580   // Grab the output queue for each decompression job (in order).
581   while (outs.pop(out)) {
582     if (errorHolder.hasError()) {
583       continue;
584     }
585     if (!decompress) {
586       // If we are compressing and want to write skippable frames we can't
587       // start writing before compression is done because we need to know the
588       // compressed size.
589       // Wait for the compressed size to be available and write skippable frame
590       SkippableFrame frame(out->size());
591       if (!writeData(frame.data(), outputFd)) {
592         errorHolder.setError("Failed to write output");
593         return bytesWritten;
594       }
595       bytesWritten += frame.kSize;
596     }
597     // For each chunk of the frame: Pop it from the queue and write it
598     Buffer buffer;
599     while (out->pop(buffer) && !errorHolder.hasError()) {
600       if (!writeData(buffer.range(), outputFd)) {
601         errorHolder.setError("Failed to write output");
602         return bytesWritten;
603       }
604       bytesWritten += buffer.size();
605       state.log.update(INFO, "Written: %u MB   ",
606                 static_cast<std::uint32_t>(bytesWritten >> 20));
607     }
608   }
609   return bytesWritten;
610 }
611 }
612