1 #include <stdlib.h>
2 #include <zlib.h>
3 
4 #include "../readstat.h"
5 #include "../readstat_bits.h"
6 #include "../readstat_iconv.h"
7 #include "../readstat_malloc.h"
8 #include "readstat_sav.h"
9 #include "readstat_sav_compress.h"
10 
/*
 * Fixed-size header that introduces the compressed data. The reader fills it
 * with a single raw read of sizeof(struct zheader) bytes, so the member order
 * and widths must not change.
 */
struct zheader {
    uint64_t zheader_ofs,   /* file offset of this header; checked against the read position */
             ztrailer_ofs,  /* file offset the reader seeks to in order to load the trailer */
             ztrailer_len;  /* trailer length in bytes; (len - 24) / 24 yields the block count */
};
16 
/*
 * Fixed-size leading part of the trailer, filled by one raw read of
 * sizeof(struct ztrailer) bytes; the per-block entries follow it in the file.
 * Member order and widths must not change.
 */
struct ztrailer {
    /* Both are byte-swapped after reading but not otherwise consulted by the
     * reader in this file (presumably the compression bias and a reserved
     * zero field -- TODO confirm against the format description). */
    int64_t bias, zero;

    /* block_size is likewise only byte-swapped here; n_blocks is the number of
     * trailer entries and must agree with the count derived from the header's
     * trailer length. */
    int32_t block_size, n_blocks;
};
23 
/*
 * One trailer record per compressed block. The whole array is filled by a raw
 * read, so member order and widths must not change.
 */
struct ztrailer_entry {
    /* uncompressed_ofs is byte-swapped but not otherwise used by the reader in
     * this file; compressed_ofs is the file offset the reader seeks to before
     * loading the block. */
    int64_t uncompressed_ofs, compressed_ofs;

    /* uncompressed_size is the length the inflated block must have;
     * compressed_size is the number of bytes read from the file for it. */
    int32_t uncompressed_size, compressed_size;
};
30 
/*
 * Read the zlib-compressed data blocks of a ZSAV stream and hand every decoded
 * row to row_handler.
 *
 * The function reads the zheader at the current stream position, follows it to
 * the trailer, loads the per-block trailer entries, and then, block by block,
 * seeks to the compressed bytes, inflates them with zlib's uncompress() and
 * runs the result through sav_decompress_row(). A row may span two blocks: the
 * partially filled row buffer and its write offset are kept across blocks.
 *
 * ctx          Reader context. Supplies the I/O callbacks (ctx->io), the
 *              byte-swap flag, the bias and missing-value settings passed to
 *              the row decoder, and the row limit / current row counters.
 * row_handler  Invoked as row_handler(row, row_len, ctx) each time a complete
 *              row has been decoded. A return other than READSTAT_OK stops
 *              reading and is returned to the caller.
 *
 * Returns READSTAT_OK when all blocks were consumed, the decoder reported
 * SAV_ROW_STREAM_FINISHED_ALL, or the row limit was reached. Otherwise returns
 * READSTAT_ERROR_READ / _SEEK / _MALLOC / _PARSE, or the handler's error.
 */
readstat_error_t zsav_read_compressed_data(sav_ctx_t *ctx,
        readstat_error_t (*row_handler)(unsigned char *, size_t, sav_ctx_t *)) {
    /* Byte counts implied by the member widths of struct ztrailer and
     * struct ztrailer_entry (8+8+4+4 each). */
    enum { ZTRAILER_FIXED_LEN = 24, ZTRAILER_ENTRY_LEN = 24 };

    readstat_error_t retval = READSTAT_OK;
    readstat_io_t *io = ctx->io;
    readstat_off_t header_end = 0;
    readstat_off_t data_offset = 0;

    /* One row occupies ctx->var_offset 8-byte units. */
    size_t uncompressed_row_len = ctx->var_offset * 8;
    readstat_off_t uncompressed_offset = 0;
    unsigned char *uncompressed_row = NULL;

    uLongf uncompressed_block_len = 0;
    unsigned char *compressed_block = NULL, *uncompressed_block = NULL;

    struct sav_row_stream_s state = {
        .missing_value = ctx->missing_double,
        .bias = ctx->bias,
        .bswap = ctx->bswap };

    struct zheader zheader;
    struct ztrailer ztrailer;
    struct ztrailer_entry *ztrailer_entries = NULL;
    size_t ztrailer_entries_len = 0;
    uint64_t trailer_block_count = 0;
    int n_blocks = 0;

    /* `!=` rather than `<`: should the read callback report failure with a
     * negative value, comparing it with `<` against a size_t would convert it
     * to a huge unsigned number and let the error slip through. */
    if (io->read(&zheader, sizeof(zheader), io->io_ctx) != sizeof(zheader)) {
        retval = READSTAT_ERROR_READ;
        goto cleanup;
    }

    if (ctx->bswap) {
        zheader.zheader_ofs = byteswap8(zheader.zheader_ofs);
        zheader.ztrailer_ofs = byteswap8(zheader.ztrailer_ofs);
        zheader.ztrailer_len = byteswap8(zheader.ztrailer_len);
    }

    /* The header records its own file offset; it must match where it was
     * actually read from. */
    header_end = io->seek(0, READSTAT_SEEK_CUR, io->io_ctx);
    if (header_end == -1) {
        retval = READSTAT_ERROR_SEEK;
        goto cleanup;
    }
    if (zheader.zheader_ofs != (uint64_t)header_end - sizeof(zheader)) {
        retval = READSTAT_ERROR_PARSE;
        goto cleanup;
    }

    /* Guard the subtraction below: a trailer shorter than its fixed part would
     * wrap around in unsigned arithmetic. */
    if (zheader.ztrailer_len < ZTRAILER_FIXED_LEN) {
        retval = READSTAT_ERROR_PARSE;
        goto cleanup;
    }
    trailer_block_count = (zheader.ztrailer_len - ZTRAILER_FIXED_LEN) / ZTRAILER_ENTRY_LEN;

    if (io->seek(zheader.ztrailer_ofs, READSTAT_SEEK_SET, io->io_ctx) == -1) {
        retval = READSTAT_ERROR_SEEK;
        goto cleanup;
    }

    if (io->read(&ztrailer, sizeof(ztrailer), io->io_ctx) != sizeof(ztrailer)) {
        retval = READSTAT_ERROR_READ;
        goto cleanup;
    }

    if (ctx->bswap) {
        ztrailer.bias = byteswap8(ztrailer.bias);
        ztrailer.zero = byteswap8(ztrailer.zero);
        ztrailer.block_size = byteswap4(ztrailer.block_size);
        ztrailer.n_blocks = byteswap4(ztrailer.n_blocks);
    }

    /* The two independent block counts must agree. Comparing in 64 bits (and
     * rejecting negatives) avoids accepting a count that only matches after
     * truncation to int. */
    if (ztrailer.n_blocks < 0 || (uint64_t)ztrailer.n_blocks != trailer_block_count) {
        retval = READSTAT_ERROR_PARSE;
        goto cleanup;
    }
    n_blocks = ztrailer.n_blocks;

    if (n_blocks > 0) {
        /* (size_t)-1 is the largest size_t; refuse a table whose byte size
         * would wrap, which would leave the buffer smaller than the loops
         * below assume. */
        if ((size_t)n_blocks > (size_t)-1 / sizeof(*ztrailer_entries)) {
            retval = READSTAT_ERROR_MALLOC;
            goto cleanup;
        }
        ztrailer_entries_len = (size_t)n_blocks * sizeof(*ztrailer_entries);

        if ((ztrailer_entries = readstat_malloc(ztrailer_entries_len)) == NULL) {
            retval = READSTAT_ERROR_MALLOC;
            goto cleanup;
        }
        if (io->read(ztrailer_entries, ztrailer_entries_len, io->io_ctx) != ztrailer_entries_len) {
            retval = READSTAT_ERROR_READ;
            goto cleanup;
        }
    }

    if (ctx->bswap) {
        for (int i = 0; i < n_blocks; i++) {
            struct ztrailer_entry *entry = &ztrailer_entries[i];

            entry->uncompressed_ofs = byteswap8(entry->uncompressed_ofs);
            entry->compressed_ofs = byteswap8(entry->compressed_ofs);
            entry->uncompressed_size = byteswap4(entry->uncompressed_size);
            entry->compressed_size = byteswap4(entry->compressed_size);
        }
    }

    if (uncompressed_row_len && (uncompressed_row = readstat_malloc(uncompressed_row_len)) == NULL) {
        retval = READSTAT_ERROR_MALLOC;
        goto cleanup;
    }

    for (int block_i = 0; block_i < n_blocks; block_i++) {
        struct ztrailer_entry *entry = &ztrailer_entries[block_i];

        if (io->seek(entry->compressed_ofs, READSTAT_SEEK_SET, io->io_ctx) == -1) {
            retval = READSTAT_ERROR_SEEK;
            goto cleanup;
        }

        /* Sizes come straight from the file; a negative value would turn into
         * an enormous unsigned size in the calls below. */
        if (entry->compressed_size < 0 || entry->uncompressed_size < 0) {
            retval = READSTAT_ERROR_PARSE;
            goto cleanup;
        }

        /* NOTE(review): the buffer pointers are overwritten with the
         * readstat_realloc() result; whether the previous buffer is released
         * when it returns NULL depends on that helper -- confirm before
         * changing this pattern. */
        if ((compressed_block = readstat_realloc(compressed_block, entry->compressed_size)) == NULL) {
            retval = READSTAT_ERROR_MALLOC;
            goto cleanup;
        }
        if (io->read(compressed_block, entry->compressed_size, io->io_ctx) != entry->compressed_size) {
            retval = READSTAT_ERROR_READ;
            goto cleanup;
        }

        uncompressed_block_len = entry->uncompressed_size;
        if ((uncompressed_block = readstat_realloc(uncompressed_block, uncompressed_block_len)) == NULL) {
            retval = READSTAT_ERROR_MALLOC;
            goto cleanup;
        }

        /* uncompress() updates uncompressed_block_len to the number of bytes
         * produced; it must equal the size announced in the trailer. */
        int status = uncompress(uncompressed_block, &uncompressed_block_len,
                compressed_block, entry->compressed_size);
        if (status != Z_OK || uncompressed_block_len != entry->uncompressed_size) {
            retval = READSTAT_ERROR_PARSE;
            goto cleanup;
        }

        state.status = SAV_ROW_STREAM_HAVE_DATA;
        data_offset = 0;

        /* Decode rows until the decoder has consumed this block. The row
         * buffer offset is deliberately not reset here so that a row begun in
         * one block can be completed from the next. */
        while (state.status != SAV_ROW_STREAM_NEED_DATA) {
            state.next_in = &uncompressed_block[data_offset];
            state.avail_in = uncompressed_block_len - data_offset;

            state.next_out = &uncompressed_row[uncompressed_offset];
            state.avail_out = uncompressed_row_len - uncompressed_offset;

            sav_decompress_row(&state);

            uncompressed_offset = uncompressed_row_len - state.avail_out;
            data_offset = uncompressed_block_len - state.avail_in;

            if (state.status == SAV_ROW_STREAM_FINISHED_ROW) {
                retval = row_handler(uncompressed_row, uncompressed_row_len, ctx);
                if (retval != READSTAT_OK)
                    goto cleanup;

                uncompressed_offset = 0;
            }

            if (state.status == SAV_ROW_STREAM_FINISHED_ALL)
                goto cleanup;
            if (ctx->row_limit > 0 && ctx->current_row == ctx->row_limit)
                goto cleanup;
        }
    }

cleanup:
    /* free(NULL) is a no-op, so no guards are needed. */
    free(uncompressed_row);
    free(ztrailer_entries);
    free(compressed_block);
    free(uncompressed_block);

    return retval;
}
191