1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  * https://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.  See the License for the specific language governing
15  * permissions and limitations under the License.
16  */
17 
18 #include <avro/platform.h>
19 #include <stdlib.h>
20 #include <string.h>
21 
22 #include "avro/allocation.h"
23 #include "avro/basics.h"
24 #include "avro/data.h"
25 #include "avro/io.h"
26 #include "avro/value.h"
27 #include "avro_private.h"
28 #include "encoding.h"
29 
30 
31 /*
32  * Forward declaration; this is basically the same as avro_value_read,
33  * but it doesn't reset dest first.  (Since it will have already been
34  * reset in avro_value_read itself).
35  */
36 
37 static int
38 read_value(avro_reader_t reader, avro_value_t *dest);
39 
40 
41 static int
read_array_value(avro_reader_t reader,avro_value_t * dest)42 read_array_value(avro_reader_t reader, avro_value_t *dest)
43 {
44 	int  rval;
45 	size_t  i;          /* index within the current block */
46 	size_t  index = 0;  /* index within the entire array */
47 	int64_t  block_count;
48 	int64_t  block_size;
49 
50 	check_prefix(rval, avro_binary_encoding.
51 		     read_long(reader, &block_count),
52 		     "Cannot read array block count: ");
53 
54 	while (block_count != 0) {
55 		if (block_count < 0) {
56 			block_count = block_count * -1;
57 			check_prefix(rval, avro_binary_encoding.
58 				     read_long(reader, &block_size),
59 				     "Cannot read array block size: ");
60 		}
61 
62 		for (i = 0; i < (size_t) block_count; i++, index++) {
63 			avro_value_t  child;
64 
65 			check(rval, avro_value_append(dest, &child, NULL));
66 			check(rval, read_value(reader, &child));
67 		}
68 
69 		check_prefix(rval, avro_binary_encoding.
70 			     read_long(reader, &block_count),
71 			     "Cannot read array block count: ");
72 	}
73 
74 	return 0;
75 }
76 
77 
78 static int
read_map_value(avro_reader_t reader,avro_value_t * dest)79 read_map_value(avro_reader_t reader, avro_value_t *dest)
80 {
81 	int  rval;
82 	size_t  i;          /* index within the current block */
83 	size_t  index = 0;  /* index within the entire array */
84 	int64_t  block_count;
85 	int64_t  block_size;
86 
87 	check_prefix(rval, avro_binary_encoding.read_long(reader, &block_count),
88 		     "Cannot read map block count: ");
89 
90 	while (block_count != 0) {
91 		if (block_count < 0) {
92 			block_count = block_count * -1;
93 			check_prefix(rval, avro_binary_encoding.
94 				     read_long(reader, &block_size),
95 				     "Cannot read map block size: ");
96 		}
97 
98 		for (i = 0; i < (size_t) block_count; i++, index++) {
99 			char *key;
100 			int64_t key_size;
101 			avro_value_t  child;
102 
103 			check_prefix(rval, avro_binary_encoding.
104 				     read_string(reader, &key, &key_size),
105 				     "Cannot read map key: ");
106 
107 			rval = avro_value_add(dest, key, &child, NULL, NULL);
108 			if (rval) {
109 				avro_free(key, key_size);
110 				return rval;
111 			}
112 
113 			rval = read_value(reader, &child);
114 			if (rval) {
115 				avro_free(key, key_size);
116 				return rval;
117 			}
118 
119 			avro_free(key, key_size);
120 		}
121 
122 		check_prefix(rval, avro_binary_encoding.
123 			     read_long(reader, &block_count),
124 			     "Cannot read map block count: ");
125 	}
126 
127 	return 0;
128 }
129 
130 
131 static int
read_record_value(avro_reader_t reader,avro_value_t * dest)132 read_record_value(avro_reader_t reader, avro_value_t *dest)
133 {
134 	int  rval;
135 	size_t  field_count;
136 	size_t  i;
137 
138 	avro_schema_t  record_schema = avro_value_get_schema(dest);
139 
140 	check(rval, avro_value_get_size(dest, &field_count));
141 	for (i = 0; i < field_count; i++) {
142 		avro_value_t  field;
143 
144 		check(rval, avro_value_get_by_index(dest, i, &field, NULL));
145 		if (field.iface != NULL) {
146 			check(rval, read_value(reader, &field));
147 		} else {
148 			avro_schema_t  field_schema =
149 			    avro_schema_record_field_get_by_index(record_schema, i);
150 			check(rval, avro_skip_data(reader, field_schema));
151 		}
152 	}
153 
154 	return 0;
155 }
156 
157 
158 static int
read_union_value(avro_reader_t reader,avro_value_t * dest)159 read_union_value(avro_reader_t reader, avro_value_t *dest)
160 {
161 	int rval;
162 	int64_t discriminant;
163 	avro_schema_t  union_schema;
164 	int64_t  branch_count;
165 	avro_value_t  branch;
166 
167 	check_prefix(rval, avro_binary_encoding.
168 		     read_long(reader, &discriminant),
169 		     "Cannot read union discriminant: ");
170 
171 	union_schema = avro_value_get_schema(dest);
172 	branch_count = avro_schema_union_size(union_schema);
173 
174 	if (discriminant < 0 || discriminant >= branch_count) {
175 		avro_set_error("Invalid union discriminant value: (%d)",
176 			       discriminant);
177 		return 1;
178 	}
179 
180 	check(rval, avro_value_set_branch(dest, discriminant, &branch));
181 	check(rval, read_value(reader, &branch));
182 	return 0;
183 }
184 
185 
/*
 * Bookkeeping for a wrapped buffer that takes over a block obtained
 * from avro_malloc: it remembers the block and the size it was
 * allocated with, so that both can be handed back to avro_free.
 */

struct avro_wrapped_alloc {
	/* start of the block that was taken over */
	const void  *original;
	/* size to pass to avro_free when releasing the block */
	size_t  allocated_size;
};
195 
196 static void
avro_wrapped_alloc_free(avro_wrapped_buffer_t * self)197 avro_wrapped_alloc_free(avro_wrapped_buffer_t *self)
198 {
199 	struct avro_wrapped_alloc  *alloc = (struct avro_wrapped_alloc *) self->user_data;
200 	avro_free((void *) alloc->original, alloc->allocated_size);
201 	avro_freet(struct avro_wrapped_alloc, alloc);
202 }
203 
204 static int
avro_wrapped_alloc_new(avro_wrapped_buffer_t * dest,const void * buf,size_t length)205 avro_wrapped_alloc_new(avro_wrapped_buffer_t *dest,
206 		       const void *buf, size_t length)
207 {
208 	struct avro_wrapped_alloc  *alloc = (struct avro_wrapped_alloc *) avro_new(struct avro_wrapped_alloc);
209 	if (alloc == NULL) {
210 		return ENOMEM;
211 	}
212 
213 	dest->buf = buf;
214 	dest->size = length;
215 	dest->user_data = alloc;
216 	dest->free = avro_wrapped_alloc_free;
217 	dest->copy = NULL;
218 	dest->slice = NULL;
219 
220 	alloc->original = buf;
221 	alloc->allocated_size = length;
222 	return 0;
223 }
224 
225 
226 static int
read_value(avro_reader_t reader,avro_value_t * dest)227 read_value(avro_reader_t reader, avro_value_t *dest)
228 {
229 	int  rval;
230 
231 	switch (avro_value_get_type(dest)) {
232 		case AVRO_BOOLEAN:
233 		{
234 			int8_t  val;
235 			check_prefix(rval, avro_binary_encoding.
236 				     read_boolean(reader, &val),
237 				     "Cannot read boolean value: ");
238 			return avro_value_set_boolean(dest, val);
239 		}
240 
241 		case AVRO_BYTES:
242 		{
243 			char  *bytes;
244 			int64_t  len;
245 			check_prefix(rval, avro_binary_encoding.
246 				     read_bytes(reader, &bytes, &len),
247 				     "Cannot read bytes value: ");
248 
249 			/*
250 			 * read_bytes allocates an extra byte to always
251 			 * ensure that the data is NUL terminated, but
252 			 * that byte isn't included in the length.  We
253 			 * include that extra byte in the allocated
254 			 * size, but not in the length of the buffer.
255 			 */
256 
257 			avro_wrapped_buffer_t  buf;
258 			check(rval, avro_wrapped_alloc_new(&buf, bytes, len+1));
259 			buf.size--;
260 			return avro_value_give_bytes(dest, &buf);
261 		}
262 
263 		case AVRO_DOUBLE:
264 		{
265 			double  val;
266 			check_prefix(rval, avro_binary_encoding.
267 				     read_double(reader, &val),
268 				     "Cannot read double value: ");
269 			return avro_value_set_double(dest, val);
270 		}
271 
272 		case AVRO_FLOAT:
273 		{
274 			float  val;
275 			check_prefix(rval, avro_binary_encoding.
276 				     read_float(reader, &val),
277 				     "Cannot read float value: ");
278 			return avro_value_set_float(dest, val);
279 		}
280 
281 		case AVRO_INT32:
282 		{
283 			int32_t  val;
284 			check_prefix(rval, avro_binary_encoding.
285 				     read_int(reader, &val),
286 				     "Cannot read int value: ");
287 			return avro_value_set_int(dest, val);
288 		}
289 
290 		case AVRO_INT64:
291 		{
292 			int64_t  val;
293 			check_prefix(rval, avro_binary_encoding.
294 				     read_long(reader, &val),
295 				     "Cannot read long value: ");
296 			return avro_value_set_long(dest, val);
297 		}
298 
299 		case AVRO_NULL:
300 		{
301 			check_prefix(rval, avro_binary_encoding.
302 				     read_null(reader),
303 				     "Cannot read null value: ");
304 			return avro_value_set_null(dest);
305 		}
306 
307 		case AVRO_STRING:
308 		{
309 			char  *str;
310 			int64_t  size;
311 
312 			/*
313 			 * read_string returns a size that includes the
314 			 * NUL terminator, and the free function will be
315 			 * called with a size that also includes the NUL
316 			 */
317 
318 			check_prefix(rval, avro_binary_encoding.
319 				     read_string(reader, &str, &size),
320 				     "Cannot read string value: ");
321 
322 			avro_wrapped_buffer_t  buf;
323 			check(rval, avro_wrapped_alloc_new(&buf, str, size));
324 			return avro_value_give_string_len(dest, &buf);
325 		}
326 
327 		case AVRO_ARRAY:
328 			return read_array_value(reader, dest);
329 
330 		case AVRO_ENUM:
331 		{
332 			int64_t  val;
333 			check_prefix(rval, avro_binary_encoding.
334 				     read_long(reader, &val),
335 				     "Cannot read enum value: ");
336 			return avro_value_set_enum(dest, val);
337 		}
338 
339 		case AVRO_FIXED:
340 		{
341 			avro_schema_t  schema = avro_value_get_schema(dest);
342 			char *bytes;
343 			int64_t size = avro_schema_fixed_size(schema);
344 
345 			bytes = (char *) avro_malloc(size);
346 			if (!bytes) {
347 				avro_prefix_error("Cannot allocate new fixed value");
348 				return ENOMEM;
349 			}
350 			rval = avro_read(reader, bytes, size);
351 			if (rval) {
352 				avro_prefix_error("Cannot read fixed value: ");
353 				avro_free(bytes, size);
354 				return rval;
355 			}
356 
357 			avro_wrapped_buffer_t  buf;
358 			rval = avro_wrapped_alloc_new(&buf, bytes, size);
359 			if (rval != 0) {
360 				avro_free(bytes, size);
361 				return rval;
362 			}
363 
364 			return avro_value_give_fixed(dest, &buf);
365 		}
366 
367 		case AVRO_MAP:
368 			return read_map_value(reader, dest);
369 
370 		case AVRO_RECORD:
371 			return read_record_value(reader, dest);
372 
373 		case AVRO_UNION:
374 			return read_union_value(reader, dest);
375 
376 		default:
377 		{
378 			avro_set_error("Unknown schema type");
379 			return EINVAL;
380 		}
381 	}
382 
383 	return 0;
384 }
385 
386 int
avro_value_read(avro_reader_t reader,avro_value_t * dest)387 avro_value_read(avro_reader_t reader, avro_value_t *dest)
388 {
389 	int  rval;
390 	check(rval, avro_value_reset(dest));
391 	return read_value(reader, dest);
392 }
393