1 /*
2  * Copyright (c) 2016-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9 #pragma once
10 
11 #include "ErrorHolder.h"
12 #include "Logging.h"
13 #include "Options.h"
14 #include "utils/Buffer.h"
15 #include "utils/Range.h"
16 #include "utils/ResourcePool.h"
17 #include "utils/ThreadPool.h"
18 #include "utils/WorkQueue.h"
19 #define ZSTD_STATIC_LINKING_ONLY
20 #include "zstd.h"
21 #undef ZSTD_STATIC_LINKING_ONLY
22 
23 #include <cstddef>
24 #include <cstdint>
25 #include <memory>
26 
27 namespace pzstd {
/**
 * Runs pzstd with `options` and reports the outcome as a status code.
 *
 * @param options      The pzstd options to use for (de)compression
 * @returns            0 upon success and non-zero on failure.
 */
int pzstdMain(const Options& options);
36 
37 class SharedState {
38  public:
SharedState(const Options & options)39   SharedState(const Options& options) : log(options.verbosity) {
40     if (!options.decompress) {
41       auto parameters = options.determineParameters();
42       cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
43           [this, parameters]() -> ZSTD_CStream* {
44             this->log(VERBOSE, "%s\n", "Creating new ZSTD_CStream");
45             auto zcs = ZSTD_createCStream();
46             if (zcs) {
47               auto err = ZSTD_initCStream_advanced(
48                   zcs, nullptr, 0, parameters, 0);
49               if (ZSTD_isError(err)) {
50                 ZSTD_freeCStream(zcs);
51                 return nullptr;
52               }
53             }
54             return zcs;
55           },
56           [](ZSTD_CStream *zcs) {
57             ZSTD_freeCStream(zcs);
58           }});
59     } else {
60       dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
61           [this]() -> ZSTD_DStream* {
62             this->log(VERBOSE, "%s\n", "Creating new ZSTD_DStream");
63             auto zds = ZSTD_createDStream();
64             if (zds) {
65               auto err = ZSTD_initDStream(zds);
66               if (ZSTD_isError(err)) {
67                 ZSTD_freeDStream(zds);
68                 return nullptr;
69               }
70             }
71             return zds;
72           },
73           [](ZSTD_DStream *zds) {
74             ZSTD_freeDStream(zds);
75           }});
76     }
77   }
78 
~SharedState()79   ~SharedState() {
80     // The resource pools have references to this, so destroy them first.
81     cStreamPool.reset();
82     dStreamPool.reset();
83   }
84 
85   Logger log;
86   ErrorHolder errorHolder;
87   std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;
88   std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool;
89 };
90 
/**
 * Streams input from `fd`, breaks input up into chunks, and compresses each
 * chunk independently.  Output of each chunk gets streamed to a queue, and
 * the output queues get put into `chunks` in order.
 *
 * @param state        The shared state
 * @param chunks       Each compression job's output queue gets pushed here
 *                      as soon as it is available
 * @param executor     The thread pool to run compression jobs in
 * @param fd           The input file stream (a `FILE*`)
 * @param size         The size of the input file if known, 0 otherwise
 * @param numThreads   The number of threads in the thread pool
 * @param parameters   The zstd parameters to use for compression
 * @returns            The number of bytes read from the file
 */
std::uint64_t asyncCompressChunks(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
    ThreadPool& executor,
    FILE* fd,
    std::uintmax_t size,
    std::size_t numThreads,
    ZSTD_parameters parameters);
114 
/**
 * Streams input from `fd`.  If pzstd headers are available, it breaks the
 * input up into independent frames and sends each frame to an independent
 * decompression job.  Output of each frame gets streamed to a queue, and
 * the output queues get put into `frames` in order.
 *
 * @param state        The shared state
 * @param frames       Each decompression job's output queue gets pushed here
 *                      as soon as it is available
 * @param executor     The thread pool to run decompression jobs in
 * @param fd           The input file stream (a `FILE*`)
 * @returns            The number of bytes read from the file
 */
std::uint64_t asyncDecompressFrames(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
    ThreadPool& executor,
    FILE* fd);
133 
/**
 * Reads the data from each queue in `outs`, in order, and writes it to
 * `outputFd`.
 *
 * @param state        The shared state
 * @param outs         A queue of output queues, one for each
 *                      (de)compression job.
 * @param outputFd     The output file stream (a `FILE*`) to write to
 * @param decompress   Are we decompressing?
 * @returns            The number of bytes written
 */
std::uint64_t writeFile(
    SharedState& state,
    WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
    FILE* outputFd,
    bool decompress);
150 }
151