1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  * https://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.  See the License for the specific language governing
15  * permissions and limitations under the License.
16  */
17 
#include "avro_private.h"
#include "avro/allocation.h"
#include "avro/consumer.h"
#include "avro/errors.h"
#include "avro/resolver.h"
#include "avro/value.h"
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "encoding.h"
#include "schema.h"
#include "datum.h"
30 
31 
32 static int
read_enum(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)33 read_enum(avro_reader_t reader, const avro_encoding_t * enc,
34 	  avro_consumer_t *consumer, void *ud)
35 {
36 	int rval;
37 	int64_t index;
38 
39 	check_prefix(rval, enc->read_long(reader, &index),
40 		     "Cannot read enum value: ");
41 	return avro_consumer_call(consumer, enum_value, index, ud);
42 }
43 
44 static int
read_array(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)45 read_array(avro_reader_t reader, const avro_encoding_t * enc,
46 	   avro_consumer_t *consumer, void *ud)
47 {
48 	int rval;
49 	int64_t i;          /* index within the current block */
50 	int64_t index = 0;  /* index within the entire array */
51 	int64_t block_count;
52 	int64_t block_size;
53 
54 	check_prefix(rval, enc->read_long(reader, &block_count),
55 		     "Cannot read array block count: ");
56 	check(rval, avro_consumer_call(consumer, array_start_block,
57 				       1, block_count, ud));
58 
59 	while (block_count != 0) {
60 		if (block_count < 0) {
61 			block_count = block_count * -1;
62 			check_prefix(rval, enc->read_long(reader, &block_size),
63 				     "Cannot read array block size: ");
64 		}
65 
66 		for (i = 0; i < block_count; i++, index++) {
67 			avro_consumer_t  *element_consumer = NULL;
68 			void  *element_ud = NULL;
69 
70 			check(rval,
71 			      avro_consumer_call(consumer, array_element,
72 					         index, &element_consumer, &element_ud,
73 						 ud));
74 
75 			check(rval, avro_consume_binary(reader, element_consumer, element_ud));
76 		}
77 
78 		check_prefix(rval, enc->read_long(reader, &block_count),
79 			     "Cannot read array block count: ");
80 		check(rval, avro_consumer_call(consumer, array_start_block,
81 					       0, block_count, ud));
82 	}
83 
84 	return 0;
85 }
86 
87 static int
read_map(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)88 read_map(avro_reader_t reader, const avro_encoding_t * enc,
89 	 avro_consumer_t *consumer, void *ud)
90 {
91 	int rval;
92 	int64_t i;          /* index within the current block */
93 	int64_t index = 0;  /* index within the entire array */
94 	int64_t block_count;
95 	int64_t block_size;
96 
97 	check_prefix(rval, enc->read_long(reader, &block_count),
98 		     "Cannot read map block count: ");
99 	check(rval, avro_consumer_call(consumer, map_start_block,
100 				       1, block_count, ud));
101 
102 	while (block_count != 0) {
103 		if (block_count < 0) {
104 			block_count = block_count * -1;
105 			check_prefix(rval, enc->read_long(reader, &block_size),
106 				     "Cannot read map block size: ");
107 		}
108 
109 		for (i = 0; i < block_count; i++, index++) {
110 			char *key;
111 			int64_t key_size;
112 			avro_consumer_t  *element_consumer = NULL;
113 			void  *element_ud = NULL;
114 
115 			check_prefix(rval, enc->read_string(reader, &key, &key_size),
116 				     "Cannot read map key: ");
117 
118 			rval = avro_consumer_call(consumer, map_element,
119 						  index, key,
120 						  &element_consumer, &element_ud,
121 						  ud);
122 			if (rval) {
123 				avro_free(key, key_size);
124 				return rval;
125 			}
126 
127 			rval = avro_consume_binary(reader, element_consumer, element_ud);
128 			if (rval) {
129 				avro_free(key, key_size);
130 				return rval;
131 			}
132 
133 			avro_free(key, key_size);
134 		}
135 
136 		check_prefix(rval, enc->read_long(reader, &block_count),
137 			     "Cannot read map block count: ");
138 		check(rval, avro_consumer_call(consumer, map_start_block,
139 					       0, block_count, ud));
140 	}
141 
142 	return 0;
143 }
144 
145 static int
read_union(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)146 read_union(avro_reader_t reader, const avro_encoding_t * enc,
147 	   avro_consumer_t *consumer, void *ud)
148 {
149 	int rval;
150 	int64_t discriminant;
151 	avro_consumer_t  *branch_consumer = NULL;
152 	void  *branch_ud = NULL;
153 
154 	check_prefix(rval, enc->read_long(reader, &discriminant),
155 		     "Cannot read union discriminant: ");
156 	check(rval, avro_consumer_call(consumer, union_branch,
157 				       discriminant,
158 				       &branch_consumer, &branch_ud, ud));
159 	return avro_consume_binary(reader, branch_consumer, branch_ud);
160 }
161 
162 static int
read_record(avro_reader_t reader,const avro_encoding_t * enc,avro_consumer_t * consumer,void * ud)163 read_record(avro_reader_t reader, const avro_encoding_t * enc,
164 	    avro_consumer_t *consumer, void *ud)
165 {
166 	int rval;
167 	size_t  num_fields;
168 	unsigned int  i;
169 
170 	AVRO_UNUSED(enc);
171 
172 	check(rval, avro_consumer_call(consumer, record_start, ud));
173 
174 	num_fields = avro_schema_record_size(consumer->schema);
175 	for (i = 0; i < num_fields; i++) {
176 		avro_consumer_t  *field_consumer = NULL;
177 		void  *field_ud = NULL;
178 
179 		check(rval, avro_consumer_call(consumer, record_field,
180 					       i, &field_consumer, &field_ud,
181 					       ud));
182 
183 		if (field_consumer) {
184 			check(rval, avro_consume_binary(reader, field_consumer, field_ud));
185 		} else {
186 			avro_schema_t  field_schema =
187 			    avro_schema_record_field_get_by_index(consumer->schema, i);
188 			check(rval, avro_skip_data(reader, field_schema));
189 		}
190 	}
191 
192 	return 0;
193 }
194 
195 int
avro_consume_binary(avro_reader_t reader,avro_consumer_t * consumer,void * ud)196 avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
197 {
198 	int rval;
199 	const avro_encoding_t *enc = &avro_binary_encoding;
200 
201 	check_param(EINVAL, reader, "reader");
202 	check_param(EINVAL, consumer, "consumer");
203 
204 	switch (avro_typeof(consumer->schema)) {
205 	case AVRO_NULL:
206 		check_prefix(rval, enc->read_null(reader),
207 			     "Cannot read null value: ");
208 		check(rval, avro_consumer_call(consumer, null_value, ud));
209 		break;
210 
211 	case AVRO_BOOLEAN:
212 		{
213 			int8_t b;
214 			check_prefix(rval, enc->read_boolean(reader, &b),
215 				     "Cannot read boolean value: ");
216 			check(rval, avro_consumer_call(consumer, boolean_value, b, ud));
217 		}
218 		break;
219 
220 	case AVRO_STRING:
221 		{
222 			int64_t len;
223 			char *s;
224 			check_prefix(rval, enc->read_string(reader, &s, &len),
225 				     "Cannot read string value: ");
226 			check(rval, avro_consumer_call(consumer, string_value, s, len, ud));
227 		}
228 		break;
229 
230 	case AVRO_INT32:
231 		{
232 			int32_t i;
233 			check_prefix(rval, enc->read_int(reader, &i),
234 				    "Cannot read int value: ");
235 			check(rval, avro_consumer_call(consumer, int_value, i, ud));
236 		}
237 		break;
238 
239 	case AVRO_INT64:
240 		{
241 			int64_t l;
242 			check_prefix(rval, enc->read_long(reader, &l),
243 				     "Cannot read long value: ");
244 			check(rval, avro_consumer_call(consumer, long_value, l, ud));
245 		}
246 		break;
247 
248 	case AVRO_FLOAT:
249 		{
250 			float f;
251 			check_prefix(rval, enc->read_float(reader, &f),
252 				     "Cannot read float value: ");
253 			check(rval, avro_consumer_call(consumer, float_value, f, ud));
254 		}
255 		break;
256 
257 	case AVRO_DOUBLE:
258 		{
259 			double d;
260 			check_prefix(rval, enc->read_double(reader, &d),
261 				     "Cannot read double value: ");
262 			check(rval, avro_consumer_call(consumer, double_value, d, ud));
263 		}
264 		break;
265 
266 	case AVRO_BYTES:
267 		{
268 			char *bytes;
269 			int64_t len;
270 			check_prefix(rval, enc->read_bytes(reader, &bytes, &len),
271 				     "Cannot read bytes value: ");
272 			check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud));
273 		}
274 		break;
275 
276 	case AVRO_FIXED:
277 		{
278 			char *bytes;
279 			int64_t size =
280 			    avro_schema_to_fixed(consumer->schema)->size;
281 
282 			bytes = (char *) avro_malloc(size);
283 			if (!bytes) {
284 				avro_prefix_error("Cannot allocate new fixed value");
285 				return ENOMEM;
286 			}
287 			rval = avro_read(reader, bytes, size);
288 			if (rval) {
289 				avro_prefix_error("Cannot read fixed value: ");
290 				avro_free(bytes, size);
291 				return rval;
292 			}
293 
294 			rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud);
295 			if (rval) {
296 				avro_free(bytes, size);
297 				return rval;
298 			}
299 		}
300 		break;
301 
302 	case AVRO_ENUM:
303 		check(rval, read_enum(reader, enc, consumer, ud));
304 		break;
305 
306 	case AVRO_ARRAY:
307 		check(rval, read_array(reader, enc, consumer, ud));
308 		break;
309 
310 	case AVRO_MAP:
311 		check(rval, read_map(reader, enc, consumer, ud));
312 		break;
313 
314 	case AVRO_UNION:
315 		check(rval, read_union(reader, enc, consumer, ud));
316 		break;
317 
318 	case AVRO_RECORD:
319 		check(rval, read_record(reader, enc, consumer, ud));
320 		break;
321 
322 	case AVRO_LINK:
323 		avro_set_error("Consumer can't consume a link schema directly");
324 		return EINVAL;
325 	}
326 
327 	return 0;
328 }
329