#include "pixz.h"

#include <stdint.h>

#include <archive.h>
#include <archive_entry.h>
5 
6 
7 #pragma mark TYPES
8 
typedef struct io_block_t io_block_t;
/* One unit of work passed through the read -> encode -> write pipeline. */
struct io_block_t {
    lzma_block block;           // xz block options; header_size/sizes filled in by encoding
    uint8_t *input, *output;    // raw input data / encoded block (header, data, padding, check)
    size_t insize, outsize;     // bytes used in input / output
};
15 
16 
17 #pragma mark GLOBALS
18 
// Largest amount of data stored in one uncompressed LZMA2 chunk
// (its size is written as a 16-bit "size - 1" field)
#define LZMA_CHUNK_MAX (1 << 16)

// Block input size as a multiple of the LZMA2 dictionary size.
// Not static: presumably adjusted from outside this file (e.g. options) — confirm.
double gBlockFraction = 2.0;

// Parse the input as tar and append a file index; the reader clears it if
// the input does not look like a tar archive.
static bool gTar = true;

// Raw bytes per block, and the output buffer size needed to encode one block
static size_t gBlockInSize = 0, gBlockOutSize = 0;

// While a run of headers matching is_multi_header() is seen, remember where
// the run started so the next real entry is indexed from that offset.
static off_t gMultiHeaderStart = 0;
static bool gMultiHeader = false;
// Total raw bytes read from gInFile so far
static off_t gTotalRead = 0;

// Reader state: the pipeline item/block currently being filled, and the
// number of full blocks sent to the encoders
static pipeline_item_t *gReadItem = NULL;
static io_block_t *gReadBlock = NULL;
static size_t gReadItemCount = 0;

// Filter chain shared by every block encoder (LZMA2, then terminator)
static lzma_filter gFilters[LZMA_FILTERS_MAX + 1];

// Staging buffer for file-index bytes before they are fed to the encoder
static uint8_t gFileIndexBuf[CHUNKSIZE];
static size_t gFileIndexBufPos = 0;
39 
40 
41 #pragma mark FUNCTION DECLARATIONS
42 
// Reader side: parse input (optionally as tar) and fill pipeline blocks
static void read_thread();

// Encoder side: LZMA2-encode blocks, with a stored-chunk fallback
static void encode_thread(size_t thnum);
static void encode_uncompressible(io_block_t *ib);
static size_t size_uncompressible(size_t insize);

// Pipeline item constructor/destructor for io_block_t
static void *block_create();
static void block_free(void *data);

// Which of a block's buffers to allocate or free
typedef enum {
	BLOCK_IN = 1,
	BLOCK_OUT = 2,
	BLOCK_ALL = BLOCK_IN | BLOCK_OUT,
} block_parts;
static void block_alloc(io_block_t *ib, block_parts parts);
static void block_dealloc(io_block_t *ib, block_parts parts);

// Record a file-index entry (a NULL name is used for the final entry)
static void add_file(off_t offset, const char *name);

// libarchive callbacks; tar_ok serves as both the open and close callback
static archive_read_callback tar_read;
static archive_open_callback tar_ok;
static archive_close_callback tar_ok;

// Output: block options, stream header/footer, data blocks, xz index
static void block_init(lzma_block *block, size_t insize);
static void stream_edge(lzma_vli backward_size);
static void write_block(pipeline_item_t *pi);
static void encode_index(void);

// Output: the pixz file index, written as one extra compressed block
static void write_file_index(void);
static void write_file_index_bytes(size_t size, uint8_t *buf);
static void write_file_index_buf(lzma_action action);
74 
75 
76 #pragma mark FUNCTION DEFINITIONS
77 
pixz_write(bool tar,uint32_t level)78 void pixz_write(bool tar, uint32_t level) {
79     gTar = tar;
80 
81     // xz options
82     lzma_options_lzma lzma_opts;
83     if (lzma_lzma_preset(&lzma_opts, level))
84         die("Error setting lzma options");
85     gFilters[0] = (lzma_filter){ .id = LZMA_FILTER_LZMA2,
86             .options = &lzma_opts };
87     gFilters[1] = (lzma_filter){ .id = LZMA_VLI_UNKNOWN, .options = NULL };
88 
89     gBlockInSize = lzma_opts.dict_size * gBlockFraction;
90     if (gBlockInSize <= 0)
91         die("Block size must be positive");
92     gBlockOutSize = lzma_block_buffer_bound(gBlockInSize);
93 
94     pipeline_create(block_create, block_free, read_thread, encode_thread);
95     debug("writer: start");
96 
97     // pre-block setup: header, index
98     if (!(gIndex = lzma_index_init(NULL)))
99         die("Error creating index");
100     stream_edge(LZMA_VLI_UNKNOWN);
101 
102     // write blocks
103     while (true) {
104         pipeline_item_t *pi = pipeline_merged();
105         if (!pi)
106             break;
107 
108         debug("writer: received %zu", pi->seq);
109         write_block(pi);
110         queue_push(gPipelineStartQ, PIPELINE_ITEM, pi);
111     }
112 
113     // file index
114     if (gTar)
115         write_file_index();
116     free_file_index();
117 
118     // post-block cleanup: index, footer
119     encode_index();
120     stream_edge(lzma_index_size(gIndex));
121     lzma_index_end(gIndex, NULL);
122     fclose(gOutFile);
123 
124     debug("writer: cleaning up reader");
125     pipeline_destroy();
126 
127     debug("exit");
128 }
129 
130 
131 #pragma mark READING
132 
read_thread()133 static void read_thread() {
134     debug("reader: start");
135 
136     if (gTar) {
137 		struct archive *ar = archive_read_new();
138 	    prevent_compression(ar);
139 	    archive_read_support_format_tar(ar);
140 	    archive_read_support_format_raw(ar);
141 	    archive_read_open(ar, NULL, tar_ok, tar_read, tar_ok);
142 	    struct archive_entry *entry;
143 	    while (true) {
144 	        int aerr = archive_read_next_header(ar, &entry);
145 	        if (aerr == ARCHIVE_EOF) {
146 	            break;
147 	        } else if (aerr != ARCHIVE_OK && aerr != ARCHIVE_WARN) {
148 	            // Some charset translations warn spuriously
149 	            fprintf(stderr, "%s\n", archive_error_string(ar));
150 	            die("Error reading archive entry");
151 	        }
152 
153 	        if (archive_format(ar) == ARCHIVE_FORMAT_RAW) {
154 	            gTar = false;
155 				break;
156 			}
157             add_file(archive_read_header_position(ar),
158                 archive_entry_pathname(entry));
159 	    }
160 		if (archive_read_header_position(ar) == 0)
161 			gTar = false; // probably spuriously identified as tar
162     	finish_reading(ar);
163 	}
164 	if (!feof(gInFile)) {
165 		const void *dummy;
166 		while (tar_read(NULL, NULL, &dummy) != 0)
167 			; // just keep pumping
168 	}
169     fclose(gInFile);
170 
171 	if (gTar)
172         add_file(gTotalRead, NULL);
173 
174     // write last block, if necessary
175     if (gReadItem) {
176         // if this block had only one read, and it was EOF, it's waste
177         debug("reader: handling last block %zu", gReadItemCount);
178         if (gReadBlock->insize)
179             pipeline_split(gReadItem);
180         else
181             queue_push(gPipelineStartQ, PIPELINE_ITEM, gReadItem);
182         gReadItem = NULL;
183     }
184 
185     // stop the other threads
186     debug("reader: cleaning up encoders");
187     pipeline_stop();
188     debug("reader: end");
189 }
190 
tar_read(struct archive * ar,void * ref,const void ** bufp)191 static ssize_t tar_read(struct archive *ar, void *ref, const void **bufp) {
192     if (!gReadItem) {
193         queue_pop(gPipelineStartQ, (void**)&gReadItem);
194         gReadBlock = (io_block_t*)(gReadItem->data);
195         block_alloc(gReadBlock, BLOCK_IN);
196         gReadBlock->insize = 0;
197         debug("reader: reading %zu", gReadItemCount);
198     }
199 
200     size_t space = gBlockInSize - gReadBlock->insize;
201     if (space > CHUNKSIZE)
202         space = CHUNKSIZE;
203     uint8_t *buf = gReadBlock->input + gReadBlock->insize;
204     size_t rd = fread(buf, 1, space, gInFile);
205     if (ferror(gInFile))
206         die("Error reading input file");
207     gReadBlock->insize += rd;
208     gTotalRead += rd;
209     *bufp = buf;
210 
211     if (gReadBlock->insize == gBlockInSize) {
212         debug("reader: sending %zu", gReadItemCount);
213         pipeline_split(gReadItem);
214         ++gReadItemCount;
215         gReadItem = NULL;
216     }
217 
218     return rd;
219 }
220 
tar_ok(struct archive * ar,void * ref)221 static int tar_ok(struct archive *ar, void *ref) {
222     return ARCHIVE_OK;
223 }
224 
add_file(off_t offset,const char * name)225 static void add_file(off_t offset, const char *name) {
226     if (name && is_multi_header(name)) {
227         if (!gMultiHeader)
228             gMultiHeaderStart = offset;
229         gMultiHeader = true;
230         return;
231     }
232 
233     file_index_t *f = malloc(sizeof(file_index_t));
234     f->offset = gMultiHeader ? gMultiHeaderStart : offset;
235     gMultiHeader = false;
236     f->name = name ? xstrdup(name) : NULL;
237     f->next = NULL;
238 
239     if (gLastFile) {
240         gLastFile->next = f;
241     } else { // new index
242         gFileIndex = f;
243     }
244     gLastFile = f;
245 }
246 
block_free(void * data)247 static void block_free(void *data) {
248     io_block_t *ib = (io_block_t*)data;
249     free(ib->input);
250     free(ib->output);
251     free(ib);
252 }
253 
block_create()254 static void *block_create() {
255     io_block_t *ib = malloc(sizeof(io_block_t));
256     ib->input = ib->output = NULL;
257     return ib;
258 }
259 
/*
 * Make sure the buffers selected by `parts` exist. BLOCK_IN provides an input
 * buffer of gBlockInSize bytes and BLOCK_OUT an output buffer of gBlockOutSize
 * bytes; buffers that are already allocated are kept. Dies if a requested
 * buffer cannot be allocated.
 */
static void block_alloc(io_block_t *ib, block_parts parts) {
    if ((parts & BLOCK_IN) && !ib->input)
        ib->input = malloc(gBlockInSize);
    if ((parts & BLOCK_OUT) && !ib->output)
        ib->output = malloc(gBlockOutSize);
    // Only the requested parts must exist: the reader asks for the input
    // buffer alone, the encoder for the output buffer alone.
    if (((parts & BLOCK_IN) && !ib->input) ||
            ((parts & BLOCK_OUT) && !ib->output))
        die("Can't allocate blocks");
}
268 
/*
 * Free the buffers selected by `parts` and null the pointers, so that a later
 * block_alloc() will allocate them again.
 */
static void block_dealloc(io_block_t *ib, block_parts parts) {
    if (parts & BLOCK_OUT) {
        free(ib->output);
        ib->output = NULL;
    }
    if (parts & BLOCK_IN) {
        free(ib->input);
        ib->input = NULL;
    }
}
279 
280 
281 #pragma mark ENCODING
282 
size_uncompressible(size_t insize)283 static size_t size_uncompressible(size_t insize) {
284     size_t chunks = insize / LZMA_CHUNK_MAX;
285     if (insize % LZMA_CHUNK_MAX)
286         ++chunks;
287     // Per chunk (control code + 2-byte size), one byte for EOF
288     size_t data_size = insize + chunks * 3 + 1;
289     if (data_size % 4)
290         data_size += 4 - data_size % 4; // Padding
291     return data_size;
292 }
293 
/*
 * Fallback for data LZMA2 could not shrink: store ib->input verbatim.
 *
 * Output starts right after the space reserved for the block header, and
 * consists of:
 *   - uncompressed LZMA2 chunks;
 *   - an end-of-data control byte;
 *   - zero padding to a multiple of 4;
 *   - the CRC32 of the input, least significant byte first.
 *
 * Sets the block's compressed and uncompressed sizes. Dies if the block uses
 * any check other than CRC32.
 * See http://en.wikipedia.org/wiki/Lzma#LZMA2_format
 */
static void encode_uncompressible(io_block_t *ib) {
    enum { CTRL_END = 0, CTRL_UNCOMPRESSED = 1 };

    uint8_t *const start = ib->output + ib->block.header_size;
    uint8_t *out = start;
    const uint8_t *in = ib->input;

    for (size_t left = ib->insize; left > 0; ) {
        size_t n = left < LZMA_CHUNK_MAX ? left : LZMA_CHUNK_MAX;
        uint16_t encoded = (uint16_t)(n - 1);  // 16-bit big endian (size - 1)

        out[0] = CTRL_UNCOMPRESSED;
        out[1] = (uint8_t)(encoded >> 8);
        out[2] = (uint8_t)(encoded & 0xFF);
        memcpy(out + 3, in, n);

        out += 3 + n;
        in += n;
        left -= n;
    }
    *out++ = CTRL_END;

    ib->block.compressed_size = out - start;
    ib->block.uncompressed_size = ib->insize;

    // Block padding up to a 4-byte boundary
    while ((out - start) % 4 != 0)
        *out++ = 0;

    if (ib->block.check != LZMA_CHECK_CRC32)
        die("pixz only supports CRC-32 checksums");
    uint32_t crc = lzma_crc32(ib->input, ib->insize, 0);
    for (int shift = 0; shift < 32; shift += 8)
        *out++ = (uint8_t)(crc >> shift);
}
343 
/*
 * Encoder thread body; thnum only identifies the thread in debug output.
 *
 * Repeatedly takes a filled block from gPipelineSplitQ and LZMA2-encodes its
 * input into the output buffer, after space reserved for the block header.
 * Output is capped at the size the data would take as uncompressed LZMA2
 * chunks plus the check. If the encoder cannot finish within that cap, the
 * block is re-encoded with encode_uncompressible() instead. Afterwards the
 * input buffer is freed, the block header is written at the start of the
 * output buffer, and the item is pushed to gPipelineMergeQ.
 * Returns when PIPELINE_STOP is received.
 */
static void encode_thread(size_t thnum) {
    lzma_stream stream = LZMA_STREAM_INIT;
    while (true) {
        pipeline_item_t *pi;
        int msg = queue_pop(gPipelineSplitQ, (void**)&pi);
        if (msg == PIPELINE_STOP)
            break;

        debug("encoder %zu: received %zu", thnum, pi->seq);
        io_block_t *ib = (io_block_t*)(pi->data);

		block_alloc(ib, BLOCK_OUT);
        // block_init() sizes the header from insize and the upper bound
        // gBlockOutSize, so the header encoded at the end fits in the space
        // reserved here.
        block_init(&ib->block, ib->insize);
        size_t header_size = ib->block.header_size;
        // Output cap: never spend more than storing the data uncompressed
        size_t uncompressible_size = size_uncompressible(ib->insize) +
            lzma_check_size(ib->block.check);

        if (lzma_block_encoder(&stream, &ib->block) != LZMA_OK)
            die("Error creating block encoder");
        stream.next_in = ib->input;
        stream.avail_in = ib->insize;
        stream.next_out = ib->output + header_size;
        stream.avail_out = uncompressible_size;

        ib->block.uncompressed_size = LZMA_VLI_UNKNOWN; // for encoder to change
        lzma_ret err = LZMA_OK;
        while (err == LZMA_OK) {
            err = lzma_code(&stream, LZMA_FINISH);
        }
        if (err == LZMA_BUF_ERROR) {
            // Ran out of output space before finishing: store uncompressed
            debug("encoder: uncompressible %zu", pi->seq);
            encode_uncompressible(ib);
            ib->outsize = header_size + uncompressible_size;
        } else if (err == LZMA_STREAM_END) {
            ib->outsize = stream.next_out - ib->output;
        } else {
            die("Error encoding block");
        }
        // Input is no longer needed once the block data is encoded
        block_dealloc(ib, BLOCK_IN);

        // Header written last, now that the block's actual sizes are known
        if (lzma_block_header_encode(&ib->block, ib->output) != LZMA_OK)
            die("Error encoding block header");

		debug("encoder %zu: sending %zu", thnum, pi->seq);
        queue_push(gPipelineMergeQ, PIPELINE_ITEM, pi);
    }

    lzma_end(&stream);
}
393 
394 
395 #pragma mark WRITING
396 
/*
 * Reset *block to the options used for every pixz block (gFilters, CHECK) and
 * compute block->header_size.
 *
 * For insize > 0 the sizes are set to insize and the upper bound
 * gBlockOutSize, so header_size reserves room for the largest size fields.
 * For insize == 0 both sizes are LZMA_VLI_UNKNOWN.
 *
 * The whole struct is zero-initialized first. Callers pass both malloc'd and
 * uninitialized stack lzma_blocks, and no member should stay indeterminate.
 * Dies on error.
 */
static void block_init(lzma_block *block, size_t insize) {
    *block = (lzma_block){
        .version = 0,
        .check = CHECK,
        .filters = gFilters,
        .uncompressed_size = insize ? insize : LZMA_VLI_UNKNOWN,
        .compressed_size = insize ? gBlockOutSize : LZMA_VLI_UNKNOWN,
    };

    if (lzma_block_header_size(block) != LZMA_OK)
        die("Error getting block header size");
}
407 
/*
 * Write an xz stream header to gOutFile when backward_size is
 * LZMA_VLI_UNKNOWN. Otherwise write a stream footer carrying backward_size.
 * Dies on error.
 */
static void stream_edge(lzma_vli backward_size) {
    const lzma_stream_flags flags = {
        .version = 0,
        .check = CHECK,
        .backward_size = backward_size,
    };
    uint8_t buf[LZMA_STREAM_HEADER_SIZE];

    lzma_ret ret;
    if (backward_size == LZMA_VLI_UNKNOWN)
        ret = lzma_stream_header_encode(&flags, buf);
    else
        ret = lzma_stream_footer_encode(&flags, buf);
    if (ret != LZMA_OK)
        die("Error encoding stream edge");

    if (fwrite(buf, LZMA_STREAM_HEADER_SIZE, 1, gOutFile) != 1)
        die("Error writing stream edge");
}
423 
/*
 * Writer side: write one encoded block to gOutFile in CHUNKSIZE pieces, add
 * it to gIndex, then free both of the block's buffers. Dies on error.
 */
static void write_block(pipeline_item_t *pi) {
    debug("writer: writing %zu", pi->seq);
    io_block_t *ib = (io_block_t *)pi->data;

    for (size_t off = 0; off < ib->outsize; ) {
        size_t n = ib->outsize - off;
        if (n > CHUNKSIZE)
            n = CHUNKSIZE;
        if (fwrite(ib->output + off, n, 1, gOutFile) != 1)
            die("Error writing block data");
        off += n;
    }

    lzma_vli unpadded = lzma_block_unpadded_size(&ib->block);
    if (lzma_index_append(gIndex, NULL, unpadded,
            ib->block.uncompressed_size) != LZMA_OK)
        die("Error adding to index");

    block_dealloc(ib, BLOCK_ALL);
    debug("writer: writing %zu complete", pi->seq);
}
447 
encode_index(void)448 static void encode_index(void) {
449     if (lzma_index_encoder(&gStream, gIndex) != LZMA_OK)
450         die("Error creating index encoder");
451     uint8_t obuf[CHUNKSIZE];
452     lzma_ret err = LZMA_OK;
453     while (err != LZMA_STREAM_END) {
454         gStream.next_out = obuf;
455         gStream.avail_out = CHUNKSIZE;
456         err = lzma_code(&gStream, LZMA_RUN);
457         if (err != LZMA_OK && err != LZMA_STREAM_END)
458             die("Error encoding index");
459         if (gStream.avail_out != CHUNKSIZE) {
460             if (fwrite(obuf, CHUNKSIZE - gStream.avail_out, 1, gOutFile) != 1)
461                 die("Error writing index data");
462         }
463     }
464     lzma_end(&gStream);
465 }
466 
write_file_index(void)467 static void write_file_index(void) {
468     lzma_block block;
469     block_init(&block, 0);
470     uint8_t hdrbuf[block.header_size];
471     if (lzma_block_header_encode(&block, hdrbuf) != LZMA_OK)
472         die("Error encoding file index header");
473     if (fwrite(hdrbuf, block.header_size, 1, gOutFile) != 1)
474         die("Error writing file index header");
475 
476     if (lzma_block_encoder(&gStream, &block) != LZMA_OK)
477         die("Error creating file index encoder");
478 
479     uint8_t offbuf[sizeof(uint64_t)];
480     xle64enc(offbuf, PIXZ_INDEX_MAGIC);
481     write_file_index_bytes(sizeof(offbuf), offbuf);
482     for (file_index_t *f = gFileIndex; f != NULL; f = f->next) {
483         char *name = f->name ? f->name : "";
484         size_t len = strlen(name);
485         write_file_index_bytes(len + 1, (uint8_t*)name);
486         xle64enc(offbuf, f->offset);
487         write_file_index_bytes(sizeof(offbuf), offbuf);
488     }
489     write_file_index_buf(LZMA_FINISH);
490 
491     if (lzma_index_append(gIndex, NULL, lzma_block_unpadded_size(&block),
492             block.uncompressed_size) != LZMA_OK)
493         die("Error adding file-index to index");
494     lzma_end(&gStream);
495 }
496 
write_file_index_bytes(size_t size,uint8_t * buf)497 static void write_file_index_bytes(size_t size, uint8_t *buf) {
498     size_t bufpos = 0;
499     while (bufpos < size) {
500         size_t len = size - bufpos;
501         size_t space = CHUNKSIZE - gFileIndexBufPos;
502         if (len > space)
503             len = space;
504         memcpy(gFileIndexBuf + gFileIndexBufPos, buf + bufpos, len);
505         gFileIndexBufPos += len;
506         bufpos += len;
507 
508         if (gFileIndexBufPos == CHUNKSIZE) {
509             write_file_index_buf(LZMA_RUN);
510             gFileIndexBufPos = 0;
511         }
512     }
513 }
514 
/*
 * Feed the staged file-index bytes to gStream with `action` and write all
 * produced output to gOutFile. With LZMA_RUN it stops once the staged input
 * is consumed; with LZMA_FINISH it runs until the encoder reports
 * LZMA_STREAM_END. Empties the staging buffer. Dies on error.
 */
static void write_file_index_buf(lzma_action action) {
    uint8_t obuf[CHUNKSIZE];
    gStream.next_in = gFileIndexBuf;
    gStream.avail_in = gFileIndexBufPos;

    for (lzma_ret ret = LZMA_OK; ret != LZMA_STREAM_END; ) {
        if (action != LZMA_FINISH && gStream.avail_in == 0)
            break;

        gStream.next_out = obuf;
        gStream.avail_out = CHUNKSIZE;
        ret = lzma_code(&gStream, action);
        if (ret != LZMA_OK && ret != LZMA_STREAM_END)
            die("Error encoding file index");

        size_t produced = CHUNKSIZE - gStream.avail_out;
        if (produced > 0 && fwrite(obuf, produced, 1, gOutFile) != 1)
            die("Error writing file index");
    }

    gFileIndexBufPos = 0;
}
534