1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to you under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 * implied. See the License for the specific language governing
15 * permissions and limitations under the License.
16 */
17
18 #include "avro_private.h"
19 #include "avro/allocation.h"
20 #include "avro/consumer.h"
21 #include "avro/errors.h"
22 #include "avro/resolver.h"
23 #include "avro/value.h"
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <string.h>
27 #include "encoding.h"
28 #include "schema.h"
29 #include "datum.h"
30
31
32 static int
read_enum(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)33 read_enum(avro_reader_t reader, const avro_encoding_t * enc,
34 avro_consumer_t *consumer, void *ud)
35 {
36 int rval;
37 int64_t index;
38
39 check_prefix(rval, enc->read_long(reader, &index),
40 "Cannot read enum value: ");
41 return avro_consumer_call(consumer, enum_value, index, ud);
42 }
43
44 static int
read_array(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)45 read_array(avro_reader_t reader, const avro_encoding_t * enc,
46 avro_consumer_t *consumer, void *ud)
47 {
48 int rval;
49 int64_t i; /* index within the current block */
50 int64_t index = 0; /* index within the entire array */
51 int64_t block_count;
52 int64_t block_size;
53
54 check_prefix(rval, enc->read_long(reader, &block_count),
55 "Cannot read array block count: ");
56 check(rval, avro_consumer_call(consumer, array_start_block,
57 1, block_count, ud));
58
59 while (block_count != 0) {
60 if (block_count < 0) {
61 block_count = block_count * -1;
62 check_prefix(rval, enc->read_long(reader, &block_size),
63 "Cannot read array block size: ");
64 }
65
66 for (i = 0; i < block_count; i++, index++) {
67 avro_consumer_t *element_consumer = NULL;
68 void *element_ud = NULL;
69
70 check(rval,
71 avro_consumer_call(consumer, array_element,
72 index, &element_consumer, &element_ud,
73 ud));
74
75 check(rval, avro_consume_binary(reader, element_consumer, element_ud));
76 }
77
78 check_prefix(rval, enc->read_long(reader, &block_count),
79 "Cannot read array block count: ");
80 check(rval, avro_consumer_call(consumer, array_start_block,
81 0, block_count, ud));
82 }
83
84 return 0;
85 }
86
87 static int
read_map(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)88 read_map(avro_reader_t reader, const avro_encoding_t * enc,
89 avro_consumer_t *consumer, void *ud)
90 {
91 int rval;
92 int64_t i; /* index within the current block */
93 int64_t index = 0; /* index within the entire array */
94 int64_t block_count;
95 int64_t block_size;
96
97 check_prefix(rval, enc->read_long(reader, &block_count),
98 "Cannot read map block count: ");
99 check(rval, avro_consumer_call(consumer, map_start_block,
100 1, block_count, ud));
101
102 while (block_count != 0) {
103 if (block_count < 0) {
104 block_count = block_count * -1;
105 check_prefix(rval, enc->read_long(reader, &block_size),
106 "Cannot read map block size: ");
107 }
108
109 for (i = 0; i < block_count; i++, index++) {
110 char *key;
111 int64_t key_size;
112 avro_consumer_t *element_consumer = NULL;
113 void *element_ud = NULL;
114
115 check_prefix(rval, enc->read_string(reader, &key, &key_size),
116 "Cannot read map key: ");
117
118 rval = avro_consumer_call(consumer, map_element,
119 index, key,
120 &element_consumer, &element_ud,
121 ud);
122 if (rval) {
123 avro_free(key, key_size);
124 return rval;
125 }
126
127 rval = avro_consume_binary(reader, element_consumer, element_ud);
128 if (rval) {
129 avro_free(key, key_size);
130 return rval;
131 }
132
133 avro_free(key, key_size);
134 }
135
136 check_prefix(rval, enc->read_long(reader, &block_count),
137 "Cannot read map block count: ");
138 check(rval, avro_consumer_call(consumer, map_start_block,
139 0, block_count, ud));
140 }
141
142 return 0;
143 }
144
145 static int
read_union(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)146 read_union(avro_reader_t reader, const avro_encoding_t * enc,
147 avro_consumer_t *consumer, void *ud)
148 {
149 int rval;
150 int64_t discriminant;
151 avro_consumer_t *branch_consumer = NULL;
152 void *branch_ud = NULL;
153
154 check_prefix(rval, enc->read_long(reader, &discriminant),
155 "Cannot read union discriminant: ");
156 check(rval, avro_consumer_call(consumer, union_branch,
157 discriminant,
158 &branch_consumer, &branch_ud, ud));
159 return avro_consume_binary(reader, branch_consumer, branch_ud);
160 }
161
162 static int
read_record(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)163 read_record(avro_reader_t reader, const avro_encoding_t * enc,
164 avro_consumer_t *consumer, void *ud)
165 {
166 int rval;
167 size_t num_fields;
168 unsigned int i;
169
170 AVRO_UNUSED(enc);
171
172 check(rval, avro_consumer_call(consumer, record_start, ud));
173
174 num_fields = avro_schema_record_size(consumer->schema);
175 for (i = 0; i < num_fields; i++) {
176 avro_consumer_t *field_consumer = NULL;
177 void *field_ud = NULL;
178
179 check(rval, avro_consumer_call(consumer, record_field,
180 i, &field_consumer, &field_ud,
181 ud));
182
183 if (field_consumer) {
184 check(rval, avro_consume_binary(reader, field_consumer, field_ud));
185 } else {
186 avro_schema_t field_schema =
187 avro_schema_record_field_get_by_index(consumer->schema, i);
188 check(rval, avro_skip_data(reader, field_schema));
189 }
190 }
191
192 return 0;
193 }
194
195 int
avro_consume_binary(avro_reader_t reader,avro_consumer_t * consumer,void * ud)196 avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
197 {
198 int rval;
199 const avro_encoding_t *enc = &avro_binary_encoding;
200
201 check_param(EINVAL, reader, "reader");
202 check_param(EINVAL, consumer, "consumer");
203
204 switch (avro_typeof(consumer->schema)) {
205 case AVRO_NULL:
206 check_prefix(rval, enc->read_null(reader),
207 "Cannot read null value: ");
208 check(rval, avro_consumer_call(consumer, null_value, ud));
209 break;
210
211 case AVRO_BOOLEAN:
212 {
213 int8_t b;
214 check_prefix(rval, enc->read_boolean(reader, &b),
215 "Cannot read boolean value: ");
216 check(rval, avro_consumer_call(consumer, boolean_value, b, ud));
217 }
218 break;
219
220 case AVRO_STRING:
221 {
222 int64_t len;
223 char *s;
224 check_prefix(rval, enc->read_string(reader, &s, &len),
225 "Cannot read string value: ");
226 check(rval, avro_consumer_call(consumer, string_value, s, len, ud));
227 }
228 break;
229
230 case AVRO_INT32:
231 {
232 int32_t i;
233 check_prefix(rval, enc->read_int(reader, &i),
234 "Cannot read int value: ");
235 check(rval, avro_consumer_call(consumer, int_value, i, ud));
236 }
237 break;
238
239 case AVRO_INT64:
240 {
241 int64_t l;
242 check_prefix(rval, enc->read_long(reader, &l),
243 "Cannot read long value: ");
244 check(rval, avro_consumer_call(consumer, long_value, l, ud));
245 }
246 break;
247
248 case AVRO_FLOAT:
249 {
250 float f;
251 check_prefix(rval, enc->read_float(reader, &f),
252 "Cannot read float value: ");
253 check(rval, avro_consumer_call(consumer, float_value, f, ud));
254 }
255 break;
256
257 case AVRO_DOUBLE:
258 {
259 double d;
260 check_prefix(rval, enc->read_double(reader, &d),
261 "Cannot read double value: ");
262 check(rval, avro_consumer_call(consumer, double_value, d, ud));
263 }
264 break;
265
266 case AVRO_BYTES:
267 {
268 char *bytes;
269 int64_t len;
270 check_prefix(rval, enc->read_bytes(reader, &bytes, &len),
271 "Cannot read bytes value: ");
272 check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud));
273 }
274 break;
275
276 case AVRO_FIXED:
277 {
278 char *bytes;
279 int64_t size =
280 avro_schema_to_fixed(consumer->schema)->size;
281
282 bytes = (char *) avro_malloc(size);
283 if (!bytes) {
284 avro_prefix_error("Cannot allocate new fixed value");
285 return ENOMEM;
286 }
287 rval = avro_read(reader, bytes, size);
288 if (rval) {
289 avro_prefix_error("Cannot read fixed value: ");
290 avro_free(bytes, size);
291 return rval;
292 }
293
294 rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud);
295 if (rval) {
296 avro_free(bytes, size);
297 return rval;
298 }
299 }
300 break;
301
302 case AVRO_ENUM:
303 check(rval, read_enum(reader, enc, consumer, ud));
304 break;
305
306 case AVRO_ARRAY:
307 check(rval, read_array(reader, enc, consumer, ud));
308 break;
309
310 case AVRO_MAP:
311 check(rval, read_map(reader, enc, consumer, ud));
312 break;
313
314 case AVRO_UNION:
315 check(rval, read_union(reader, enc, consumer, ud));
316 break;
317
318 case AVRO_RECORD:
319 check(rval, read_record(reader, enc, consumer, ud));
320 break;
321
322 case AVRO_LINK:
323 avro_set_error("Consumer can't consume a link schema directly");
324 return EINVAL;
325 }
326
327 return 0;
328 }
329