1 /* -*- indent-tabs-mode: nil -*-
2 *
3 * Copyright 2016 Kubo Takehiro <kubo@jiubao.org>
4 *
5 * Redistribution and use in source and binary forms, with or without modification, are
6 * permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice, this list of
9 * conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
12 * of conditions and the following disclaimer in the documentation and/or other materials
13 * provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS ''AS IS'' AND ANY EXPRESS OR IMPLIED
16 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
17 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> OR
18 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
20 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
21 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
22 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
23 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24 *
25 * The views and conclusions contained in the software and documentation are those of the
26 * authors and should not be interpreted as representing official policies, either expressed
27 * or implied, of the authors.
28 *
29 */
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #include <stdlib.h>
35 #include <string.h>
36 #include <errno.h>
37 #include <snappy-c.h>
38 #include "snzip.h"
39
/* same with CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT in hadoop */
#define SNAPPY_BUFFER_SIZE_DEFAULT (256 * 1024)

/* Calculate max_input_size from block_size as in hadoop-snappy.
 *
 * In SnappyCodec.createOutputStream(OutputStream out, Compressor compressor)
 *
 *   int compressionOverhead = (bufferSize / 6) + 32;
 *
 * In BlockCompressorStream(OutputStream out, Compressor compressor, int bufferSize, int compressionOverhead)
 *
 *   MAX_INPUT_SIZE = bufferSize - compressionOverhead;
 *
 * A block_size of zero selects the hadoop default buffer size.
 */
size_t hadoop_snappy_max_input_size(size_t block_size)
{
  const size_t bufsize = (block_size != 0) ? block_size : SNAPPY_BUFFER_SIZE_DEFAULT;
  return bufsize - (bufsize / 6 + 32);
}
59
/* Write num to fp as a 4-byte big-endian integer.
 * Returns 1 on success, 0 on a write error (an error message is printed).
 */
static inline int write_num(FILE *fp, size_t num)
{
  const unsigned int be = SNZ_TO_BE32((unsigned int)num);
  const size_t nitems = fwrite(&be, sizeof(be), 1, fp);
  if (nitems == 1) {
    return 1;
  }
  print_error("Failed to write a file: %s\n", strerror(errno));
  return 0;
}
69
hadoop_snappy_format_compress(FILE * infp,FILE * outfp,size_t block_size)70 static int hadoop_snappy_format_compress(FILE *infp, FILE *outfp, size_t block_size)
71 {
72 work_buffer_t wb;
73 size_t uncompressed_data_len;
74 int err = 1;
75
76 work_buffer_init(&wb, hadoop_snappy_max_input_size(block_size));
77
78 /* write file body */
79 while ((uncompressed_data_len = fread(wb.uc, 1, wb.uclen, infp)) > 0) {
80 size_t compressed_data_len;
81 /* write length before compression */
82 if (write_num(outfp, uncompressed_data_len) == 0) {
83 goto cleanup;
84 }
85
86 /* compress the block. */
87 compressed_data_len = wb.clen;
88 snappy_compress(wb.uc, uncompressed_data_len, wb.c, &compressed_data_len);
89
90 /* write compressed length */
91 if (write_num(outfp, compressed_data_len) == 0) {
92 goto cleanup;
93 }
94 /* write data */
95 if (fwrite(wb.c, compressed_data_len, 1, outfp) != 1) {
96 print_error("Failed to write a file: %s\n", strerror(errno));
97 goto cleanup;
98 }
99 }
100 /* check stream errors */
101 if (ferror(infp)) {
102 print_error("Failed to read a file: %s\n", strerror(errno));
103 goto cleanup;
104 }
105 if (ferror(outfp)) {
106 print_error("Failed to write a file: %s\n", strerror(errno));
107 goto cleanup;
108 }
109 err = 0;
110 cleanup:
111 work_buffer_free(&wb);
112 return err;
113 }
114
/* Read exactly buflen bytes from fp into buf.
 * Returns 0 on success, -1 on short read (EOF or stream error;
 * an error message is printed).
 */
static int read_data(char *buf, size_t buflen, FILE *fp)
{
  if (fread(buf, buflen, 1, fp) == 1) {
    return 0;  /* got the whole chunk */
  }
  if (feof(fp)) {
    print_error("Unexpected end of file\n");
  } else {
    print_error("Failed to read a file: %s\n", strerror(errno));
  }
  return -1;
}
127
/* Uncompress a hadoop-snappy stream from infp to outfp.
 * The stream is a sequence of <BE32 source length><one or more
 * (<BE32 compressed length><compressed data>)> groups; each group's
 * uncompressed pieces must sum to the source length.
 * When skip_magic is nonzero, the caller (format auto-detection) has already
 * consumed the first two length words; they are taken from the globals
 * hadoop_snappy_source_length / hadoop_snappy_compressed_length and control
 * jumps into the middle of the read loop.
 * Returns 0 on success (clean EOF at a group boundary), nonzero on error.
 */
static int hadoop_snappy_format_uncompress(FILE *infp, FILE *outfp, int skip_magic)
{
  work_buffer_t wb;
  size_t source_len = 0;
  size_t compressed_len = 0;
  int err = 1;

  work_buffer_init(&wb, hadoop_snappy_max_input_size(0));

  if (skip_magic) {
    /* lengths were pre-read during format detection; resume after the
     * point where compressed_len would normally be parsed. */
    source_len = hadoop_snappy_source_length;
    compressed_len = hadoop_snappy_compressed_length;
    trace("source_len = %ld.\n", (long)source_len);
    trace("compressed_len = %ld.\n", (long)compressed_len);
    goto after_reading_compressed_len;
  }

  for (;;) {
    unsigned int n;

    /* read the next group's uncompressed ("source") length; EOF here is
     * the normal end of the stream. */
    if (fread(&n, sizeof(n), 1, infp) != 1) {
      if (feof(infp)) {
        err = 0;
      } else {
        print_error("Failed to read a file: %s\n", strerror(errno));
      }
      goto cleanup;
    }
    source_len = SNZ_FROM_BE32(n);
    trace("source_len = %ld.\n", (long)source_len);

    /* consume compressed chunks until the whole source length is produced */
    while (source_len > 0) {
      size_t uncompressed_len;

      if (read_data((char*)&n, sizeof(n), infp) != 0) {
        goto cleanup;
      }
      compressed_len = SNZ_FROM_BE32(n);
      trace("compressed_len = %ld.\n", (long)compressed_len);
  after_reading_compressed_len:
      if (compressed_len > wb.clen) {
        work_buffer_resize(&wb, compressed_len, 0);
      }

      /* read the compressed data */
      if (read_data(wb.c, compressed_len, infp) != 0) {
        goto cleanup;
      }
      trace("read %ld bytes.\n", (long)(compressed_len));

      /* check the uncompressed length */
      err = snappy_uncompressed_length(wb.c, compressed_len, &uncompressed_len);
      if (err != 0) {
        print_error("Invalid data: GetUncompressedLength failed %d\n", err);
        goto cleanup;
      }
      err = 1;  /* restore the default error code for later failure paths */
      /* a chunk larger than the remaining source length would make the
       * unsigned subtraction below underflow — reject it. */
      if (uncompressed_len > source_len) {
        print_error("Invalid data: uncompressed_length > source_len\n");
        goto cleanup;
      }

      if (uncompressed_len > wb.uclen) {
        work_buffer_resize(&wb, 0, uncompressed_len);
      }

      /* uncompress and write */
      if (snappy_uncompress(wb.c, compressed_len, wb.uc, &uncompressed_len)) {
        print_error("Invalid data: RawUncompress failed\n");
        goto cleanup;
      }
      if (fwrite(wb.uc, uncompressed_len, 1, outfp) != 1) {
        print_error("Failed to write a file: %s\n", strerror(errno));
        goto cleanup;
      }
      trace("write %ld bytes\n", (long)uncompressed_len);

      source_len -= uncompressed_len;
      trace("uncompressed_len = %ld, source_len -> %ld\n", (long)uncompressed_len, (long)source_len);
    }
  }
  /* check stream errors */
  if (ferror(infp)) {
    print_error("Failed to read a file: %s\n", strerror(errno));
    goto cleanup;
  }
  if (ferror(outfp)) {
    print_error("Failed to write a file: %s\n", strerror(errno));
    goto cleanup;
  }
  err = 0;
 cleanup:
  work_buffer_free(&wb);
  return err;
}
223
/* Format descriptor registered with the snzip driver.
 * NOTE(review): field meanings inferred from the values — presumably
 * name, project URL, file suffix, compress and uncompress callbacks;
 * confirm against the stream_format_t declaration in snzip.h. */
stream_format_t hadoop_snappy_format = {
  "hadoop-snappy",
  "https://code.google.com/p/hadoop-snappy/",
  "snappy",
  hadoop_snappy_format_compress,
  hadoop_snappy_format_uncompress,
};
231