1 #include <functional>
2 #include <fcntl.h>
3 #include "compress.h"
4 #include "uv_helper.h"
5 #include "cso.h"
6 #include "dax.h"
7 #include "input.h"
8 #include "output.h"
9 #include "buffer_pool.h"
10
11 namespace maxcso {
12
// Largest block size a task may request (256 KiB); anything above this is
// insane, and arguably this value is too.
static const uint32_t MAX_BLOCK_SIZE = 256 * 1024;

// Block sizes chosen when the task does not request a specific one.
static const uint32_t SMALL_BLOCK_SIZE = 2 * 1024;
static const uint32_t LARGE_BLOCK_SIZE = 16 * 1024;
// Input size, in bytes (2 GiB), at or above which the default block size
// switches from SMALL_BLOCK_SIZE to LARGE_BLOCK_SIZE.
static const int64_t LARGE_BLOCK_SIZE_THRESH = 2LL * 1024 * 1024 * 1024;
21
22 // This actually handles decompression too. They're basically the same.
23 class CompressionTask {
24 public:
CompressionTask(uv_loop_t * loop,const Task & t)25 CompressionTask(uv_loop_t *loop, const Task &t)
26 : task_(t), loop_(loop), input_(-1), inputHandler_(loop), outputHandler_(loop, t), output_(-1),
27 blockSize_(0) {
28 }
~CompressionTask()29 ~CompressionTask() {
30 Cleanup();
31 }
32
33 void Enqueue();
34 void Cleanup();
35
36 private:
Notify(TaskStatus status,int64_t pos=-1,int64_t total=-1,int64_t written=-1)37 void Notify(TaskStatus status, int64_t pos = -1, int64_t total = -1, int64_t written = -1) {
38 if (status == TASK_INPROGRESS || status == TASK_SUCCESS) {
39 task_.progress(&task_, status, pos, total, written);
40 } else {
41 task_.error(&task_, status, nullptr);
42 }
43 }
Notify(TaskStatus status,const char * reason)44 void Notify(TaskStatus status, const char *reason) {
45 task_.error(&task_, status, reason);
46 }
47
48 void BeginProcessing();
49
50 UVHelper uv_;
51 const Task &task_;
52 uv_loop_t *loop_;
53
54 uv_fs_t read_;
55 uv_fs_t write_;
56
57 uv_file input_;
58 Input inputHandler_;
59 Output outputHandler_;
60 uv_file output_;
61 uint32_t blockSize_;
62 };
63
Enqueue()64 void CompressionTask::Enqueue() {
65 if (task_.block_size == DEFAULT_BLOCK_SIZE) {
66 if (task_.flags & TASKFLAG_FMT_DAX) {
67 blockSize_ = DAX_FRAME_SIZE;
68 } else {
69 // Start with a small block size.
70 // We'll re-evaluate later.
71 blockSize_ = SMALL_BLOCK_SIZE;
72 }
73 } else {
74 if (task_.block_size > MAX_BLOCK_SIZE) {
75 Notify(TASK_INVALID_OPTION, "Block size too large");
76 return;
77 }
78 if (task_.block_size < SECTOR_SIZE) {
79 Notify(TASK_INVALID_OPTION, "Block size too small, must be at least 2048");
80 return;
81 }
82 if ((task_.block_size & (task_.block_size - 1)) != 0) {
83 Notify(TASK_INVALID_OPTION, "Block size must be a power of two");
84 return;
85 }
86 blockSize_ = task_.block_size;
87 }
88 if (!pool.SetBufferSize(blockSize_ * 2)) {
89 Notify(TASK_INVALID_OPTION, "Unable to update buffer size to match block size");
90 return;
91 }
92
93 // We open input and output in order in case there are errors.
94 uv_.fs_open(loop_, &read_, task_.input.c_str(), O_RDONLY, 0444, [this](uv_fs_t *req) {
95 if (req->result < 0) {
96 Notify(TASK_BAD_INPUT, "Could not open input file");
97 } else {
98 input_ = static_cast<uv_file>(req->result);
99 uv_.fs_open(loop_, &write_, task_.output.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644, [this](uv_fs_t *req) {
100 if (req->result < 0) {
101 Notify(TASK_BAD_OUTPUT, "Could not open output file");
102 } else {
103 output_ = static_cast<uv_file>(req->result);
104
105 // Okay, both files opened fine, it's time to turn on the tap.
106 BeginProcessing();
107 }
108 uv_fs_req_cleanup(req);
109 });
110 }
111 uv_fs_req_cleanup(req);
112 });
113 }
114
Cleanup()115 void CompressionTask::Cleanup() {
116 if (input_ >= 0) {
117 uv_fs_close(loop_, &read_, input_, nullptr);
118 uv_fs_req_cleanup(&read_);
119 input_ = -1;
120 }
121 if (output_ >= 0) {
122 uv_fs_close(loop_, &write_, output_, nullptr);
123 uv_fs_req_cleanup(&write_);
124 output_ = -1;
125 }
126 }
127
BeginProcessing()128 void CompressionTask::BeginProcessing() {
129 inputHandler_.OnFinish([this](bool success, const char *reason) {
130 if (!success) {
131 Notify(TASK_INVALID_DATA, reason);
132 }
133 });
134 outputHandler_.OnFinish([this](bool success, const char *reason) {
135 if (success) {
136 Notify(TASK_SUCCESS);
137 } else {
138 // Abort reading.
139 inputHandler_.Pause();
140 Notify(TASK_CANNOT_WRITE, reason);
141 }
142 });
143
144 outputHandler_.OnProgress([this](int64_t pos, int64_t total, int64_t written) {
145 // If it was paused, the queue has space now.
146 inputHandler_.Resume();
147 Notify(TASK_INPROGRESS, pos, total, written);
148 });
149
150 inputHandler_.OnBegin([this](int64_t size) {
151 CSOFormat fmt = CSO_FMT_CSO1;
152 if (task_.flags & TASKFLAG_FMT_CSO_2) {
153 fmt = CSO_FMT_CSO2;
154 } else if (task_.flags & TASKFLAG_FMT_ZSO) {
155 fmt = CSO_FMT_ZSO;
156 } else if (task_.flags & TASKFLAG_FMT_DAX) {
157 fmt = CSO_FMT_DAX;
158 }
159
160 // Now that we know the file size, check if we should resize the blockSize_.
161 if (task_.block_size == DEFAULT_BLOCK_SIZE && size >= LARGE_BLOCK_SIZE_THRESH) {
162 blockSize_ = LARGE_BLOCK_SIZE;
163 if (!pool.SetBufferSize(blockSize_ * 2)) {
164 // Abort reading.
165 inputHandler_.Pause();
166 Notify(TASK_INVALID_OPTION, "Unable to update buffer size to match block size");
167 return;
168 }
169 }
170
171 outputHandler_.SetFile(output_, size, blockSize_, fmt);
172 Notify(TASK_INPROGRESS, 0, size, 0);
173 });
174 inputHandler_.Pipe(input_, [this](int64_t pos, uint8_t *sector) {
175 outputHandler_.Enqueue(pos, sector);
176 if (outputHandler_.QueueFull()) {
177 inputHandler_.Pause();
178 }
179 });
180 }
181
Compress(const std::vector<Task> & tasks)182 void Compress(const std::vector<Task> &tasks) {
183 uv_loop_t loop;
184 uv_loop_init(&loop);
185
186 for (const Task t : tasks) {
187 CompressionTask task(&loop, t);
188 task.Enqueue();
189 uv_run(&loop, UV_RUN_DEFAULT);
190 }
191
192 // Run any remaining events from destructors.
193 uv_run(&loop, UV_RUN_DEFAULT);
194
195 uv_loop_close(&loop);
196 }
197
198 };
199