1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to you under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 * implied. See the License for the specific language governing
15 * permissions and limitations under the License.
16 */
17
18 #include <avro/platform.h>
19 #include <stdlib.h>
20 #include <string.h>
21
22 #include "avro/allocation.h"
23 #include "avro/basics.h"
24 #include "avro/data.h"
25 #include "avro/io.h"
26 #include "avro/value.h"
27 #include "avro_private.h"
28 #include "encoding.h"
29
30
31 /*
32 * Forward declaration; this is basically the same as avro_value_read,
33 * but it doesn't reset dest first. (Since it will have already been
34 * reset in avro_value_read itself).
35 */
36
37 static int
38 read_value(avro_reader_t reader, avro_value_t *dest);
39
40
41 static int
read_array_value(avro_reader_t reader,avro_value_t * dest)42 read_array_value(avro_reader_t reader, avro_value_t *dest)
43 {
44 int rval;
45 size_t i; /* index within the current block */
46 size_t index = 0; /* index within the entire array */
47 int64_t block_count;
48 int64_t block_size;
49
50 check_prefix(rval, avro_binary_encoding.
51 read_long(reader, &block_count),
52 "Cannot read array block count: ");
53
54 while (block_count != 0) {
55 if (block_count < 0) {
56 block_count = block_count * -1;
57 check_prefix(rval, avro_binary_encoding.
58 read_long(reader, &block_size),
59 "Cannot read array block size: ");
60 }
61
62 for (i = 0; i < (size_t) block_count; i++, index++) {
63 avro_value_t child;
64
65 check(rval, avro_value_append(dest, &child, NULL));
66 check(rval, read_value(reader, &child));
67 }
68
69 check_prefix(rval, avro_binary_encoding.
70 read_long(reader, &block_count),
71 "Cannot read array block count: ");
72 }
73
74 return 0;
75 }
76
77
78 static int
read_map_value(avro_reader_t reader,avro_value_t * dest)79 read_map_value(avro_reader_t reader, avro_value_t *dest)
80 {
81 int rval;
82 size_t i; /* index within the current block */
83 size_t index = 0; /* index within the entire array */
84 int64_t block_count;
85 int64_t block_size;
86
87 check_prefix(rval, avro_binary_encoding.read_long(reader, &block_count),
88 "Cannot read map block count: ");
89
90 while (block_count != 0) {
91 if (block_count < 0) {
92 block_count = block_count * -1;
93 check_prefix(rval, avro_binary_encoding.
94 read_long(reader, &block_size),
95 "Cannot read map block size: ");
96 }
97
98 for (i = 0; i < (size_t) block_count; i++, index++) {
99 char *key;
100 int64_t key_size;
101 avro_value_t child;
102
103 check_prefix(rval, avro_binary_encoding.
104 read_string(reader, &key, &key_size),
105 "Cannot read map key: ");
106
107 rval = avro_value_add(dest, key, &child, NULL, NULL);
108 if (rval) {
109 avro_free(key, key_size);
110 return rval;
111 }
112
113 rval = read_value(reader, &child);
114 if (rval) {
115 avro_free(key, key_size);
116 return rval;
117 }
118
119 avro_free(key, key_size);
120 }
121
122 check_prefix(rval, avro_binary_encoding.
123 read_long(reader, &block_count),
124 "Cannot read map block count: ");
125 }
126
127 return 0;
128 }
129
130
131 static int
read_record_value(avro_reader_t reader,avro_value_t * dest)132 read_record_value(avro_reader_t reader, avro_value_t *dest)
133 {
134 int rval;
135 size_t field_count;
136 size_t i;
137
138 avro_schema_t record_schema = avro_value_get_schema(dest);
139
140 check(rval, avro_value_get_size(dest, &field_count));
141 for (i = 0; i < field_count; i++) {
142 avro_value_t field;
143
144 check(rval, avro_value_get_by_index(dest, i, &field, NULL));
145 if (field.iface != NULL) {
146 check(rval, read_value(reader, &field));
147 } else {
148 avro_schema_t field_schema =
149 avro_schema_record_field_get_by_index(record_schema, i);
150 check(rval, avro_skip_data(reader, field_schema));
151 }
152 }
153
154 return 0;
155 }
156
157
158 static int
read_union_value(avro_reader_t reader,avro_value_t * dest)159 read_union_value(avro_reader_t reader, avro_value_t *dest)
160 {
161 int rval;
162 int64_t discriminant;
163 avro_schema_t union_schema;
164 int64_t branch_count;
165 avro_value_t branch;
166
167 check_prefix(rval, avro_binary_encoding.
168 read_long(reader, &discriminant),
169 "Cannot read union discriminant: ");
170
171 union_schema = avro_value_get_schema(dest);
172 branch_count = avro_schema_union_size(union_schema);
173
174 if (discriminant < 0 || discriminant >= branch_count) {
175 avro_set_error("Invalid union discriminant value: (%d)",
176 discriminant);
177 return 1;
178 }
179
180 check(rval, avro_value_set_branch(dest, discriminant, &branch));
181 check(rval, read_value(reader, &branch));
182 return 0;
183 }
184
185
186 /*
187 * A wrapped buffer implementation that takes control of a buffer
188 * allocated using avro_malloc.
189 */
190
191 struct avro_wrapped_alloc {
192 const void *original;
193 size_t allocated_size;
194 };
195
196 static void
avro_wrapped_alloc_free(avro_wrapped_buffer_t * self)197 avro_wrapped_alloc_free(avro_wrapped_buffer_t *self)
198 {
199 struct avro_wrapped_alloc *alloc = (struct avro_wrapped_alloc *) self->user_data;
200 avro_free((void *) alloc->original, alloc->allocated_size);
201 avro_freet(struct avro_wrapped_alloc, alloc);
202 }
203
204 static int
avro_wrapped_alloc_new(avro_wrapped_buffer_t * dest,const void * buf,size_t length)205 avro_wrapped_alloc_new(avro_wrapped_buffer_t *dest,
206 const void *buf, size_t length)
207 {
208 struct avro_wrapped_alloc *alloc = (struct avro_wrapped_alloc *) avro_new(struct avro_wrapped_alloc);
209 if (alloc == NULL) {
210 return ENOMEM;
211 }
212
213 dest->buf = buf;
214 dest->size = length;
215 dest->user_data = alloc;
216 dest->free = avro_wrapped_alloc_free;
217 dest->copy = NULL;
218 dest->slice = NULL;
219
220 alloc->original = buf;
221 alloc->allocated_size = length;
222 return 0;
223 }
224
225
226 static int
read_value(avro_reader_t reader,avro_value_t * dest)227 read_value(avro_reader_t reader, avro_value_t *dest)
228 {
229 int rval;
230
231 switch (avro_value_get_type(dest)) {
232 case AVRO_BOOLEAN:
233 {
234 int8_t val;
235 check_prefix(rval, avro_binary_encoding.
236 read_boolean(reader, &val),
237 "Cannot read boolean value: ");
238 return avro_value_set_boolean(dest, val);
239 }
240
241 case AVRO_BYTES:
242 {
243 char *bytes;
244 int64_t len;
245 check_prefix(rval, avro_binary_encoding.
246 read_bytes(reader, &bytes, &len),
247 "Cannot read bytes value: ");
248
249 /*
250 * read_bytes allocates an extra byte to always
251 * ensure that the data is NUL terminated, but
252 * that byte isn't included in the length. We
253 * include that extra byte in the allocated
254 * size, but not in the length of the buffer.
255 */
256
257 avro_wrapped_buffer_t buf;
258 check(rval, avro_wrapped_alloc_new(&buf, bytes, len+1));
259 buf.size--;
260 return avro_value_give_bytes(dest, &buf);
261 }
262
263 case AVRO_DOUBLE:
264 {
265 double val;
266 check_prefix(rval, avro_binary_encoding.
267 read_double(reader, &val),
268 "Cannot read double value: ");
269 return avro_value_set_double(dest, val);
270 }
271
272 case AVRO_FLOAT:
273 {
274 float val;
275 check_prefix(rval, avro_binary_encoding.
276 read_float(reader, &val),
277 "Cannot read float value: ");
278 return avro_value_set_float(dest, val);
279 }
280
281 case AVRO_INT32:
282 {
283 int32_t val;
284 check_prefix(rval, avro_binary_encoding.
285 read_int(reader, &val),
286 "Cannot read int value: ");
287 return avro_value_set_int(dest, val);
288 }
289
290 case AVRO_INT64:
291 {
292 int64_t val;
293 check_prefix(rval, avro_binary_encoding.
294 read_long(reader, &val),
295 "Cannot read long value: ");
296 return avro_value_set_long(dest, val);
297 }
298
299 case AVRO_NULL:
300 {
301 check_prefix(rval, avro_binary_encoding.
302 read_null(reader),
303 "Cannot read null value: ");
304 return avro_value_set_null(dest);
305 }
306
307 case AVRO_STRING:
308 {
309 char *str;
310 int64_t size;
311
312 /*
313 * read_string returns a size that includes the
314 * NUL terminator, and the free function will be
315 * called with a size that also includes the NUL
316 */
317
318 check_prefix(rval, avro_binary_encoding.
319 read_string(reader, &str, &size),
320 "Cannot read string value: ");
321
322 avro_wrapped_buffer_t buf;
323 check(rval, avro_wrapped_alloc_new(&buf, str, size));
324 return avro_value_give_string_len(dest, &buf);
325 }
326
327 case AVRO_ARRAY:
328 return read_array_value(reader, dest);
329
330 case AVRO_ENUM:
331 {
332 int64_t val;
333 check_prefix(rval, avro_binary_encoding.
334 read_long(reader, &val),
335 "Cannot read enum value: ");
336 return avro_value_set_enum(dest, val);
337 }
338
339 case AVRO_FIXED:
340 {
341 avro_schema_t schema = avro_value_get_schema(dest);
342 char *bytes;
343 int64_t size = avro_schema_fixed_size(schema);
344
345 bytes = (char *) avro_malloc(size);
346 if (!bytes) {
347 avro_prefix_error("Cannot allocate new fixed value");
348 return ENOMEM;
349 }
350 rval = avro_read(reader, bytes, size);
351 if (rval) {
352 avro_prefix_error("Cannot read fixed value: ");
353 avro_free(bytes, size);
354 return rval;
355 }
356
357 avro_wrapped_buffer_t buf;
358 rval = avro_wrapped_alloc_new(&buf, bytes, size);
359 if (rval != 0) {
360 avro_free(bytes, size);
361 return rval;
362 }
363
364 return avro_value_give_fixed(dest, &buf);
365 }
366
367 case AVRO_MAP:
368 return read_map_value(reader, dest);
369
370 case AVRO_RECORD:
371 return read_record_value(reader, dest);
372
373 case AVRO_UNION:
374 return read_union_value(reader, dest);
375
376 default:
377 {
378 avro_set_error("Unknown schema type");
379 return EINVAL;
380 }
381 }
382
383 return 0;
384 }
385
386 int
avro_value_read(avro_reader_t reader,avro_value_t * dest)387 avro_value_read(avro_reader_t reader, avro_value_t *dest)
388 {
389 int rval;
390 check(rval, avro_value_reset(dest));
391 return read_value(reader, dest);
392 }
393