1 /* -*- indent-tabs-mode: nil -*-
2  *
3  * Copyright 2016 Kubo Takehiro <kubo@jiubao.org>
4  *
5  * Redistribution and use in source and binary forms, with or without modification, are
6  * permitted provided that the following conditions are met:
7  *
8  *    1. Redistributions of source code must retain the above copyright notice, this list of
9  *       conditions and the following disclaimer.
10  *
11  *    2. Redistributions in binary form must reproduce the above copyright notice, this list
12  *       of conditions and the following disclaimer in the documentation and/or other materials
13  *       provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ''AS IS'' AND ANY EXPRESS OR IMPLIED
16  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> OR
18  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
20  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
21  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
22  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
23  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  * The views and conclusions contained in the software and documentation are those of the
26  * authors and should not be interpreted as representing official policies, either expressed
27  * or implied, of the authors.
28  *
29  */
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33 
34 #include <stdlib.h>
35 #include <string.h>
36 #include <errno.h>
37 #include <snappy-c.h>
38 #include "snzip.h"
39 
/* same with CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT in hadoop */
#define SNAPPY_BUFFER_SIZE_DEFAULT (256 * 1024)

/* Derive the maximum uncompressed chunk size from a block size, mirroring
 * hadoop-snappy:
 *
 * In SnappyCodec.createOutputStream(OutputStream out, Compressor compressor)
 *
 *     int compressionOverhead = (bufferSize / 6) + 32;
 *
 * In BlockCompressorStream(OutputStream out, Compressor compressor, int bufferSize, int compressionOverhead)
 *
 *     MAX_INPUT_SIZE = bufferSize - compressionOverhead;
 *
 * A block_size of 0 selects hadoop's default buffer size.
 */
size_t hadoop_snappy_max_input_size(size_t block_size)
{
  size_t buffer_size = block_size;

  if (buffer_size == 0) {
    buffer_size = SNAPPY_BUFFER_SIZE_DEFAULT;
  }
  return buffer_size - (buffer_size / 6 + 32);
}
59 
/* Write a length field as a 32-bit big-endian integer to fp.
 * Returns 1 on success, 0 on failure (an error message is printed). */
static inline int write_num(FILE *fp, size_t num)
{
  const unsigned int be = SNZ_TO_BE32((unsigned int)num);

  if (fwrite(&be, sizeof(be), 1, fp) == 1) {
    return 1;
  }
  print_error("Failed to write a file: %s\n", strerror(errno));
  return 0;
}
69 
hadoop_snappy_format_compress(FILE * infp,FILE * outfp,size_t block_size)70 static int hadoop_snappy_format_compress(FILE *infp, FILE *outfp, size_t block_size)
71 {
72   work_buffer_t wb;
73   size_t uncompressed_data_len;
74   int err = 1;
75 
76   work_buffer_init(&wb, hadoop_snappy_max_input_size(block_size));
77 
78   /* write file body */
79   while ((uncompressed_data_len = fread(wb.uc, 1, wb.uclen, infp)) > 0) {
80     size_t compressed_data_len;
81     /* write length before compression */
82     if (write_num(outfp, uncompressed_data_len) == 0) {
83       goto cleanup;
84     }
85 
86     /* compress the block. */
87     compressed_data_len = wb.clen;
88     snappy_compress(wb.uc, uncompressed_data_len, wb.c, &compressed_data_len);
89 
90     /* write compressed length */
91     if (write_num(outfp, compressed_data_len) == 0) {
92       goto cleanup;
93     }
94     /* write data */
95     if (fwrite(wb.c, compressed_data_len, 1, outfp) != 1) {
96       print_error("Failed to write a file: %s\n", strerror(errno));
97       goto cleanup;
98     }
99   }
100   /* check stream errors */
101   if (ferror(infp)) {
102     print_error("Failed to read a file: %s\n", strerror(errno));
103     goto cleanup;
104   }
105   if (ferror(outfp)) {
106     print_error("Failed to write a file: %s\n", strerror(errno));
107     goto cleanup;
108   }
109   err = 0;
110  cleanup:
111   work_buffer_free(&wb);
112   return err;
113 }
114 
/* Read exactly buflen bytes from fp into buf.
 * Returns 0 on success, -1 on failure (an error message is printed).
 * A zero-length request succeeds immediately: fread(buf, 0, 1, fp)
 * returns 0, which the != 1 check below would misreport as an error.
 */
static int read_data(char *buf, size_t buflen, FILE *fp)
{
  if (buflen == 0) {
    return 0;
  }
  if (fread(buf, buflen, 1, fp) != 1) {
    if (feof(fp)) {
      print_error("Unexpected end of file\n");
    } else {
      print_error("Failed to read a file: %s\n", strerror(errno));
    }
    return -1;
  }
  return 0;
}
127 
/* Uncompress a hadoop-snappy formatted stream from infp to outfp.
 *
 * The stream is a sequence of blocks; each block is a 4-byte big-endian
 * uncompressed length followed by one or more sub-blocks of
 * [4-byte BE compressed length][compressed bytes], read until the
 * uncompressed length is consumed.
 *
 * If skip_magic is non-zero, the first two length fields are taken from
 * the globals hadoop_snappy_source_length / hadoop_snappy_compressed_length
 * instead of being read here — presumably already consumed by the
 * format-detection code; TODO confirm against the caller.
 *
 * Returns 0 on success (clean EOF at a block boundary), 1 on error.
 */
static int hadoop_snappy_format_uncompress(FILE *infp, FILE *outfp, int skip_magic)
{
  work_buffer_t wb;
  size_t source_len = 0;
  size_t compressed_len = 0;
  int err = 1;

  work_buffer_init(&wb, hadoop_snappy_max_input_size(0));

  if (skip_magic) {
    /* Lengths were pre-read elsewhere; jump into the middle of the loop
     * past both length reads. */
    source_len = hadoop_snappy_source_length;
    compressed_len = hadoop_snappy_compressed_length;
    trace("source_len = %ld.\n", (long)source_len);
    trace("compressed_len = %ld.\n", (long)compressed_len);
    goto after_reading_compressed_len;
  }

  for (;;) {
    unsigned int n;

    /* read the block's uncompressed length; EOF here is the normal
     * end-of-stream condition */
    if (fread(&n, sizeof(n), 1, infp) != 1) {
      if (feof(infp)) {
        err = 0;
      } else {
        print_error("Failed to read a file: %s\n", strerror(errno));
      }
      goto cleanup;
    }
    source_len = SNZ_FROM_BE32(n);
    trace("source_len = %ld.\n", (long)source_len);

    /* consume sub-blocks until the whole uncompressed length is produced */
    while (source_len > 0) {
      size_t uncompressed_len;

      if (read_data((char*)&n, sizeof(n), infp) != 0) {
        goto cleanup;
      }
      compressed_len = SNZ_FROM_BE32(n);
      trace("compressed_len = %ld.\n", (long)compressed_len);
    after_reading_compressed_len:
      if (compressed_len > wb.clen) {
        work_buffer_resize(&wb, compressed_len, 0);
      }

      /* read the compressed data */
      if (read_data(wb.c, compressed_len, infp) != 0) {
        goto cleanup;
      }
      trace("read %ld bytes.\n", (long)(compressed_len));

      /* check the uncompressed length before allocating/writing */
      err = snappy_uncompressed_length(wb.c, compressed_len, &uncompressed_len);
      if (err != 0) {
        print_error("Invalid data: GetUncompressedLength failed %d\n", err);
        goto cleanup;
      }
      err = 1; /* reset: err was clobbered by snappy_uncompressed_length */
      if (uncompressed_len > source_len) {
        print_error("Invalid data: uncompressed_length > source_len\n");
        goto cleanup;
      }

      if (uncompressed_len > wb.uclen) {
        work_buffer_resize(&wb, 0, uncompressed_len);
      }

      /* uncompress and write */
      if (snappy_uncompress(wb.c, compressed_len, wb.uc, &uncompressed_len)) {
        print_error("Invalid data: RawUncompress failed\n");
        goto cleanup;
      }
      if (fwrite(wb.uc, uncompressed_len, 1, outfp) != 1) {
        print_error("Failed to write a file: %s\n", strerror(errno));
        goto cleanup;
      }
      trace("write %ld bytes\n", (long)uncompressed_len);

      source_len -= uncompressed_len;
      trace("uncompressed_len = %ld, source_len -> %ld\n", (long)uncompressed_len, (long)source_len);
    }
  }
  /* NOTE(review): the for(;;) above exits only via goto cleanup, so the
   * ferror checks below appear unreachable — confirm and consider removing. */
  if (ferror(infp)) {
    print_error("Failed to read a file: %s\n", strerror(errno));
    goto cleanup;
  }
  if (ferror(outfp)) {
    print_error("Failed to write a file: %s\n", strerror(errno));
    goto cleanup;
  }
  err = 0;
 cleanup:
  work_buffer_free(&wb);
  return err;
}
223 
/* Format descriptor for the hadoop-snappy framing.
 * Field meanings inferred from initializer order (name, URL, suffix,
 * compress callback, uncompress callback) — the authoritative layout is
 * stream_format_t in snzip.h; confirm there. */
stream_format_t hadoop_snappy_format = {
  "hadoop-snappy",
  "https://code.google.com/p/hadoop-snappy/",
  "snappy",
  hadoop_snappy_format_compress,
  hadoop_snappy_format_uncompress,
};
231