1 #include "pixz.h"
2
3 #include <archive.h>
4 #include <archive_entry.h>
5
6
7 #pragma mark TYPES
8
9 typedef struct io_block_t io_block_t;
10 struct io_block_t {
11 lzma_block block;
12 uint8_t *input, *output;
13 size_t insize, outsize;
14 };
15
16
17 #pragma mark GLOBALS
18
19 #define LZMA_CHUNK_MAX (1 << 16)
20
21 double gBlockFraction = 2.0;
22
23 static bool gTar = true;
24
25 static size_t gBlockInSize = 0, gBlockOutSize = 0;
26
27 static off_t gMultiHeaderStart = 0;
28 static bool gMultiHeader = false;
29 static off_t gTotalRead = 0;
30
31 static pipeline_item_t *gReadItem = NULL;
32 static io_block_t *gReadBlock = NULL;
33 static size_t gReadItemCount = 0;
34
35 static lzma_filter gFilters[LZMA_FILTERS_MAX + 1];
36
37 static uint8_t gFileIndexBuf[CHUNKSIZE];
38 static size_t gFileIndexBufPos = 0;
39
40
41 #pragma mark FUNCTION DECLARATIONS
42
43 static void read_thread();
44
45 static void encode_thread(size_t thnum);
46 static void encode_uncompressible(io_block_t *ib);
47 static size_t size_uncompressible(size_t insize);
48
49 static void *block_create();
50 static void block_free(void *data);
51
52 typedef enum {
53 BLOCK_IN = 1,
54 BLOCK_OUT = 2,
55 BLOCK_ALL = BLOCK_IN | BLOCK_OUT,
56 } block_parts;
57 static void block_alloc(io_block_t *ib, block_parts parts);
58 static void block_dealloc(io_block_t *ib, block_parts parts);
59
60 static void add_file(off_t offset, const char *name);
61
62 static archive_read_callback tar_read;
63 static archive_open_callback tar_ok;
64 static archive_close_callback tar_ok;
65
66 static void block_init(lzma_block *block, size_t insize);
67 static void stream_edge(lzma_vli backward_size);
68 static void write_block(pipeline_item_t *pi);
69 static void encode_index(void);
70
71 static void write_file_index(void);
72 static void write_file_index_bytes(size_t size, uint8_t *buf);
73 static void write_file_index_buf(lzma_action action);
74
75
76 #pragma mark FUNCTION DEFINITIONS
77
pixz_write(bool tar,uint32_t level)78 void pixz_write(bool tar, uint32_t level) {
79 gTar = tar;
80
81 // xz options
82 lzma_options_lzma lzma_opts;
83 if (lzma_lzma_preset(&lzma_opts, level))
84 die("Error setting lzma options");
85 gFilters[0] = (lzma_filter){ .id = LZMA_FILTER_LZMA2,
86 .options = &lzma_opts };
87 gFilters[1] = (lzma_filter){ .id = LZMA_VLI_UNKNOWN, .options = NULL };
88
89 gBlockInSize = lzma_opts.dict_size * gBlockFraction;
90 if (gBlockInSize <= 0)
91 die("Block size must be positive");
92 gBlockOutSize = lzma_block_buffer_bound(gBlockInSize);
93
94 pipeline_create(block_create, block_free, read_thread, encode_thread);
95 debug("writer: start");
96
97 // pre-block setup: header, index
98 if (!(gIndex = lzma_index_init(NULL)))
99 die("Error creating index");
100 stream_edge(LZMA_VLI_UNKNOWN);
101
102 // write blocks
103 while (true) {
104 pipeline_item_t *pi = pipeline_merged();
105 if (!pi)
106 break;
107
108 debug("writer: received %zu", pi->seq);
109 write_block(pi);
110 queue_push(gPipelineStartQ, PIPELINE_ITEM, pi);
111 }
112
113 // file index
114 if (gTar)
115 write_file_index();
116 free_file_index();
117
118 // post-block cleanup: index, footer
119 encode_index();
120 stream_edge(lzma_index_size(gIndex));
121 lzma_index_end(gIndex, NULL);
122 fclose(gOutFile);
123
124 debug("writer: cleaning up reader");
125 pipeline_destroy();
126
127 debug("exit");
128 }
129
130
131 #pragma mark READING
132
read_thread()133 static void read_thread() {
134 debug("reader: start");
135
136 if (gTar) {
137 struct archive *ar = archive_read_new();
138 prevent_compression(ar);
139 archive_read_support_format_tar(ar);
140 archive_read_support_format_raw(ar);
141 archive_read_open(ar, NULL, tar_ok, tar_read, tar_ok);
142 struct archive_entry *entry;
143 while (true) {
144 int aerr = archive_read_next_header(ar, &entry);
145 if (aerr == ARCHIVE_EOF) {
146 break;
147 } else if (aerr != ARCHIVE_OK && aerr != ARCHIVE_WARN) {
148 // Some charset translations warn spuriously
149 fprintf(stderr, "%s\n", archive_error_string(ar));
150 die("Error reading archive entry");
151 }
152
153 if (archive_format(ar) == ARCHIVE_FORMAT_RAW) {
154 gTar = false;
155 break;
156 }
157 add_file(archive_read_header_position(ar),
158 archive_entry_pathname(entry));
159 }
160 if (archive_read_header_position(ar) == 0)
161 gTar = false; // probably spuriously identified as tar
162 finish_reading(ar);
163 }
164 if (!feof(gInFile)) {
165 const void *dummy;
166 while (tar_read(NULL, NULL, &dummy) != 0)
167 ; // just keep pumping
168 }
169 fclose(gInFile);
170
171 if (gTar)
172 add_file(gTotalRead, NULL);
173
174 // write last block, if necessary
175 if (gReadItem) {
176 // if this block had only one read, and it was EOF, it's waste
177 debug("reader: handling last block %zu", gReadItemCount);
178 if (gReadBlock->insize)
179 pipeline_split(gReadItem);
180 else
181 queue_push(gPipelineStartQ, PIPELINE_ITEM, gReadItem);
182 gReadItem = NULL;
183 }
184
185 // stop the other threads
186 debug("reader: cleaning up encoders");
187 pipeline_stop();
188 debug("reader: end");
189 }
190
tar_read(struct archive * ar,void * ref,const void ** bufp)191 static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
192 if (!gReadItem) {
193 queue_pop(gPipelineStartQ, (void**)&gReadItem);
194 gReadBlock = (io_block_t*)(gReadItem->data);
195 block_alloc(gReadBlock, BLOCK_IN);
196 gReadBlock->insize = 0;
197 debug("reader: reading %zu", gReadItemCount);
198 }
199
200 size_t space = gBlockInSize - gReadBlock->insize;
201 if (space > CHUNKSIZE)
202 space = CHUNKSIZE;
203 uint8_t *buf = gReadBlock->input + gReadBlock->insize;
204 size_t rd = fread(buf, 1, space, gInFile);
205 if (ferror(gInFile))
206 die("Error reading input file");
207 gReadBlock->insize += rd;
208 gTotalRead += rd;
209 *bufp = buf;
210
211 if (gReadBlock->insize == gBlockInSize) {
212 debug("reader: sending %zu", gReadItemCount);
213 pipeline_split(gReadItem);
214 ++gReadItemCount;
215 gReadItem = NULL;
216 }
217
218 return rd;
219 }
220
tar_ok(struct archive * ar,void * ref)221 static int tar_ok(struct archive *ar, void *ref) {
222 return ARCHIVE_OK;
223 }
224
add_file(off_t offset,const char * name)225 static void add_file(off_t offset, const char *name) {
226 if (name && is_multi_header(name)) {
227 if (!gMultiHeader)
228 gMultiHeaderStart = offset;
229 gMultiHeader = true;
230 return;
231 }
232
233 file_index_t *f = malloc(sizeof(file_index_t));
234 f->offset = gMultiHeader ? gMultiHeaderStart : offset;
235 gMultiHeader = false;
236 f->name = name ? xstrdup(name) : NULL;
237 f->next = NULL;
238
239 if (gLastFile) {
240 gLastFile->next = f;
241 } else { // new index
242 gFileIndex = f;
243 }
244 gLastFile = f;
245 }
246
block_free(void * data)247 static void block_free(void *data) {
248 io_block_t *ib = (io_block_t*)data;
249 free(ib->input);
250 free(ib->output);
251 free(ib);
252 }
253
block_create()254 static void *block_create() {
255 io_block_t *ib = malloc(sizeof(io_block_t));
256 ib->input = ib->output = NULL;
257 return ib;
258 }
259
/*
 * Make sure the requested buffers of a block exist.
 *
 * ib:    block whose buffers are wanted.
 * parts: BLOCK_IN for the input buffer (gBlockInSize bytes), BLOCK_OUT for
 *        the output buffer (gBlockOutSize bytes), or BLOCK_ALL for both.
 *
 * A buffer that is already present is left untouched. Only the requested
 * parts are allocated and checked, so the reader can ask for input alone and
 * the encoder for output alone. Dies if an allocation fails.
 */
static void block_alloc(io_block_t *ib, block_parts parts) {
    if ((parts & BLOCK_IN) && !ib->input) {
        ib->input = malloc(gBlockInSize);
        if (!ib->input)
            die("Can't allocate blocks");
    }
    if ((parts & BLOCK_OUT) && !ib->output) {
        ib->output = malloc(gBlockOutSize);
        if (!ib->output)
            die("Can't allocate blocks");
    }
}
268
/*
 * Release the selected buffers of a block and reset their pointers to NULL.
 *
 * ib:    block whose buffers are released.
 * parts: BLOCK_IN, BLOCK_OUT or BLOCK_ALL.
 */
static void block_dealloc(io_block_t *ib, block_parts parts) {
    const bool drop_input = (parts & BLOCK_IN) != 0;
    const bool drop_output = (parts & BLOCK_OUT) != 0;

    if (drop_input) {
        free(ib->input);
        ib->input = NULL;
    }
    if (drop_output) {
        free(ib->output);
        ib->output = NULL;
    }
}
279
280
281 #pragma mark ENCODING
282
size_uncompressible(size_t insize)283 static size_t size_uncompressible(size_t insize) {
284 size_t chunks = insize / LZMA_CHUNK_MAX;
285 if (insize % LZMA_CHUNK_MAX)
286 ++chunks;
287 // Per chunk (control code + 2-byte size), one byte for EOF
288 size_t data_size = insize + chunks * 3 + 1;
289 if (data_size % 4)
290 data_size += 4 - data_size % 4; // Padding
291 return data_size;
292 }
293
/*
 * Store a block's input verbatim as a sequence of uncompressed LZMA2 chunks.
 * See http://en.wikipedia.org/wiki/Lzma#LZMA2_format
 *
 * Writes into ib->output just after the space reserved for the block header:
 * the chunks, an end marker, zero padding to a multiple of 4 bytes, and a
 * little-endian CRC-32 of the input. Sets ib->block.compressed_size (chunks
 * plus end marker, without padding) and ib->block.uncompressed_size.
 *
 * Dies if the block's check type is not CRC-32.
 */
static void encode_uncompressible(io_block_t *ib) {
    enum { CONTROL_END = 0, CONTROL_UNCOMP = 1 };

    uint8_t *const data_start = ib->output + ib->block.header_size;
    uint8_t *dst = data_start;
    const uint8_t *src = ib->input;

    for (size_t left = ib->insize; left != 0; ) {
        size_t chunk = (left > LZMA_CHUNK_MAX) ? LZMA_CHUNK_MAX : left;

        // Chunk header: control byte, then 16-bit big endian (size - 1)
        uint16_t stored = (uint16_t)(chunk - 1);
        dst[0] = CONTROL_UNCOMP;
        dst[1] = (stored >> 8);
        dst[2] = (stored & 0xFF);
        dst += 3;

        // Chunk payload
        memcpy(dst, src, chunk);
        dst += chunk;
        src += chunk;
        left -= chunk;
    }

    // End-of-block marker
    *dst++ = CONTROL_END;

    ib->block.compressed_size = dst - data_start;
    ib->block.uncompressed_size = ib->insize;

    // Zero padding up to a multiple of 4 bytes
    for (; (dst - data_start) % 4 != 0; ++dst)
        *dst = 0;

    // Checksum, least significant byte first
    if (ib->block.check != LZMA_CHECK_CRC32)
        die("pixz only supports CRC-32 checksums");
    uint32_t crc = lzma_crc32(ib->input, ib->insize, 0);
    for (unsigned shift = 0; shift < 32; shift += 8)
        *dst++ = (crc >> shift) & 0xFF;
}
343
encode_thread(size_t thnum)344 static void encode_thread(size_t thnum) {
345 lzma_stream stream = LZMA_STREAM_INIT;
346 while (true) {
347 pipeline_item_t *pi;
348 int msg = queue_pop(gPipelineSplitQ, (void**)&pi);
349 if (msg == PIPELINE_STOP)
350 break;
351
352 debug("encoder %zu: received %zu", thnum, pi->seq);
353 io_block_t *ib = (io_block_t*)(pi->data);
354
355 block_alloc(ib, BLOCK_OUT);
356 block_init(&ib->block, ib->insize);
357 size_t header_size = ib->block.header_size;
358 size_t uncompressible_size = size_uncompressible(ib->insize) +
359 lzma_check_size(ib->block.check);
360
361 if (lzma_block_encoder(&stream, &ib->block) != LZMA_OK)
362 die("Error creating block encoder");
363 stream.next_in = ib->input;
364 stream.avail_in = ib->insize;
365 stream.next_out = ib->output + header_size;
366 stream.avail_out = uncompressible_size;
367
368 ib->block.uncompressed_size = LZMA_VLI_UNKNOWN; // for encoder to change
369 lzma_ret err = LZMA_OK;
370 while (err == LZMA_OK) {
371 err = lzma_code(&stream, LZMA_FINISH);
372 }
373 if (err == LZMA_BUF_ERROR) {
374 debug("encoder: uncompressible %zu", pi->seq);
375 encode_uncompressible(ib);
376 ib->outsize = header_size + uncompressible_size;
377 } else if (err == LZMA_STREAM_END) {
378 ib->outsize = stream.next_out - ib->output;
379 } else {
380 die("Error encoding block");
381 }
382 block_dealloc(ib, BLOCK_IN);
383
384 if (lzma_block_header_encode(&ib->block, ib->output) != LZMA_OK)
385 die("Error encoding block header");
386
387 debug("encoder %zu: sending %zu", thnum, pi->seq);
388 queue_push(gPipelineMergeQ, PIPELINE_ITEM, pi);
389 }
390
391 lzma_end(&stream);
392 }
393
394
395 #pragma mark WRITING
396
/*
 * Fill in a liblzma block record and compute its header size.
 *
 * block:  record to initialize; header_size is set via
 *         lzma_block_header_size().
 * insize: uncompressed size of the block, or 0 if not known. With 0, both
 *         sizes are recorded as LZMA_VLI_UNKNOWN; otherwise the compressed
 *         size is recorded as gBlockOutSize.
 *
 * Dies if the header size cannot be determined.
 */
static void block_init(lzma_block *block, size_t insize) {
    const bool size_known = (insize != 0);

    block->version = 0;
    block->check = CHECK;
    block->filters = gFilters;

    if (size_known) {
        block->uncompressed_size = insize;
        block->compressed_size = gBlockOutSize;
    } else {
        block->uncompressed_size = LZMA_VLI_UNKNOWN;
        block->compressed_size = LZMA_VLI_UNKNOWN;
    }

    if (lzma_block_header_size(block) != LZMA_OK)
        die("Error getting block header size");
}
407
/*
 * Write the stream header or the stream footer to gOutFile.
 *
 * backward_size: LZMA_VLI_UNKNOWN selects the stream header; any other value
 *                selects the footer and is recorded as its backward size.
 *
 * Dies if encoding or writing fails.
 */
static void stream_edge(lzma_vli backward_size) {
    lzma_stream_flags flags = { .version = 0, .check = CHECK,
        .backward_size = backward_size };
    uint8_t edge[LZMA_STREAM_HEADER_SIZE];

    lzma_ret err;
    if (backward_size == LZMA_VLI_UNKNOWN)
        err = lzma_stream_header_encode(&flags, edge);
    else
        err = lzma_stream_footer_encode(&flags, edge);
    if (err != LZMA_OK)
        die("Error encoding stream edge");

    if (fwrite(edge, LZMA_STREAM_HEADER_SIZE, 1, gOutFile) != 1)
        die("Error writing stream edge");
}
423
/*
 * Write one encoded block to gOutFile, record it in gIndex, and release the
 * block's buffers.
 *
 * pi: pipeline item whose payload is an io_block_t with outsize bytes of
 *     output ready.
 *
 * Dies if writing or the index update fails.
 */
static void write_block(pipeline_item_t *pi) {
    debug("writer: writing %zu", pi->seq);
    io_block_t *ib = (io_block_t*)(pi->data);

    // Does it make sense to chunk this?
    for (size_t done = 0; done < ib->outsize; ) {
        size_t chunk = ib->outsize - done;
        if (chunk > CHUNKSIZE)
            chunk = CHUNKSIZE;
        if (fwrite(ib->output + done, chunk, 1, gOutFile) != 1)
            die("Error writing block data");
        done += chunk;
    }

    lzma_ret err = lzma_index_append(gIndex, NULL,
        lzma_block_unpadded_size(&ib->block),
        ib->block.uncompressed_size);
    if (err != LZMA_OK)
        die("Error adding to index");

    block_dealloc(ib, BLOCK_ALL);
    debug("writer: writing %zu complete", pi->seq);
}
447
encode_index(void)448 static void encode_index(void) {
449 if (lzma_index_encoder(&gStream, gIndex) != LZMA_OK)
450 die("Error creating index encoder");
451 uint8_t obuf[CHUNKSIZE];
452 lzma_ret err = LZMA_OK;
453 while (err != LZMA_STREAM_END) {
454 gStream.next_out = obuf;
455 gStream.avail_out = CHUNKSIZE;
456 err = lzma_code(&gStream, LZMA_RUN);
457 if (err != LZMA_OK && err != LZMA_STREAM_END)
458 die("Error encoding index");
459 if (gStream.avail_out != CHUNKSIZE) {
460 if (fwrite(obuf, CHUNKSIZE - gStream.avail_out, 1, gOutFile) != 1)
461 die("Error writing index data");
462 }
463 }
464 lzma_end(&gStream);
465 }
466
write_file_index(void)467 static void write_file_index(void) {
468 lzma_block block;
469 block_init(&block, 0);
470 uint8_t hdrbuf[block.header_size];
471 if (lzma_block_header_encode(&block, hdrbuf) != LZMA_OK)
472 die("Error encoding file index header");
473 if (fwrite(hdrbuf, block.header_size, 1, gOutFile) != 1)
474 die("Error writing file index header");
475
476 if (lzma_block_encoder(&gStream, &block) != LZMA_OK)
477 die("Error creating file index encoder");
478
479 uint8_t offbuf[sizeof(uint64_t)];
480 xle64enc(offbuf, PIXZ_INDEX_MAGIC);
481 write_file_index_bytes(sizeof(offbuf), offbuf);
482 for (file_index_t *f = gFileIndex; f != NULL; f = f->next) {
483 char *name = f->name ? f->name : "";
484 size_t len = strlen(name);
485 write_file_index_bytes(len + 1, (uint8_t*)name);
486 xle64enc(offbuf, f->offset);
487 write_file_index_bytes(sizeof(offbuf), offbuf);
488 }
489 write_file_index_buf(LZMA_FINISH);
490
491 if (lzma_index_append(gIndex, NULL, lzma_block_unpadded_size(&block),
492 block.uncompressed_size) != LZMA_OK)
493 die("Error adding file-index to index");
494 lzma_end(&gStream);
495 }
496
write_file_index_bytes(size_t size,uint8_t * buf)497 static void write_file_index_bytes(size_t size, uint8_t *buf) {
498 size_t bufpos = 0;
499 while (bufpos < size) {
500 size_t len = size - bufpos;
501 size_t space = CHUNKSIZE - gFileIndexBufPos;
502 if (len > space)
503 len = space;
504 memcpy(gFileIndexBuf + gFileIndexBufPos, buf + bufpos, len);
505 gFileIndexBufPos += len;
506 bufpos += len;
507
508 if (gFileIndexBufPos == CHUNKSIZE) {
509 write_file_index_buf(LZMA_RUN);
510 gFileIndexBufPos = 0;
511 }
512 }
513 }
514
/*
 * Feed the staged file-index bytes to gStream and write whatever the encoder
 * produces to gOutFile, then mark the staging buffer empty.
 *
 * action: LZMA_RUN encodes until the staged input is consumed; LZMA_FINISH
 *         keeps going until the encoder reports the end of the stream.
 *
 * Dies on any encoding or write error.
 */
static void write_file_index_buf(lzma_action action) {
    uint8_t chunk[CHUNKSIZE];
    const bool finishing = (action == LZMA_FINISH);

    gStream.next_in = gFileIndexBuf;
    gStream.avail_in = gFileIndexBufPos;

    lzma_ret err = LZMA_OK;
    while (err != LZMA_STREAM_END && (finishing || gStream.avail_in)) {
        gStream.next_out = chunk;
        gStream.avail_out = CHUNKSIZE;

        err = lzma_code(&gStream, action);
        if (err != LZMA_OK && err != LZMA_STREAM_END)
            die("Error encoding file index");

        // Write this round's output, if the encoder produced any
        size_t produced = CHUNKSIZE - gStream.avail_out;
        if (produced != 0) {
            if (fwrite(chunk, produced, 1, gOutFile) != 1)
                die("Error writing file index");
        }
    }

    gFileIndexBufPos = 0;
}
534