1 #include <functional>
2 #include <fcntl.h>
3 #include "compress.h"
4 #include "uv_helper.h"
5 #include "cso.h"
6 #include "dax.h"
7 #include "input.h"
8 #include "output.h"
9 #include "buffer_pool.h"
10 
11 namespace maxcso {
12 
// Upper bound on an explicitly requested block size (0x40000 = 256KB).
// Enqueue() rejects anything larger.
// Anything above this is insane.  This value is even insane.
static const uint32_t MAX_BLOCK_SIZE = 0x40000;

// These are the default block sizes, used only when the task leaves
// block_size at DEFAULT_BLOCK_SIZE (and the output is not DAX).
static const uint32_t SMALL_BLOCK_SIZE = 2048;
static const uint32_t LARGE_BLOCK_SIZE = 16384;
// We use the LARGE_BLOCK_SIZE default for inputs of 2GB or more
// (the comparison against the input size is >=, not >).
static const int64_t LARGE_BLOCK_SIZE_THRESH = 0x80000000;
21 
22 // This actually handles decompression too.  They're basically the same.
23 class CompressionTask {
24 public:
CompressionTask(uv_loop_t * loop,const Task & t)25 	CompressionTask(uv_loop_t *loop, const Task &t)
26 		: task_(t), loop_(loop), input_(-1), inputHandler_(loop), outputHandler_(loop, t), output_(-1),
27 		blockSize_(0) {
28 	}
~CompressionTask()29 	~CompressionTask() {
30 		Cleanup();
31 	}
32 
33 	void Enqueue();
34 	void Cleanup();
35 
36 private:
Notify(TaskStatus status,int64_t pos=-1,int64_t total=-1,int64_t written=-1)37 	void Notify(TaskStatus status, int64_t pos = -1, int64_t total = -1, int64_t written = -1) {
38 		if (status == TASK_INPROGRESS || status == TASK_SUCCESS) {
39 			task_.progress(&task_, status, pos, total, written);
40 		} else {
41 			task_.error(&task_, status, nullptr);
42 		}
43 	}
Notify(TaskStatus status,const char * reason)44 	void Notify(TaskStatus status, const char *reason) {
45 		task_.error(&task_, status, reason);
46 	}
47 
48 	void BeginProcessing();
49 
50 	UVHelper uv_;
51 	const Task &task_;
52 	uv_loop_t *loop_;
53 
54 	uv_fs_t read_;
55 	uv_fs_t write_;
56 
57 	uv_file input_;
58 	Input inputHandler_;
59 	Output outputHandler_;
60 	uv_file output_;
61 	uint32_t blockSize_;
62 };
63 
Enqueue()64 void CompressionTask::Enqueue() {
65 	if (task_.block_size == DEFAULT_BLOCK_SIZE) {
66 		if (task_.flags & TASKFLAG_FMT_DAX) {
67 			blockSize_ = DAX_FRAME_SIZE;
68 		} else {
69 			// Start with a small block size.
70 			// We'll re-evaluate later.
71 			blockSize_ = SMALL_BLOCK_SIZE;
72 		}
73 	} else {
74 		if (task_.block_size > MAX_BLOCK_SIZE) {
75 			Notify(TASK_INVALID_OPTION, "Block size too large");
76 			return;
77 		}
78 		if (task_.block_size < SECTOR_SIZE) {
79 			Notify(TASK_INVALID_OPTION, "Block size too small, must be at least 2048");
80 			return;
81 		}
82 		if ((task_.block_size & (task_.block_size - 1)) != 0) {
83 			Notify(TASK_INVALID_OPTION, "Block size must be a power of two");
84 			return;
85 		}
86 		blockSize_ = task_.block_size;
87 	}
88 	if (!pool.SetBufferSize(blockSize_ * 2)) {
89 		Notify(TASK_INVALID_OPTION, "Unable to update buffer size to match block size");
90 		return;
91 	}
92 
93 	// We open input and output in order in case there are errors.
94 	uv_.fs_open(loop_, &read_, task_.input.c_str(), O_RDONLY, 0444, [this](uv_fs_t *req) {
95 		if (req->result < 0) {
96 			Notify(TASK_BAD_INPUT, "Could not open input file");
97 		} else {
98 			input_ = static_cast<uv_file>(req->result);
99 			uv_.fs_open(loop_, &write_, task_.output.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0644, [this](uv_fs_t *req) {
100 				if (req->result < 0) {
101 					Notify(TASK_BAD_OUTPUT, "Could not open output file");
102 				} else {
103 					output_ = static_cast<uv_file>(req->result);
104 
105 					// Okay, both files opened fine, it's time to turn on the tap.
106 					BeginProcessing();
107 				}
108 				uv_fs_req_cleanup(req);
109 			});
110 		}
111 		uv_fs_req_cleanup(req);
112 	});
113 }
114 
Cleanup()115 void CompressionTask::Cleanup() {
116 	if (input_ >= 0) {
117 		uv_fs_close(loop_, &read_, input_, nullptr);
118 		uv_fs_req_cleanup(&read_);
119 		input_ = -1;
120 	}
121 	if (output_ >= 0) {
122 		uv_fs_close(loop_, &write_, output_, nullptr);
123 		uv_fs_req_cleanup(&write_);
124 		output_ = -1;
125 	}
126 }
127 
BeginProcessing()128 void CompressionTask::BeginProcessing() {
129 	inputHandler_.OnFinish([this](bool success, const char *reason) {
130 		if (!success) {
131 			Notify(TASK_INVALID_DATA, reason);
132 		}
133 	});
134 	outputHandler_.OnFinish([this](bool success, const char *reason) {
135 		if (success) {
136 			Notify(TASK_SUCCESS);
137 		} else {
138 			// Abort reading.
139 			inputHandler_.Pause();
140 			Notify(TASK_CANNOT_WRITE, reason);
141 		}
142 	});
143 
144 	outputHandler_.OnProgress([this](int64_t pos, int64_t total, int64_t written) {
145 		// If it was paused, the queue has space now.
146 		inputHandler_.Resume();
147 		Notify(TASK_INPROGRESS, pos, total, written);
148 	});
149 
150 	inputHandler_.OnBegin([this](int64_t size) {
151 		CSOFormat fmt = CSO_FMT_CSO1;
152 		if (task_.flags & TASKFLAG_FMT_CSO_2) {
153 			fmt = CSO_FMT_CSO2;
154 		} else if (task_.flags & TASKFLAG_FMT_ZSO) {
155 			fmt = CSO_FMT_ZSO;
156 		} else if (task_.flags & TASKFLAG_FMT_DAX) {
157 			fmt = CSO_FMT_DAX;
158 		}
159 
160 		// Now that we know the file size, check if we should resize the blockSize_.
161 		if (task_.block_size == DEFAULT_BLOCK_SIZE && size >= LARGE_BLOCK_SIZE_THRESH) {
162 			blockSize_ = LARGE_BLOCK_SIZE;
163 			if (!pool.SetBufferSize(blockSize_ * 2)) {
164 				// Abort reading.
165 				inputHandler_.Pause();
166 				Notify(TASK_INVALID_OPTION, "Unable to update buffer size to match block size");
167 				return;
168 			}
169 		}
170 
171 		outputHandler_.SetFile(output_, size, blockSize_, fmt);
172 		Notify(TASK_INPROGRESS, 0, size, 0);
173 	});
174 	inputHandler_.Pipe(input_, [this](int64_t pos, uint8_t *sector) {
175 		outputHandler_.Enqueue(pos, sector);
176 		if (outputHandler_.QueueFull()) {
177 			inputHandler_.Pause();
178 		}
179 	});
180 }
181 
Compress(const std::vector<Task> & tasks)182 void Compress(const std::vector<Task> &tasks) {
183 	uv_loop_t loop;
184 	uv_loop_init(&loop);
185 
186 	for (const Task t : tasks) {
187 		CompressionTask task(&loop, t);
188 		task.Enqueue();
189 		uv_run(&loop, UV_RUN_DEFAULT);
190 	}
191 
192 	// Run any remaining events from destructors.
193 	uv_run(&loop, UV_RUN_DEFAULT);
194 
195 	uv_loop_close(&loop);
196 }
197 
198 };
199