1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  * https://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.  See the License for the specific language governing
15  * permissions and limitations under the License.
16  */
17 
18 #include "avro/allocation.h"
19 #include "avro/refcount.h"
20 #include "avro/errors.h"
21 #include "avro/io.h"
22 #include "avro/legacy.h"
23 #include "avro/schema.h"
24 #include "avro_private.h"
25 #include <avro/platform.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <errno.h>
29 #include <ctype.h>
30 
31 #include "jansson.h"
32 #include "st.h"
33 #include "schema.h"
34 
35 #define DEFAULT_TABLE_SIZE 32
36 
37 /* forward declaration */
38 static int
39 avro_schema_to_json2(const avro_schema_t schema, avro_writer_t out,
40 		     const char *parent_namespace);
41 
/*
 * Fills in the common object header of a freshly allocated schema: stores
 * the given Avro type, tags the object with the AVRO_SCHEMA class, and sets
 * its reference count to 1.
 */
static void avro_schema_init(avro_schema_t schema, avro_type_t type)
{
	avro_refcount_set(&schema->refcount, 1);
	schema->class_type = AVRO_SCHEMA;
	schema->type = type;
}
48 
/*
 * Returns 1 if `name` is a valid Avro identifier and 0 otherwise.
 *
 * A valid identifier is non-empty, starts with [A-Za-z_] and continues with
 * [A-Za-z0-9_].  A NULL pointer is rejected.
 */
static int is_avro_id(const char *name)
{
	size_t i;

	if (name == NULL || name[0] == '\0') {
		return 0;
	}

	for (i = 0; name[i] != '\0'; i++) {
		/*
		 * The <ctype.h> classifiers require a value representable as
		 * unsigned char (or EOF).  Plain char may be signed, so bytes
		 * >= 0x80 must be converted before they are classified.
		 */
		const unsigned char ch = (unsigned char) name[i];

		if (isalpha(ch) || ch == '_') {
			continue;
		}
		/* digits are allowed everywhere except the first position */
		if (i > 0 && isdigit(ch)) {
			continue;
		}
		return 0;
	}
	return 1;
}
70 
/*
 * Splits a qualified name at its last period.  For the full name
 * "foo.bar.Baz" the name part is "Baz" and the namespace part is "foo.bar".
 *
 * *name_out receives the name part; it points into `fullname` itself rather
 * than into a copy.  The return value is the namespace part in a new buffer
 * produced by avro_strndup(), or NULL when `fullname` contains no period (in
 * that case *name_out is `fullname`).
 */
static char *split_namespace_name(const char *fullname, const char **name_out)
{
	const char *sep = strrchr(fullname, '.');

	if (sep != NULL) {
		*name_out = sep + 1;
		return avro_strndup(fullname, sep - fullname);
	}

	*name_out = fullname;
	return NULL;
}
86 
record_free_foreach(int i,struct avro_record_field_t * field,void * arg)87 static int record_free_foreach(int i, struct avro_record_field_t *field,
88 			       void *arg)
89 {
90 	AVRO_UNUSED(i);
91 	AVRO_UNUSED(arg);
92 
93 	avro_str_free(field->name);
94 	avro_schema_decref(field->type);
95 	avro_freet(struct avro_record_field_t, field);
96 	return ST_DELETE;
97 }
98 
enum_free_foreach(int i,char * sym,void * arg)99 static int enum_free_foreach(int i, char *sym, void *arg)
100 {
101 	AVRO_UNUSED(i);
102 	AVRO_UNUSED(arg);
103 
104 	avro_str_free(sym);
105 	return ST_DELETE;
106 }
107 
/*
 * st_foreach() callback used while a union schema is being freed.  Drops the
 * reference held on the branch schema and returns ST_DELETE.  The key `i`
 * and the user argument `arg` are not used.
 */
static int union_free_foreach(int i, avro_schema_t schema, void *arg)
{
	AVRO_UNUSED(arg);
	AVRO_UNUSED(i);

	avro_schema_decref(schema);

	return ST_DELETE;
}
116 
/*
 * Releases everything owned by a schema object.  Invoked from
 * avro_schema_decref() once avro_refcount_dec() reports that the object
 * should be freed.
 *
 * Objects that are not schemas are ignored.  Primitive schemas own no
 * memory.  Named types free their name (and namespace, when set); container
 * types drop the references they hold on their child schemas.
 */
static void avro_schema_free(avro_schema_t schema)
{
	if (!is_avro_schema(schema)) {
		return;
	}

	switch (avro_typeof(schema)) {
	case AVRO_STRING:
	case AVRO_BYTES:
	case AVRO_INT32:
	case AVRO_INT64:
	case AVRO_FLOAT:
	case AVRO_DOUBLE:
	case AVRO_BOOLEAN:
	case AVRO_NULL:
		/* primitives have nothing to release */
		return;

	case AVRO_RECORD: {
		struct avro_record_schema_t *rec =
		    avro_schema_to_record(schema);

		avro_str_free(rec->name);
		if (rec->space) {
			avro_str_free(rec->space);
		}
		/* release every field, then both lookup tables */
		st_foreach(rec->fields,
			   HASH_FUNCTION_CAST record_free_foreach, 0);
		st_free_table(rec->fields_byname);
		st_free_table(rec->fields);
		avro_freet(struct avro_record_schema_t, rec);
		break;
	}

	case AVRO_ENUM: {
		struct avro_enum_schema_t *en = avro_schema_to_enum(schema);

		avro_str_free(en->name);
		if (en->space) {
			avro_str_free(en->space);
		}
		/* release every symbol string, then both lookup tables */
		st_foreach(en->symbols,
			   HASH_FUNCTION_CAST enum_free_foreach, 0);
		st_free_table(en->symbols);
		st_free_table(en->symbols_byname);
		avro_freet(struct avro_enum_schema_t, en);
		break;
	}

	case AVRO_FIXED: {
		struct avro_fixed_schema_t *fx = avro_schema_to_fixed(schema);

		avro_str_free((char *) fx->name);
		if (fx->space) {
			avro_str_free((char *) fx->space);
		}
		avro_freet(struct avro_fixed_schema_t, fx);
		break;
	}

	case AVRO_MAP: {
		struct avro_map_schema_t *mp = avro_schema_to_map(schema);

		avro_schema_decref(mp->values);
		avro_freet(struct avro_map_schema_t, mp);
		break;
	}

	case AVRO_ARRAY: {
		struct avro_array_schema_t *arr = avro_schema_to_array(schema);

		avro_schema_decref(arr->items);
		avro_freet(struct avro_array_schema_t, arr);
		break;
	}

	case AVRO_UNION: {
		struct avro_union_schema_t *un = avro_schema_to_union(schema);

		/* drop the reference on every branch, then both tables */
		st_foreach(un->branches,
			   HASH_FUNCTION_CAST union_free_foreach, 0);
		st_free_table(un->branches);
		st_free_table(un->branches_byname);
		avro_freet(struct avro_union_schema_t, un);
		break;
	}

	case AVRO_LINK: {
		struct avro_link_schema_t *lnk = avro_schema_to_link(schema);

		/*
		 * Creating a link does not take a reference on the target
		 * schema, so freeing the link must not drop one either.
		 */
		avro_freet(struct avro_link_schema_t, lnk);
		break;
	}
	}
}
215 
/*
 * Adds one reference to `schema` and returns it, so the call can be used
 * inline in an assignment.  A NULL schema is returned unchanged.
 */
avro_schema_t avro_schema_incref(avro_schema_t schema)
{
	if (schema == NULL) {
		return schema;
	}

	avro_refcount_inc(&schema->refcount);
	return schema;
}
223 
224 int
avro_schema_decref(avro_schema_t schema)225 avro_schema_decref(avro_schema_t schema)
226 {
227 	if (schema && avro_refcount_dec(&schema->refcount)) {
228 		avro_schema_free(schema);
229 		return 0;
230 	}
231 	return 1;
232 }
233 
avro_schema_string(void)234 avro_schema_t avro_schema_string(void)
235 {
236 	static struct avro_obj_t obj = {
237 		AVRO_STRING,
238 		AVRO_SCHEMA,
239 		1
240 	};
241 	return avro_schema_incref(&obj);
242 }
243 
avro_schema_bytes(void)244 avro_schema_t avro_schema_bytes(void)
245 {
246 	static struct avro_obj_t obj = {
247 		AVRO_BYTES,
248 		AVRO_SCHEMA,
249 		1
250 	};
251 	return avro_schema_incref(&obj);
252 }
253 
avro_schema_int(void)254 avro_schema_t avro_schema_int(void)
255 {
256 	static struct avro_obj_t obj = {
257 		AVRO_INT32,
258 		AVRO_SCHEMA,
259 		1
260 	};
261 	return avro_schema_incref(&obj);
262 }
263 
avro_schema_long(void)264 avro_schema_t avro_schema_long(void)
265 {
266 	static struct avro_obj_t obj = {
267 		AVRO_INT64,
268 		AVRO_SCHEMA,
269 		1
270 	};
271 	return avro_schema_incref(&obj);
272 }
273 
avro_schema_float(void)274 avro_schema_t avro_schema_float(void)
275 {
276 	static struct avro_obj_t obj = {
277 		AVRO_FLOAT,
278 		AVRO_SCHEMA,
279 		1
280 	};
281 	return avro_schema_incref(&obj);
282 }
283 
avro_schema_double(void)284 avro_schema_t avro_schema_double(void)
285 {
286 	static struct avro_obj_t obj = {
287 		AVRO_DOUBLE,
288 		AVRO_SCHEMA,
289 		1
290 	};
291 	return avro_schema_incref(&obj);
292 }
293 
avro_schema_boolean(void)294 avro_schema_t avro_schema_boolean(void)
295 {
296 	static struct avro_obj_t obj = {
297 		AVRO_BOOLEAN,
298 		AVRO_SCHEMA,
299 		1
300 	};
301 	return avro_schema_incref(&obj);
302 }
303 
avro_schema_null(void)304 avro_schema_t avro_schema_null(void)
305 {
306 	static struct avro_obj_t obj = {
307 		AVRO_NULL,
308 		AVRO_SCHEMA,
309 		1
310 	};
311 	return avro_schema_incref(&obj);
312 }
313 
avro_schema_fixed(const char * name,const int64_t size)314 avro_schema_t avro_schema_fixed(const char *name, const int64_t size)
315 {
316 	return avro_schema_fixed_ns(name, NULL, size);
317 }
318 
avro_schema_fixed_ns(const char * name,const char * space,const int64_t size)319 avro_schema_t avro_schema_fixed_ns(const char *name, const char *space,
320 		const int64_t size)
321 {
322 	if (!is_avro_id(name)) {
323 		avro_set_error("Invalid Avro identifier");
324 		return NULL;
325 	}
326 
327 	struct avro_fixed_schema_t *fixed =
328 	    (struct avro_fixed_schema_t *) avro_new(struct avro_fixed_schema_t);
329 	if (!fixed) {
330 		avro_set_error("Cannot allocate new fixed schema");
331 		return NULL;
332 	}
333 	fixed->name = avro_strdup(name);
334 	if (!fixed->name) {
335 		avro_set_error("Cannot allocate new fixed schema");
336 		avro_freet(struct avro_fixed_schema_t, fixed);
337 		return NULL;
338 	}
339 	fixed->space = space ? avro_strdup(space) : NULL;
340 	if (space && !fixed->space) {
341 		avro_set_error("Cannot allocate new fixed schema");
342 		avro_str_free((char *) fixed->name);
343 		avro_freet(struct avro_fixed_schema_t, fixed);
344 		return NULL;
345 	}
346 	fixed->size = size;
347 	avro_schema_init(&fixed->obj, AVRO_FIXED);
348 	return &fixed->obj;
349 }
350 
avro_schema_fixed_size(const avro_schema_t fixed)351 int64_t avro_schema_fixed_size(const avro_schema_t fixed)
352 {
353 	return avro_schema_to_fixed(fixed)->size;
354 }
355 
avro_schema_union(void)356 avro_schema_t avro_schema_union(void)
357 {
358 	struct avro_union_schema_t *schema =
359 	    (struct avro_union_schema_t *) avro_new(struct avro_union_schema_t);
360 	if (!schema) {
361 		avro_set_error("Cannot allocate new union schema");
362 		return NULL;
363 	}
364 	schema->branches = st_init_numtable_with_size(DEFAULT_TABLE_SIZE);
365 	if (!schema->branches) {
366 		avro_set_error("Cannot allocate new union schema");
367 		avro_freet(struct avro_union_schema_t, schema);
368 		return NULL;
369 	}
370 	schema->branches_byname =
371 	    st_init_strtable_with_size(DEFAULT_TABLE_SIZE);
372 	if (!schema->branches_byname) {
373 		avro_set_error("Cannot allocate new union schema");
374 		st_free_table(schema->branches);
375 		avro_freet(struct avro_union_schema_t, schema);
376 		return NULL;
377 	}
378 
379 	avro_schema_init(&schema->obj, AVRO_UNION);
380 	return &schema->obj;
381 }
382 
383 int
avro_schema_union_append(const avro_schema_t union_schema,const avro_schema_t schema)384 avro_schema_union_append(const avro_schema_t union_schema,
385 			 const avro_schema_t schema)
386 {
387 	check_param(EINVAL, is_avro_schema(union_schema), "union schema");
388 	check_param(EINVAL, is_avro_union(union_schema), "union schema");
389 	check_param(EINVAL, is_avro_schema(schema), "schema");
390 
391 	struct avro_union_schema_t *unionp = avro_schema_to_union(union_schema);
392 	int  new_index = unionp->branches->num_entries;
393 	st_insert(unionp->branches, new_index, (st_data_t) schema);
394 	const char *name = avro_schema_type_name(schema);
395 	st_insert(unionp->branches_byname, (st_data_t) name,
396 		  (st_data_t) new_index);
397 	avro_schema_incref(schema);
398 	return 0;
399 }
400 
avro_schema_union_size(const avro_schema_t union_schema)401 size_t avro_schema_union_size(const avro_schema_t union_schema)
402 {
403 	check_param(EINVAL, is_avro_schema(union_schema), "union schema");
404 	check_param(EINVAL, is_avro_union(union_schema), "union schema");
405 	struct avro_union_schema_t *unionp = avro_schema_to_union(union_schema);
406 	return unionp->branches->num_entries;
407 }
408 
/*
 * Returns the branch schema stored under `branch_index` in a union, or NULL
 * with the Avro error set when the `branches` table has no such entry.  No
 * additional reference is taken on the returned schema.
 */
avro_schema_t avro_schema_union_branch(avro_schema_t unionp,
				       int branch_index)
{
	union {
		st_data_t data;
		avro_schema_t schema;
	} slot;

	if (!st_lookup(avro_schema_to_union(unionp)->branches,
		       branch_index, &slot.data)) {
		avro_set_error("No union branch for discriminant %d",
			       branch_index);
		return NULL;
	}

	return slot.schema;
}
425 
/*
 * Looks up a union branch by the type name it was registered under.
 *
 * On success returns the branch schema and, when `branch_index` is not
 * NULL, stores the branch's index there.  Returns NULL with the Avro error
 * set when no branch has that name.
 */
avro_schema_t avro_schema_union_branch_by_name
(avro_schema_t unionp, int *branch_index, const char *name)
{
	st_data_t found;
	int  index;

	if (!st_lookup(avro_schema_to_union(unionp)->branches_byname,
		       (st_data_t) name, &found)) {
		avro_set_error("No union branch named %s", name);
		return NULL;
	}

	/*
	 * The index was stored as `(st_data_t) int`, so convert it back by
	 * value.  Reading it through an `int` member of a union with
	 * st_data_t only works when both have the same size or the target is
	 * little-endian.
	 */
	index = (int) found;

	if (branch_index != NULL) {
		*branch_index = index;
	}
	return avro_schema_union_branch(unionp, index);
}
445 
avro_schema_array(const avro_schema_t items)446 avro_schema_t avro_schema_array(const avro_schema_t items)
447 {
448 	struct avro_array_schema_t *array =
449 	    (struct avro_array_schema_t *) avro_new(struct avro_array_schema_t);
450 	if (!array) {
451 		avro_set_error("Cannot allocate new array schema");
452 		return NULL;
453 	}
454 	array->items = avro_schema_incref(items);
455 	avro_schema_init(&array->obj, AVRO_ARRAY);
456 	return &array->obj;
457 }
458 
/*
 * Returns the element schema of an array schema.  No additional reference
 * is taken; the argument is converted with avro_schema_to_array() without
 * any type check.
 */
avro_schema_t avro_schema_array_items(avro_schema_t array)
{
	struct avro_array_schema_t *arr = avro_schema_to_array(array);

	return arr->items;
}
463 
avro_schema_map(const avro_schema_t values)464 avro_schema_t avro_schema_map(const avro_schema_t values)
465 {
466 	struct avro_map_schema_t *map =
467 	    (struct avro_map_schema_t *) avro_new(struct avro_map_schema_t);
468 	if (!map) {
469 		avro_set_error("Cannot allocate new map schema");
470 		return NULL;
471 	}
472 	map->values = avro_schema_incref(values);
473 	avro_schema_init(&map->obj, AVRO_MAP);
474 	return &map->obj;
475 }
476 
/*
 * Returns the value schema of a map schema.  No additional reference is
 * taken; the argument is converted with avro_schema_to_map() without any
 * type check.
 */
avro_schema_t avro_schema_map_values(avro_schema_t map)
{
	struct avro_map_schema_t *mp = avro_schema_to_map(map);

	return mp->values;
}
481 
avro_schema_enum(const char * name)482 avro_schema_t avro_schema_enum(const char *name)
483 {
484 	return avro_schema_enum_ns(name, NULL);
485 }
486 
avro_schema_enum_ns(const char * name,const char * space)487 avro_schema_t avro_schema_enum_ns(const char *name, const char *space)
488 {
489 	if (!is_avro_id(name)) {
490 		avro_set_error("Invalid Avro identifier");
491 		return NULL;
492 	}
493 
494 	struct avro_enum_schema_t *enump = (struct avro_enum_schema_t *) avro_new(struct avro_enum_schema_t);
495 	if (!enump) {
496 		avro_set_error("Cannot allocate new enum schema");
497 		return NULL;
498 	}
499 	enump->name = avro_strdup(name);
500 	if (!enump->name) {
501 		avro_set_error("Cannot allocate new enum schema");
502 		avro_freet(struct avro_enum_schema_t, enump);
503 		return NULL;
504 	}
505 	enump->space = space ? avro_strdup(space) : NULL;
506 	if (space && !enump->space) {
507 		avro_set_error("Cannot allocate new enum schema");
508 		avro_str_free(enump->name);
509 		avro_freet(struct avro_enum_schema_t, enump);
510 		return NULL;
511 	}
512 	enump->symbols = st_init_numtable_with_size(DEFAULT_TABLE_SIZE);
513 	if (!enump->symbols) {
514 		avro_set_error("Cannot allocate new enum schema");
515 		if (enump->space) avro_str_free(enump->space);
516 		avro_str_free(enump->name);
517 		avro_freet(struct avro_enum_schema_t, enump);
518 		return NULL;
519 	}
520 	enump->symbols_byname = st_init_strtable_with_size(DEFAULT_TABLE_SIZE);
521 	if (!enump->symbols_byname) {
522 		avro_set_error("Cannot allocate new enum schema");
523 		st_free_table(enump->symbols);
524 		if (enump->space) avro_str_free(enump->space);
525 		avro_str_free(enump->name);
526 		avro_freet(struct avro_enum_schema_t, enump);
527 		return NULL;
528 	}
529 	avro_schema_init(&enump->obj, AVRO_ENUM);
530 	return &enump->obj;
531 }
532 
avro_schema_enum_get(const avro_schema_t enump,int index)533 const char *avro_schema_enum_get(const avro_schema_t enump,
534 				 int index)
535 {
536 	union {
537 		st_data_t data;
538 		char *sym;
539 	} val;
540 	st_lookup(avro_schema_to_enum(enump)->symbols, index, &val.data);
541 	return val.sym;
542 }
543 
avro_schema_enum_get_by_name(const avro_schema_t enump,const char * symbol_name)544 int avro_schema_enum_get_by_name(const avro_schema_t enump,
545 				 const char *symbol_name)
546 {
547 	union {
548 		st_data_t data;
549 		long idx;
550 	} val;
551 
552 	if (st_lookup(avro_schema_to_enum(enump)->symbols_byname,
553 		      (st_data_t) symbol_name, &val.data)) {
554 		return val.idx;
555 	} else {
556 		avro_set_error("No enum symbol named %s", symbol_name);
557 		return -1;
558 	}
559 }
560 
561 int
avro_schema_enum_symbol_append(const avro_schema_t enum_schema,const char * symbol)562 avro_schema_enum_symbol_append(const avro_schema_t enum_schema,
563 			       const char *symbol)
564 {
565 	check_param(EINVAL, is_avro_schema(enum_schema), "enum schema");
566 	check_param(EINVAL, is_avro_enum(enum_schema), "enum schema");
567 	check_param(EINVAL, symbol, "symbol");
568 
569 	char *sym;
570 	long idx;
571 	struct avro_enum_schema_t *enump = avro_schema_to_enum(enum_schema);
572 	sym = avro_strdup(symbol);
573 	if (!sym) {
574 		avro_set_error("Cannot create copy of symbol name");
575 		return ENOMEM;
576 	}
577 	idx = enump->symbols->num_entries;
578 	st_insert(enump->symbols, (st_data_t) idx, (st_data_t) sym);
579 	st_insert(enump->symbols_byname, (st_data_t) sym, (st_data_t) idx);
580 	return 0;
581 }
582 
583 int
avro_schema_enum_number_of_symbols(const avro_schema_t enum_schema)584 avro_schema_enum_number_of_symbols(const avro_schema_t enum_schema)
585 {
586 	check_param(EINVAL, is_avro_schema(enum_schema), "enum schema");
587 	check_param(EINVAL, is_avro_enum(enum_schema), "enum schema");
588 
589 	struct avro_enum_schema_t *enump = avro_schema_to_enum(enum_schema);
590 	return enump->symbols->num_entries;
591 }
592 
593 int
avro_schema_record_field_append(const avro_schema_t record_schema,const char * field_name,const avro_schema_t field_schema)594 avro_schema_record_field_append(const avro_schema_t record_schema,
595 				const char *field_name,
596 				const avro_schema_t field_schema)
597 {
598 	check_param(EINVAL, is_avro_schema(record_schema), "record schema");
599 	check_param(EINVAL, is_avro_record(record_schema), "record schema");
600 	check_param(EINVAL, field_name, "field name");
601 	check_param(EINVAL, is_avro_schema(field_schema), "field schema");
602 
603 	if (!is_avro_id(field_name)) {
604 		avro_set_error("Invalid Avro identifier");
605 		return EINVAL;
606 	}
607 
608 	if (record_schema == field_schema) {
609 		avro_set_error("Cannot create a circular schema");
610 		return EINVAL;
611 	}
612 
613 	struct avro_record_schema_t *record = avro_schema_to_record(record_schema);
614 	struct avro_record_field_t *new_field = (struct avro_record_field_t *) avro_new(struct avro_record_field_t);
615 	if (!new_field) {
616 		avro_set_error("Cannot allocate new record field");
617 		return ENOMEM;
618 	}
619 	new_field->index = record->fields->num_entries;
620 	new_field->name = avro_strdup(field_name);
621 	new_field->type = avro_schema_incref(field_schema);
622 	st_insert(record->fields, record->fields->num_entries,
623 		  (st_data_t) new_field);
624 	st_insert(record->fields_byname, (st_data_t) new_field->name,
625 		  (st_data_t) new_field);
626 	return 0;
627 }
628 
avro_schema_record(const char * name,const char * space)629 avro_schema_t avro_schema_record(const char *name, const char *space)
630 {
631 	if (!is_avro_id(name)) {
632 		avro_set_error("Invalid Avro identifier");
633 		return NULL;
634 	}
635 
636 	struct avro_record_schema_t *record = (struct avro_record_schema_t *) avro_new(struct avro_record_schema_t);
637 	if (!record) {
638 		avro_set_error("Cannot allocate new record schema");
639 		return NULL;
640 	}
641 	record->name = avro_strdup(name);
642 	if (!record->name) {
643 		avro_set_error("Cannot allocate new record schema");
644 		avro_freet(struct avro_record_schema_t, record);
645 		return NULL;
646 	}
647 	record->space = space ? avro_strdup(space) : NULL;
648 	if (space && !record->space) {
649 		avro_set_error("Cannot allocate new record schema");
650 		avro_str_free(record->name);
651 		avro_freet(struct avro_record_schema_t, record);
652 		return NULL;
653 	}
654 	record->fields = st_init_numtable_with_size(DEFAULT_TABLE_SIZE);
655 	if (!record->fields) {
656 		avro_set_error("Cannot allocate new record schema");
657 		if (record->space) {
658 			avro_str_free(record->space);
659 		}
660 		avro_str_free(record->name);
661 		avro_freet(struct avro_record_schema_t, record);
662 		return NULL;
663 	}
664 	record->fields_byname = st_init_strtable_with_size(DEFAULT_TABLE_SIZE);
665 	if (!record->fields_byname) {
666 		avro_set_error("Cannot allocate new record schema");
667 		st_free_table(record->fields);
668 		if (record->space) {
669 			avro_str_free(record->space);
670 		}
671 		avro_str_free(record->name);
672 		avro_freet(struct avro_record_schema_t, record);
673 		return NULL;
674 	}
675 
676 	avro_schema_init(&record->obj, AVRO_RECORD);
677 	return &record->obj;
678 }
679 
avro_schema_record_size(const avro_schema_t record)680 size_t avro_schema_record_size(const avro_schema_t record)
681 {
682 	return avro_schema_to_record(record)->fields->num_entries;
683 }
684 
avro_schema_record_field_get(const avro_schema_t record,const char * field_name)685 avro_schema_t avro_schema_record_field_get(const avro_schema_t
686 					   record, const char *field_name)
687 {
688 	union {
689 		st_data_t data;
690 		struct avro_record_field_t *field;
691 	} val;
692 	st_lookup(avro_schema_to_record(record)->fields_byname,
693 		  (st_data_t) field_name, &val.data);
694 	return val.field->type;
695 }
696 
avro_schema_record_field_get_index(const avro_schema_t schema,const char * field_name)697 int avro_schema_record_field_get_index(const avro_schema_t schema,
698 				       const char *field_name)
699 {
700 	union {
701 		st_data_t data;
702 		struct avro_record_field_t *field;
703 	} val;
704 	if (st_lookup(avro_schema_to_record(schema)->fields_byname,
705 		      (st_data_t) field_name, &val.data)) {
706 		return val.field->index;
707 	}
708 
709 	avro_set_error("No field named %s in record", field_name);
710 	return -1;
711 }
712 
avro_schema_record_field_name(const avro_schema_t schema,int index)713 const char *avro_schema_record_field_name(const avro_schema_t schema, int index)
714 {
715 	union {
716 		st_data_t data;
717 		struct avro_record_field_t *field;
718 	} val;
719 	st_lookup(avro_schema_to_record(schema)->fields, index, &val.data);
720 	return val.field->name;
721 }
722 
avro_schema_record_field_get_by_index(const avro_schema_t record,int index)723 avro_schema_t avro_schema_record_field_get_by_index
724 (const avro_schema_t record, int index)
725 {
726 	union {
727 		st_data_t data;
728 		struct avro_record_field_t *field;
729 	} val;
730 	st_lookup(avro_schema_to_record(record)->fields, index, &val.data);
731 	return val.field->type;
732 }
733 
/*
 * Creates a link schema that refers to the named type `to`.
 *
 * Returns the new link, or NULL with the Avro error set when `to` is not a
 * named type or the allocation fails.
 */
avro_schema_t avro_schema_link(avro_schema_t to)
{
	struct avro_link_schema_t *lnk;

	if (!is_avro_named_type(to)) {
		avro_set_error("Can only link to named types");
		return NULL;
	}

	lnk = (struct avro_link_schema_t *) avro_new(struct avro_link_schema_t);
	if (lnk == NULL) {
		avro_set_error("Cannot allocate new link schema");
		return NULL;
	}

	/*
	 * The target's reference count is deliberately left untouched.
	 * Links are only valid inside a schema, and both the link and its
	 * target stay valid for as long as the top-level schema does, so the
	 * link needs no reference of its own.  Not counting the link also
	 * avoids the reference cycles (and resulting leaks) that recursive
	 * schemas would otherwise create.
	 */
	avro_schema_init(&lnk->obj, AVRO_LINK);
	lnk->to = to;
	return &lnk->obj;
}
762 
/*
 * Returns the schema a link refers to.  No additional reference is taken.
 * Arguments are validated with check_param, which is given NULL as the
 * failure value.
 */
avro_schema_t avro_schema_link_target(avro_schema_t schema)
{
	check_param(NULL, is_avro_schema(schema), "schema");
	check_param(NULL, is_avro_link(schema), "schema");

	return avro_schema_to_link(schema)->to;
}
771 
/*
 * Builds the lookup key for a named schema.
 *
 * When `namespace` is not NULL and `name` contains no period, the result is
 * "<namespace>.<name>"; otherwise it is a copy of `name`.  The result is a
 * new string that the caller must release with avro_str_free(), or NULL if
 * the allocation fails.
 */
static const char *
qualify_name(const char *name, const char *namespace)
{
	char *full_name;
	size_t namespace_len;
	size_t name_len;

	if (namespace == NULL || strchr(name, '.') != NULL) {
		return avro_strdup(name);
	}

	namespace_len = strlen(namespace);
	name_len = strlen(name);

	/* room for both parts, the separating '.', and the terminator */
	full_name = avro_str_alloc(name_len + namespace_len + 2);
	if (full_name == NULL) {
		return NULL;
	}

	memcpy(full_name, namespace, namespace_len);
	full_name[namespace_len] = '.';
	/* name_len + 1 also copies the terminating NUL */
	memcpy(full_name + namespace_len + 1, name, name_len + 1);
	return full_name;
}
784 
785 static int
save_named_schemas(const avro_schema_t schema,st_table * st)786 save_named_schemas(const avro_schema_t schema, st_table *st)
787 {
788 	const char *name = avro_schema_name(schema);
789 	const char *namespace = avro_schema_namespace(schema);
790 	const char *full_name = qualify_name(name, namespace);
791 	int rval = st_insert(st, (st_data_t) full_name, (st_data_t) schema);
792 	return rval;
793 }
794 
795 static avro_schema_t
find_named_schemas(const char * name,const char * namespace,st_table * st)796 find_named_schemas(const char *name, const char *namespace, st_table *st)
797 {
798 	union {
799 		avro_schema_t schema;
800 		st_data_t data;
801 	} val;
802 	const char *full_name = qualify_name(name, namespace);
803 	int rval = st_lookup(st, (st_data_t) full_name, &(val.data));
804 	avro_str_free((char *)full_name);
805 	if (rval) {
806 		return val.schema;
807 	}
808 	avro_set_error("No schema type named %s", name);
809 	return NULL;
810 };
811 
812 static int
avro_type_from_json_t(json_t * json,avro_type_t * type,st_table * named_schemas,avro_schema_t * named_type,const char * namespace)813 avro_type_from_json_t(json_t *json, avro_type_t *type,
814 		      st_table *named_schemas, avro_schema_t *named_type,
815 		      const char *namespace)
816 {
817 	json_t *json_type;
818 	const char *type_str;
819 
820 	if (json_is_array(json)) {
821 		*type = AVRO_UNION;
822 		return 0;
823 	} else if (json_is_object(json)) {
824 		json_type = json_object_get(json, "type");
825 	} else {
826 		json_type = json;
827 	}
828 	if (!json_is_string(json_type)) {
829 		avro_set_error("\"type\" field must be a string");
830 		return EINVAL;
831 	}
832 	type_str = json_string_value(json_type);
833 	if (!type_str) {
834 		avro_set_error("\"type\" field must be a string");
835 		return EINVAL;
836 	}
837 	/*
838 	 * TODO: gperf/re2c this
839 	 */
840 	if (strcmp(type_str, "string") == 0) {
841 		*type = AVRO_STRING;
842 	} else if (strcmp(type_str, "bytes") == 0) {
843 		*type = AVRO_BYTES;
844 	} else if (strcmp(type_str, "int") == 0) {
845 		*type = AVRO_INT32;
846 	} else if (strcmp(type_str, "long") == 0) {
847 		*type = AVRO_INT64;
848 	} else if (strcmp(type_str, "float") == 0) {
849 		*type = AVRO_FLOAT;
850 	} else if (strcmp(type_str, "double") == 0) {
851 		*type = AVRO_DOUBLE;
852 	} else if (strcmp(type_str, "boolean") == 0) {
853 		*type = AVRO_BOOLEAN;
854 	} else if (strcmp(type_str, "null") == 0) {
855 		*type = AVRO_NULL;
856 	} else if (strcmp(type_str, "record") == 0) {
857 		*type = AVRO_RECORD;
858 	} else if (strcmp(type_str, "enum") == 0) {
859 		*type = AVRO_ENUM;
860 	} else if (strcmp(type_str, "array") == 0) {
861 		*type = AVRO_ARRAY;
862 	} else if (strcmp(type_str, "map") == 0) {
863 		*type = AVRO_MAP;
864 	} else if (strcmp(type_str, "fixed") == 0) {
865 		*type = AVRO_FIXED;
866 	} else if ((*named_type = find_named_schemas(type_str, namespace, named_schemas))) {
867 		*type = AVRO_LINK;
868 	} else {
869 		avro_set_error("Unknown Avro \"type\": %s", type_str);
870 		return EINVAL;
871 	}
872 	return 0;
873 }
874 
875 static int
avro_schema_from_json_t(json_t * json,avro_schema_t * schema,st_table * named_schemas,const char * parent_namespace)876 avro_schema_from_json_t(json_t *json, avro_schema_t *schema,
877 			st_table *named_schemas, const char *parent_namespace)
878 {
879 #ifdef _WIN32
880  #pragma message("#warning: Bug: '0' is not of type avro_type_t.")
881 #else
882  #warning "Bug: '0' is not of type avro_type_t."
883 #endif
884   /* We should really have an "AVRO_INVALID" type in
885    * avro_type_t. Suppress warning below in which we set type to 0.
886    */
887 	avro_type_t type = (avro_type_t) 0;
888 	unsigned int i;
889 	avro_schema_t named_type = NULL;
890 
891 	if (avro_type_from_json_t(json, &type, named_schemas, &named_type, parent_namespace)) {
892 		return EINVAL;
893 	}
894 
895 	switch (type) {
896 	case AVRO_LINK:
897 		*schema = avro_schema_link(named_type);
898 		break;
899 
900 	case AVRO_STRING:
901 		*schema = avro_schema_string();
902 		break;
903 
904 	case AVRO_BYTES:
905 		*schema = avro_schema_bytes();
906 		break;
907 
908 	case AVRO_INT32:
909 		*schema = avro_schema_int();
910 		break;
911 
912 	case AVRO_INT64:
913 		*schema = avro_schema_long();
914 		break;
915 
916 	case AVRO_FLOAT:
917 		*schema = avro_schema_float();
918 		break;
919 
920 	case AVRO_DOUBLE:
921 		*schema = avro_schema_double();
922 		break;
923 
924 	case AVRO_BOOLEAN:
925 		*schema = avro_schema_boolean();
926 		break;
927 
928 	case AVRO_NULL:
929 		*schema = avro_schema_null();
930 		break;
931 
932 	case AVRO_RECORD:
933 		{
934 			json_t *json_name = json_object_get(json, "name");
935 			json_t *json_namespace =
936 			    json_object_get(json, "namespace");
937 			json_t *json_fields = json_object_get(json, "fields");
938 			unsigned int num_fields;
939 			const char *fullname, *name;
940 
941 			if (!json_is_string(json_name)) {
942 				avro_set_error("Record type must have a \"name\"");
943 				return EINVAL;
944 			}
945 			if (!json_is_array(json_fields)) {
946 				avro_set_error("Record type must have \"fields\"");
947 				return EINVAL;
948 			}
949 			num_fields = json_array_size(json_fields);
950 			fullname = json_string_value(json_name);
951 			if (!fullname) {
952 				avro_set_error("Record type must have a \"name\"");
953 				return EINVAL;
954 			}
955 
956 			if (strchr(fullname, '.')) {
957 				char *namespace = split_namespace_name(fullname, &name);
958 				*schema = avro_schema_record(name, namespace);
959 				avro_str_free(namespace);
960 			} else if (json_is_string(json_namespace)) {
961 				const char *namespace = json_string_value(json_namespace);
962 				if (strlen(namespace) == 0) {
963 					namespace = NULL;
964 				}
965 				*schema = avro_schema_record(fullname, namespace);
966 			} else {
967 				*schema = avro_schema_record(fullname, parent_namespace);
968 			}
969 
970 			if (*schema == NULL) {
971 				return ENOMEM;
972 			}
973 			if (save_named_schemas(*schema, named_schemas)) {
974 				avro_set_error("Cannot save record schema");
975 				return ENOMEM;
976 			}
977 			for (i = 0; i < num_fields; i++) {
978 				json_t *json_field =
979 				    json_array_get(json_fields, i);
980 				json_t *json_field_name;
981 				json_t *json_field_type;
982 				avro_schema_t json_field_type_schema;
983 				int field_rval;
984 
985 				if (!json_is_object(json_field)) {
986 					avro_set_error("Record field %d must be an array", i);
987 					avro_schema_decref(*schema);
988 					return EINVAL;
989 				}
990 				json_field_name =
991 				    json_object_get(json_field, "name");
992 				if (!json_field_name) {
993 					avro_set_error("Record field %d must have a \"name\"", i);
994 					avro_schema_decref(*schema);
995 					return EINVAL;
996 				}
997 				json_field_type =
998 				    json_object_get(json_field, "type");
999 				if (!json_field_type) {
1000 					avro_set_error("Record field %d must have a \"type\"", i);
1001 					avro_schema_decref(*schema);
1002 					return EINVAL;
1003 				}
1004 				field_rval =
1005 				    avro_schema_from_json_t(json_field_type,
1006 							    &json_field_type_schema,
1007 							    named_schemas,
1008 							    avro_schema_namespace(*schema));
1009 				if (field_rval) {
1010 					avro_schema_decref(*schema);
1011 					return field_rval;
1012 				}
1013 				field_rval =
1014 				    avro_schema_record_field_append(*schema,
1015 								    json_string_value
1016 								    (json_field_name),
1017 								    json_field_type_schema);
1018 				avro_schema_decref(json_field_type_schema);
1019 				if (field_rval != 0) {
1020 					avro_schema_decref(*schema);
1021 					return field_rval;
1022 				}
1023 			}
1024 		}
1025 		break;
1026 
1027 	case AVRO_ENUM:
1028 		{
1029 			json_t *json_name = json_object_get(json, "name");
1030 			json_t *json_symbols = json_object_get(json, "symbols");
1031 			json_t *json_namespace = json_object_get(json, "namespace");
1032 			const char *fullname, *name;
1033 			unsigned int num_symbols;
1034 
1035 			if (!json_is_string(json_name)) {
1036 				avro_set_error("Enum type must have a \"name\"");
1037 				return EINVAL;
1038 			}
1039 			if (!json_is_array(json_symbols)) {
1040 				avro_set_error("Enum type must have \"symbols\"");
1041 				return EINVAL;
1042 			}
1043 
1044 			fullname = json_string_value(json_name);
1045 			if (!fullname) {
1046 				avro_set_error("Enum type must have a \"name\"");
1047 				return EINVAL;
1048 			}
1049 			num_symbols = json_array_size(json_symbols);
1050 			if (num_symbols == 0) {
1051 				avro_set_error("Enum type must have at least one symbol");
1052 				return EINVAL;
1053 			}
1054 
1055 			if (strchr(fullname, '.')) {
1056 				char *namespace;
1057 				namespace = split_namespace_name(fullname, &name);
1058 				*schema = avro_schema_enum_ns(name, namespace);
1059 				avro_str_free(namespace);
1060 			} else if (json_is_string(json_namespace)) {
1061 				const char *namespace = json_string_value(json_namespace);
1062 				if (strlen(namespace) == 0) {
1063 					namespace = NULL;
1064 				}
1065 				*schema = avro_schema_enum_ns(fullname, namespace);
1066 			} else {
1067 				*schema = avro_schema_enum_ns(fullname, parent_namespace);
1068 			}
1069 
1070 			if (*schema == NULL) {
1071 				return ENOMEM;
1072 			}
1073 			if (save_named_schemas(*schema, named_schemas)) {
1074 				avro_set_error("Cannot save enum schema");
1075 				return ENOMEM;
1076 			}
1077 			for (i = 0; i < num_symbols; i++) {
1078 				int enum_rval;
1079 				json_t *json_symbol =
1080 				    json_array_get(json_symbols, i);
1081 				const char *symbol;
1082 				if (!json_is_string(json_symbol)) {
1083 					avro_set_error("Enum symbol %d must be a string", i);
1084 					avro_schema_decref(*schema);
1085 					return EINVAL;
1086 				}
1087 				symbol = json_string_value(json_symbol);
1088 				enum_rval =
1089 				    avro_schema_enum_symbol_append(*schema,
1090 								   symbol);
1091 				if (enum_rval != 0) {
1092 					avro_schema_decref(*schema);
1093 					return enum_rval;
1094 				}
1095 			}
1096 		}
1097 		break;
1098 
1099 	case AVRO_ARRAY:
1100 		{
1101 			int items_rval;
1102 			json_t *json_items = json_object_get(json, "items");
1103 			avro_schema_t items_schema;
1104 			if (!json_items) {
1105 				avro_set_error("Array type must have \"items\"");
1106 				return EINVAL;
1107 			}
1108 			items_rval =
1109 			    avro_schema_from_json_t(json_items, &items_schema,
1110 						    named_schemas, parent_namespace);
1111 			if (items_rval) {
1112 				return items_rval;
1113 			}
1114 			*schema = avro_schema_array(items_schema);
1115 			avro_schema_decref(items_schema);
1116 		}
1117 		break;
1118 
1119 	case AVRO_MAP:
1120 		{
1121 			int values_rval;
1122 			json_t *json_values = json_object_get(json, "values");
1123 			avro_schema_t values_schema;
1124 
1125 			if (!json_values) {
1126 				avro_set_error("Map type must have \"values\"");
1127 				return EINVAL;
1128 			}
1129 			values_rval =
1130 			    avro_schema_from_json_t(json_values, &values_schema,
1131 						    named_schemas, parent_namespace);
1132 			if (values_rval) {
1133 				return values_rval;
1134 			}
1135 			*schema = avro_schema_map(values_schema);
1136 			avro_schema_decref(values_schema);
1137 		}
1138 		break;
1139 
1140 	case AVRO_UNION:
1141 		{
1142 			unsigned int num_schemas = json_array_size(json);
1143 			avro_schema_t s;
1144 			if (num_schemas == 0) {
1145 				avro_set_error("Union type must have at least one branch");
1146 				return EINVAL;
1147 			}
1148 			*schema = avro_schema_union();
1149 			for (i = 0; i < num_schemas; i++) {
1150 				int schema_rval;
1151 				json_t *schema_json = json_array_get(json, i);
1152 				if (!schema_json) {
1153 					avro_set_error("Cannot retrieve branch JSON");
1154 					return EINVAL;
1155 				}
1156 				schema_rval =
1157 				    avro_schema_from_json_t(schema_json, &s,
1158 							    named_schemas, parent_namespace);
1159 				if (schema_rval != 0) {
1160 					avro_schema_decref(*schema);
1161 					return schema_rval;
1162 				}
1163 				schema_rval =
1164 				    avro_schema_union_append(*schema, s);
1165 				avro_schema_decref(s);
1166 				if (schema_rval != 0) {
1167 					avro_schema_decref(*schema);
1168 					return schema_rval;
1169 				}
1170 			}
1171 		}
1172 		break;
1173 
1174 	case AVRO_FIXED:
1175 		{
1176 			json_t *json_size = json_object_get(json, "size");
1177 			json_t *json_name = json_object_get(json, "name");
1178 			json_t *json_namespace = json_object_get(json, "namespace");
1179 			json_int_t size;
1180 			const char *fullname, *name;
1181 			if (!json_is_integer(json_size)) {
1182 				avro_set_error("Fixed type must have a \"size\"");
1183 				return EINVAL;
1184 			}
1185 			if (!json_is_string(json_name)) {
1186 				avro_set_error("Fixed type must have a \"name\"");
1187 				return EINVAL;
1188 			}
1189 			size = json_integer_value(json_size);
1190 			fullname = json_string_value(json_name);
1191 
1192 			if (strchr(fullname, '.')) {
1193 				char *namespace;
1194 				namespace = split_namespace_name(fullname, &name);
1195 				*schema = avro_schema_fixed_ns(name, namespace, (int64_t) size);
1196 				avro_str_free(namespace);
1197 			} else if (json_is_string(json_namespace)) {
1198 				const char *namespace = json_string_value(json_namespace);
1199 				if (strlen(namespace) == 0) {
1200 					namespace = NULL;
1201 				}
1202 				*schema = avro_schema_fixed_ns(fullname, namespace, (int64_t) size);
1203 			} else {
1204 				*schema = avro_schema_fixed_ns(fullname, parent_namespace, (int64_t) size);
1205 			}
1206 
1207 			if (*schema == NULL) {
1208 				return ENOMEM;
1209 			}
1210 			if (save_named_schemas(*schema, named_schemas)) {
1211 				avro_set_error("Cannot save fixed schema");
1212 				return ENOMEM;
1213 			}
1214 		}
1215 		break;
1216 
1217 	default:
1218 		avro_set_error("Unknown schema type");
1219 		return EINVAL;
1220 	}
1221 	return 0;
1222 }
1223 
/*
 * Callback handed to st_foreach() when a named-schema table is torn down.
 * It releases the key string with avro_str_free() and returns ST_DELETE
 * (presumably asking st_foreach() to drop the entry -- confirm against st.h).
 * The stored value and the extra argument are not used.
 */
static int
named_schema_free_foreach(char *full_name, st_data_t value, st_data_t arg)
{
	avro_str_free(full_name);

	AVRO_UNUSED(arg);
	AVRO_UNUSED(value);
	return ST_DELETE;
}
1232 
1233 static int
avro_schema_from_json_root(json_t * root,avro_schema_t * schema)1234 avro_schema_from_json_root(json_t *root, avro_schema_t *schema)
1235 {
1236 	int  rval;
1237 	st_table *named_schemas;
1238 
1239 	named_schemas = st_init_strtable_with_size(DEFAULT_TABLE_SIZE);
1240 	if (!named_schemas) {
1241 		avro_set_error("Cannot allocate named schema map");
1242 		json_decref(root);
1243 		return ENOMEM;
1244 	}
1245 
1246 	/* json_dumpf(root, stderr, 0); */
1247 	rval = avro_schema_from_json_t(root, schema, named_schemas, NULL);
1248 	json_decref(root);
1249 	st_foreach(named_schemas, HASH_FUNCTION_CAST named_schema_free_foreach, 0);
1250 	st_free_table(named_schemas);
1251 	return rval;
1252 }
1253 
1254 int
avro_schema_from_json(const char * jsontext,const int32_t len,avro_schema_t * schema,avro_schema_error_t * e)1255 avro_schema_from_json(const char *jsontext, const int32_t len,
1256 		      avro_schema_t *schema, avro_schema_error_t *e)
1257 {
1258 	check_param(EINVAL, jsontext, "JSON text");
1259 	check_param(EINVAL, schema, "schema pointer");
1260 
1261 	json_t  *root;
1262 	json_error_t  json_error;
1263 
1264 	AVRO_UNUSED(len);
1265 	AVRO_UNUSED(e);
1266 
1267 	root = json_loads(jsontext, JSON_DECODE_ANY, &json_error);
1268 	if (!root) {
1269 		avro_set_error("Error parsing JSON: %s", json_error.text);
1270 		return EINVAL;
1271 	}
1272 
1273 	return avro_schema_from_json_root(root, schema);
1274 }
1275 
1276 int
avro_schema_from_json_length(const char * jsontext,size_t length,avro_schema_t * schema)1277 avro_schema_from_json_length(const char *jsontext, size_t length,
1278 			     avro_schema_t *schema)
1279 {
1280 	check_param(EINVAL, jsontext, "JSON text");
1281 	check_param(EINVAL, schema, "schema pointer");
1282 
1283 	json_t  *root;
1284 	json_error_t  json_error;
1285 
1286 	root = json_loadb(jsontext, length, JSON_DECODE_ANY, &json_error);
1287 	if (!root) {
1288 		avro_set_error("Error parsing JSON: %s", json_error.text);
1289 		return EINVAL;
1290 	}
1291 
1292 	return avro_schema_from_json_root(root, schema);
1293 }
1294 
/*
 * Recursively duplicates `schema`.
 *
 * Primitive schemas are returned as-is (same pointer).  Record, enum and
 * fixed schemas are re-created and registered in `named_schemas` through
 * save_named_schemas(); link schemas are re-created pointing at whatever
 * find_named_schemas() returns for the link target's name and namespace.
 * Maps, arrays and unions are rebuilt around copies of their children.
 *
 * Returns the copy, or NULL when `schema` is NULL, its type is not
 * recognised, or any step of the copy fails.  On failure every schema
 * object created by this call is released with avro_schema_decref(), so a
 * partially built copy is neither leaked nor handed back to the caller.
 */
avro_schema_t avro_schema_copy_root(avro_schema_t schema, st_table *named_schemas)
{
	long i;
	avro_schema_t new_schema = NULL;
	if (!schema) {
		return NULL;
	}
	switch (avro_typeof(schema)) {
	case AVRO_STRING:
	case AVRO_BYTES:
	case AVRO_INT32:
	case AVRO_INT64:
	case AVRO_FLOAT:
	case AVRO_DOUBLE:
	case AVRO_BOOLEAN:
	case AVRO_NULL:
		/*
		 * No need to copy primitives since they're static
		 */
		new_schema = schema;
		break;

	case AVRO_RECORD:
		{
			struct avro_record_schema_t *record_schema =
			    avro_schema_to_record(schema);
			new_schema =
			    avro_schema_record(record_schema->name,
					       record_schema->space);
			if (!new_schema) {
				return NULL;
			}
			/*
			 * Register the record before copying its fields so
			 * that links inside the fields can resolve to it.
			 */
			if (save_named_schemas(new_schema, named_schemas)) {
				avro_set_error("Cannot save record schema");
				avro_schema_decref(new_schema);
				return NULL;
			}
			for (i = 0; i < record_schema->fields->num_entries; i++) {
				union {
					st_data_t data;
					struct avro_record_field_t *field;
				} val;
				avro_schema_t type_copy;
				int field_rval;

				st_lookup(record_schema->fields, i, &val.data);
				type_copy =
				    avro_schema_copy_root(val.field->type, named_schemas);
				if (!type_copy) {
					avro_schema_decref(new_schema);
					return NULL;
				}
				field_rval =
				    avro_schema_record_field_append(new_schema,
								    val.field->name,
								    type_copy);
				/* Drop our reference whether or not the append worked. */
				avro_schema_decref(type_copy);
				if (field_rval != 0) {
					avro_schema_decref(new_schema);
					return NULL;
				}
			}
		}
		break;

	case AVRO_ENUM:
		{
			struct avro_enum_schema_t *enum_schema =
			    avro_schema_to_enum(schema);
			new_schema = avro_schema_enum_ns(enum_schema->name,
					enum_schema->space);
			if (!new_schema) {
				return NULL;
			}
			if (save_named_schemas(new_schema, named_schemas)) {
				avro_set_error("Cannot save enum schema");
				avro_schema_decref(new_schema);
				return NULL;
			}
			for (i = 0; i < enum_schema->symbols->num_entries; i++) {
				union {
					st_data_t data;
					char *sym;
				} val;
				st_lookup(enum_schema->symbols, i, &val.data);
				if (avro_schema_enum_symbol_append(new_schema,
								   val.sym) != 0) {
					avro_schema_decref(new_schema);
					return NULL;
				}
			}
		}
		break;

	case AVRO_FIXED:
		{
			struct avro_fixed_schema_t *fixed_schema =
			    avro_schema_to_fixed(schema);
			new_schema =
			    avro_schema_fixed_ns(fixed_schema->name,
						 fixed_schema->space,
						 fixed_schema->size);
			if (!new_schema) {
				return NULL;
			}
			if (save_named_schemas(new_schema, named_schemas)) {
				avro_set_error("Cannot save fixed schema");
				avro_schema_decref(new_schema);
				return NULL;
			}
		}
		break;

	case AVRO_MAP:
		{
			struct avro_map_schema_t *map_schema =
			    avro_schema_to_map(schema);
			avro_schema_t values_copy =
			    avro_schema_copy_root(map_schema->values, named_schemas);
			if (!values_copy) {
				return NULL;
			}
			new_schema = avro_schema_map(values_copy);
			avro_schema_decref(values_copy);
		}
		break;

	case AVRO_ARRAY:
		{
			struct avro_array_schema_t *array_schema =
			    avro_schema_to_array(schema);
			avro_schema_t items_copy =
			    avro_schema_copy_root(array_schema->items, named_schemas);
			if (!items_copy) {
				return NULL;
			}
			new_schema = avro_schema_array(items_copy);
			avro_schema_decref(items_copy);
		}
		break;

	case AVRO_UNION:
		{
			struct avro_union_schema_t *union_schema =
			    avro_schema_to_union(schema);

			new_schema = avro_schema_union();
			if (!new_schema) {
				return NULL;
			}
			for (i = 0; i < union_schema->branches->num_entries;
			     i++) {
				avro_schema_t schema_copy;
				int append_rval;
				union {
					st_data_t data;
					avro_schema_t schema;
				} val;
				st_lookup(union_schema->branches, i, &val.data);
				schema_copy = avro_schema_copy_root(val.schema, named_schemas);
				if (!schema_copy) {
					avro_schema_decref(new_schema);
					return NULL;
				}
				append_rval =
				    avro_schema_union_append(new_schema, schema_copy);
				/* Drop our reference whether or not the append worked. */
				avro_schema_decref(schema_copy);
				if (append_rval != 0) {
					avro_schema_decref(new_schema);
					return NULL;
				}
			}
		}
		break;

	case AVRO_LINK:
		{
			struct avro_link_schema_t *link_schema =
			    avro_schema_to_link(schema);
			avro_schema_t to;

			/*
			 * Point the new link at the entry registered in
			 * named_schemas under the target's name/namespace,
			 * not at the original target.
			 */
			to = find_named_schemas(avro_schema_name(link_schema->to),
						avro_schema_namespace(link_schema->to),
						named_schemas);
			new_schema = avro_schema_link(to);
		}
		break;

	default:
		return NULL;
	}
	return new_schema;
}
1452 
/*
 * Returns a copy of `schema` produced by avro_schema_copy_root(), or NULL
 * when the bookkeeping table cannot be allocated or the copy itself yields
 * NULL.
 *
 * The table of named schemas only lives for the duration of the call: its
 * keys are released through named_schema_free_foreach and the table is
 * freed before returning.
 */
avro_schema_t avro_schema_copy(avro_schema_t schema)
{
	st_table *registry = st_init_strtable_with_size(DEFAULT_TABLE_SIZE);

	if (registry == NULL) {
		avro_set_error("Cannot allocate named schema map");
		return NULL;
	}

	avro_schema_t duplicate = avro_schema_copy_root(schema, registry);

	st_foreach(registry, HASH_FUNCTION_CAST named_schema_free_foreach, 0);
	st_free_table(registry);
	return duplicate;
}
1469 
avro_schema_get_subschema(const avro_schema_t schema,const char * name)1470 avro_schema_t avro_schema_get_subschema(const avro_schema_t schema,
1471          const char *name)
1472 {
1473  if (is_avro_record(schema)) {
1474    const struct avro_record_schema_t *rschema =
1475      avro_schema_to_record(schema);
1476    union {
1477      st_data_t data;
1478      struct avro_record_field_t *field;
1479    } field;
1480 
1481    if (st_lookup(rschema->fields_byname,
1482            (st_data_t) name, &field.data))
1483    {
1484      return field.field->type;
1485    }
1486 
1487    avro_set_error("No record field named %s", name);
1488    return NULL;
1489  } else if (is_avro_union(schema)) {
1490    const struct avro_union_schema_t *uschema =
1491      avro_schema_to_union(schema);
1492    long i;
1493 
1494    for (i = 0; i < uschema->branches->num_entries; i++) {
1495      union {
1496        st_data_t data;
1497        avro_schema_t schema;
1498      } val;
1499      st_lookup(uschema->branches, i, &val.data);
1500      if (strcmp(avro_schema_type_name(val.schema),
1501           name) == 0)
1502      {
1503        return val.schema;
1504      }
1505    }
1506 
1507    avro_set_error("No union branch named %s", name);
1508    return NULL;
1509  } else if (is_avro_array(schema)) {
1510    if (strcmp(name, "[]") == 0) {
1511      const struct avro_array_schema_t *aschema =
1512        avro_schema_to_array(schema);
1513      return aschema->items;
1514    }
1515 
1516    avro_set_error("Array subschema must be called \"[]\"");
1517    return NULL;
1518  } else if (is_avro_map(schema)) {
1519    if (strcmp(name, "{}") == 0) {
1520      const struct avro_map_schema_t *mschema =
1521        avro_schema_to_map(schema);
1522      return mschema->values;
1523    }
1524 
1525    avro_set_error("Map subschema must be called \"{}\"");
1526    return NULL;
1527  }
1528 
1529  avro_set_error("Can only retrieve subschemas from record, union, array, or map");
1530  return NULL;
1531 }
1532 
avro_schema_name(const avro_schema_t schema)1533 const char *avro_schema_name(const avro_schema_t schema)
1534 {
1535 	if (is_avro_record(schema)) {
1536 		return (avro_schema_to_record(schema))->name;
1537 	} else if (is_avro_enum(schema)) {
1538 		return (avro_schema_to_enum(schema))->name;
1539 	} else if (is_avro_fixed(schema)) {
1540 		return (avro_schema_to_fixed(schema))->name;
1541 	}
1542 	avro_set_error("Schema has no name");
1543 	return NULL;
1544 }
1545 
avro_schema_namespace(const avro_schema_t schema)1546 const char *avro_schema_namespace(const avro_schema_t schema)
1547 {
1548 	if (is_avro_record(schema)) {
1549 		return (avro_schema_to_record(schema))->space;
1550 	} else if (is_avro_enum(schema)) {
1551 		return (avro_schema_to_enum(schema))->space;
1552 	} else if (is_avro_fixed(schema)) {
1553 		return (avro_schema_to_fixed(schema))->space;
1554 	}
1555 	return NULL;
1556 }
1557 
avro_schema_type_name(const avro_schema_t schema)1558 const char *avro_schema_type_name(const avro_schema_t schema)
1559 {
1560 	if (is_avro_record(schema)) {
1561 		return (avro_schema_to_record(schema))->name;
1562 	} else if (is_avro_enum(schema)) {
1563 		return (avro_schema_to_enum(schema))->name;
1564 	} else if (is_avro_fixed(schema)) {
1565 		return (avro_schema_to_fixed(schema))->name;
1566 	} else if (is_avro_union(schema)) {
1567 		return "union";
1568 	} else if (is_avro_array(schema)) {
1569 		return "array";
1570 	} else if (is_avro_map(schema)) {
1571 		return "map";
1572 	} else if (is_avro_int32(schema)) {
1573 		return "int";
1574 	} else if (is_avro_int64(schema)) {
1575 		return "long";
1576 	} else if (is_avro_float(schema)) {
1577 		return "float";
1578 	} else if (is_avro_double(schema)) {
1579 		return "double";
1580 	} else if (is_avro_boolean(schema)) {
1581 		return "boolean";
1582 	} else if (is_avro_null(schema)) {
1583 		return "null";
1584 	} else if (is_avro_string(schema)) {
1585 		return "string";
1586 	} else if (is_avro_bytes(schema)) {
1587 		return "bytes";
1588 	} else if (is_avro_link(schema)) {
1589 		avro_schema_t  target = avro_schema_link_target(schema);
1590 		return avro_schema_type_name(target);
1591 	}
1592 	avro_set_error("Unknown schema type");
1593 	return NULL;
1594 }
1595 
avro_datum_from_schema(const avro_schema_t schema)1596 avro_datum_t avro_datum_from_schema(const avro_schema_t schema)
1597 {
1598 	check_param(NULL, is_avro_schema(schema), "schema");
1599 
1600 	switch (avro_typeof(schema)) {
1601 		case AVRO_STRING:
1602 			return avro_givestring("", NULL);
1603 
1604 		case AVRO_BYTES:
1605 			return avro_givebytes("", 0, NULL);
1606 
1607 		case AVRO_INT32:
1608 			return avro_int32(0);
1609 
1610 		case AVRO_INT64:
1611 			return avro_int64(0);
1612 
1613 		case AVRO_FLOAT:
1614 			return avro_float(0);
1615 
1616 		case AVRO_DOUBLE:
1617 			return avro_double(0);
1618 
1619 		case AVRO_BOOLEAN:
1620 			return avro_boolean(0);
1621 
1622 		case AVRO_NULL:
1623 			return avro_null();
1624 
1625 		case AVRO_RECORD:
1626 			{
1627 				const struct avro_record_schema_t *record_schema =
1628 				    avro_schema_to_record(schema);
1629 
1630 				avro_datum_t  rec = avro_record(schema);
1631 
1632 				int  i;
1633 				for (i = 0; i < record_schema->fields->num_entries; i++) {
1634 					union {
1635 						st_data_t data;
1636 						struct avro_record_field_t *field;
1637 					} val;
1638 					st_lookup(record_schema->fields, i, &val.data);
1639 
1640 					avro_datum_t  field =
1641 					    avro_datum_from_schema(val.field->type);
1642 					avro_record_set(rec, val.field->name, field);
1643 					avro_datum_decref(field);
1644 				}
1645 
1646 				return rec;
1647 			}
1648 
1649 		case AVRO_ENUM:
1650 			return avro_enum(schema, 0);
1651 
1652 		case AVRO_FIXED:
1653 			{
1654 				const struct avro_fixed_schema_t *fixed_schema =
1655 				    avro_schema_to_fixed(schema);
1656 				return avro_givefixed(schema, NULL, fixed_schema->size, NULL);
1657 			}
1658 
1659 		case AVRO_MAP:
1660 			return avro_map(schema);
1661 
1662 		case AVRO_ARRAY:
1663 			return avro_array(schema);
1664 
1665 		case AVRO_UNION:
1666 			return avro_union(schema, -1, NULL);
1667 
1668 		case AVRO_LINK:
1669 			{
1670 				const struct avro_link_schema_t *link_schema =
1671 				    avro_schema_to_link(schema);
1672 				return avro_datum_from_schema(link_schema->to);
1673 			}
1674 
1675 		default:
1676 			avro_set_error("Unknown schema type");
1677 			return NULL;
1678 	}
1679 }
1680 
1681 /* simple helper for writing strings */
avro_write_str(avro_writer_t out,const char * str)1682 static int avro_write_str(avro_writer_t out, const char *str)
1683 {
1684 	return avro_write(out, (char *)str, strlen(str));
1685 }
1686 
/*
 * Emits one record field as {"name":"<field name>","type":<type JSON>}.
 * `parent_namespace` is passed through to the serializer of the field type.
 * Returns 0 on success or the first non-zero status from a write.
 */
static int write_field(avro_writer_t out, const struct avro_record_field_t *field,
		       const char *parent_namespace)
{
	int status;

	check(status, avro_write_str(out, "{\"name\":\""));
	check(status, avro_write_str(out, field->name));
	check(status, avro_write_str(out, "\",\"type\":"));
	check(status,
	      avro_schema_to_json2(field->type, out, parent_namespace));

	return avro_write_str(out, "}");
}
1697 
/*
 * Emits a record schema as
 *   {"type":"record","name":"...",["namespace":"...",]"fields":[...]}.
 *
 * The "namespace" member is written only when nullstrcmp() returns non-zero
 * for the record's namespace and `parent_namespace` (presumably: when the
 * two differ); a NULL namespace is then written as an empty string.  Fields
 * are serialized with the record's own namespace as their parent namespace.
 * Returns 0 on success or the first non-zero status from a write.
 */
static int write_record(avro_writer_t out, const struct avro_record_schema_t *record,
			const char *parent_namespace)
{
	int status;
	long idx;

	check(status, avro_write_str(out, "{\"type\":\"record\",\"name\":\""));
	check(status, avro_write_str(out, record->name));
	check(status, avro_write_str(out, "\","));

	if (nullstrcmp(record->space, parent_namespace) != 0) {
		check(status, avro_write_str(out, "\"namespace\":\""));
		if (record->space != NULL) {
			check(status, avro_write_str(out, record->space));
		}
		check(status, avro_write_str(out, "\","));
	}

	check(status, avro_write_str(out, "\"fields\":["));
	for (idx = 0; idx < record->fields->num_entries; idx++) {
		union {
			st_data_t data;
			struct avro_record_field_t *field;
		} entry;

		st_lookup(record->fields, idx, &entry.data);
		/* Comma goes between fields, not before the first one. */
		if (idx > 0) {
			check(status, avro_write_str(out, ","));
		}
		check(status, write_field(out, entry.field, record->space));
	}

	return avro_write_str(out, "]}");
}
1728 
/*
 * Emits an enum schema as
 *   {"type":"enum","name":"...",["namespace":"...",]"symbols":["A","B",...]}.
 *
 * The "namespace" member is written only when nullstrcmp() returns non-zero
 * for the enum's namespace and `parent_namespace` (presumably: when the two
 * differ); a NULL namespace is then written as an empty string.
 * Returns 0 on success or the first non-zero status from a write.
 */
static int write_enum(avro_writer_t out, const struct avro_enum_schema_t *enump,
			const char *parent_namespace)
{
	int status;
	long idx;

	check(status, avro_write_str(out, "{\"type\":\"enum\",\"name\":\""));
	check(status, avro_write_str(out, enump->name));
	check(status, avro_write_str(out, "\","));

	if (nullstrcmp(enump->space, parent_namespace) != 0) {
		check(status, avro_write_str(out, "\"namespace\":\""));
		if (enump->space != NULL) {
			check(status, avro_write_str(out, enump->space));
		}
		check(status, avro_write_str(out, "\","));
	}

	check(status, avro_write_str(out, "\"symbols\":["));
	for (idx = 0; idx < enump->symbols->num_entries; idx++) {
		union {
			st_data_t data;
			char *sym;
		} entry;

		st_lookup(enump->symbols, idx, &entry.data);
		/* Comma goes between symbols, not before the first one. */
		if (idx > 0) {
			check(status, avro_write_str(out, ","));
		}
		check(status, avro_write_str(out, "\""));
		check(status, avro_write_str(out, entry.sym));
		check(status, avro_write_str(out, "\""));
	}

	return avro_write_str(out, "]}");
}
1761 
/*
 * Emits a fixed schema as
 *   {"type":"fixed","name":"...",["namespace":"...",]"size":<n>}.
 *
 * The "namespace" member is written only when nullstrcmp() returns non-zero
 * for the schema's namespace and `parent_namespace` (presumably: when the
 * two differ); a NULL namespace is then written as an empty string.
 * Returns 0 on success or the first non-zero status from a write.
 */
static int write_fixed(avro_writer_t out, const struct avro_fixed_schema_t *fixed,
			const char *parent_namespace)
{
	int rval;
	/*
	 * The widest value printed with PRId64 is a sign plus 19 digits, so
	 * 21 bytes are needed including the terminator.  A smaller buffer
	 * would make snprintf() silently truncate the number.
	 */
	char size[24];
	check(rval, avro_write_str(out, "{\"type\":\"fixed\",\"name\":\""));
	check(rval, avro_write_str(out, fixed->name));
	check(rval, avro_write_str(out, "\","));
	if (nullstrcmp(fixed->space, parent_namespace)) {
		check(rval, avro_write_str(out, "\"namespace\":\""));
		if (fixed->space) {
			check(rval, avro_write_str(out, fixed->space));
		}
		check(rval, avro_write_str(out, "\","));
	}
	check(rval, avro_write_str(out, "\"size\":"));
	snprintf(size, sizeof(size), "%" PRId64, fixed->size);
	check(rval, avro_write_str(out, size));
	return avro_write_str(out, "}");
}
1782 
/*
 * Emits a map schema as {"type":"map","values":<values JSON>}.
 * `parent_namespace` is passed through to the serializer of the values
 * schema.  Returns 0 on success or the first non-zero status from a write.
 */
static int write_map(avro_writer_t out, const struct avro_map_schema_t *map,
		     const char *parent_namespace)
{
	int status;

	check(status, avro_write_str(out, "{\"type\":\"map\",\"values\":"));
	check(status,
	      avro_schema_to_json2(map->values, out, parent_namespace));

	return avro_write_str(out, "}");
}
/*
 * Emits an array schema as {"type":"array","items":<items JSON>}.
 * `parent_namespace` is passed through to the serializer of the items
 * schema.  Returns 0 on success or the first non-zero status from a write.
 */
static int write_array(avro_writer_t out, const struct avro_array_schema_t *array,
		       const char *parent_namespace)
{
	int status;

	check(status, avro_write_str(out, "{\"type\":\"array\",\"items\":"));
	check(status,
	      avro_schema_to_json2(array->items, out, parent_namespace));

	return avro_write_str(out, "}");
}
/*
 * Emits a union schema as a JSON array of its branch schemas, in index
 * order: [<branch 0>,<branch 1>,...].  `parent_namespace` is passed through
 * to the serializer of every branch.
 * Returns 0 on success or the first non-zero status from a write.
 */
static int write_union(avro_writer_t out, const struct avro_union_schema_t *unionp,
		       const char *parent_namespace)
{
	int status;
	long idx;

	check(status, avro_write_str(out, "["));
	for (idx = 0; idx < unionp->branches->num_entries; idx++) {
		union {
			st_data_t data;
			avro_schema_t schema;
		} branch;

		st_lookup(unionp->branches, idx, &branch.data);
		/* Comma goes between branches, not before the first one. */
		if (idx > 0) {
			check(status, avro_write_str(out, ","));
		}
		check(status,
		      avro_schema_to_json2(branch.schema, out, parent_namespace));
	}

	return avro_write_str(out, "]");
}
/*
 * Emits a link schema as a quoted reference to its target: "<name>", or
 * "<namespace>.<name>" when the target has a namespace and nullstrcmp()
 * returns non-zero for it and `parent_namespace` (presumably: when the two
 * differ).  Returns 0 on success or the first non-zero status from a write.
 */
static int write_link(avro_writer_t out, const struct avro_link_schema_t *link,
		      const char *parent_namespace)
{
	int status;
	const char *target_space = avro_schema_namespace(link->to);

	check(status, avro_write_str(out, "\""));
	if (target_space != NULL
	    && nullstrcmp(target_space, parent_namespace) != 0) {
		check(status, avro_write_str(out, target_space));
		check(status, avro_write_str(out, "."));
	}
	check(status, avro_write_str(out, avro_schema_name(link->to)));

	return avro_write_str(out, "\"");
}
1832 
1833 static int
avro_schema_to_json2(const avro_schema_t schema,avro_writer_t out,const char * parent_namespace)1834 avro_schema_to_json2(const avro_schema_t schema, avro_writer_t out,
1835 		     const char *parent_namespace)
1836 {
1837 	check_param(EINVAL, is_avro_schema(schema), "schema");
1838 	check_param(EINVAL, out, "writer");
1839 
1840 	int rval;
1841 
1842 	if (is_avro_primitive(schema)) {
1843 		check(rval, avro_write_str(out, "{\"type\":\""));
1844 	}
1845 
1846 	switch (avro_typeof(schema)) {
1847 	case AVRO_STRING:
1848 		check(rval, avro_write_str(out, "string"));
1849 		break;
1850 	case AVRO_BYTES:
1851 		check(rval, avro_write_str(out, "bytes"));
1852 		break;
1853 	case AVRO_INT32:
1854 		check(rval, avro_write_str(out, "int"));
1855 		break;
1856 	case AVRO_INT64:
1857 		check(rval, avro_write_str(out, "long"));
1858 		break;
1859 	case AVRO_FLOAT:
1860 		check(rval, avro_write_str(out, "float"));
1861 		break;
1862 	case AVRO_DOUBLE:
1863 		check(rval, avro_write_str(out, "double"));
1864 		break;
1865 	case AVRO_BOOLEAN:
1866 		check(rval, avro_write_str(out, "boolean"));
1867 		break;
1868 	case AVRO_NULL:
1869 		check(rval, avro_write_str(out, "null"));
1870 		break;
1871 	case AVRO_RECORD:
1872 		return write_record(out, avro_schema_to_record(schema), parent_namespace);
1873 	case AVRO_ENUM:
1874 		return write_enum(out, avro_schema_to_enum(schema), parent_namespace);
1875 	case AVRO_FIXED:
1876 		return write_fixed(out, avro_schema_to_fixed(schema), parent_namespace);
1877 	case AVRO_MAP:
1878 		return write_map(out, avro_schema_to_map(schema), parent_namespace);
1879 	case AVRO_ARRAY:
1880 		return write_array(out, avro_schema_to_array(schema), parent_namespace);
1881 	case AVRO_UNION:
1882 		return write_union(out, avro_schema_to_union(schema), parent_namespace);
1883 	case AVRO_LINK:
1884 		return write_link(out, avro_schema_to_link(schema), parent_namespace);
1885 	}
1886 
1887 	if (is_avro_primitive(schema)) {
1888 		return avro_write_str(out, "\"}");
1889 	}
1890 	avro_set_error("Unknown schema type");
1891 	return EINVAL;
1892 }
1893 
/*
 * Serializes `schema` as JSON to `out`, starting with no enclosing
 * namespace.  Returns whatever avro_schema_to_json2() returns.
 */
int
avro_schema_to_json(const avro_schema_t schema, avro_writer_t out)
{
	const char *root_namespace = NULL;

	return avro_schema_to_json2(schema, out, root_namespace);
}
1898