1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.	 You may obtain a copy of the License at
8  *
9  * https://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.  See the License for the specific language governing
15  * permissions and limitations under the License.
16  */
17 
18 #include <avro/platform.h>
19 #include <stdlib.h>
20 #include <string.h>
21 
22 #include "avro_private.h"
23 #include "avro/allocation.h"
24 #include "avro/basics.h"
25 #include "avro/data.h"
26 #include "avro/errors.h"
27 #include "avro/refcount.h"
28 #include "avro/resolver.h"
29 #include "avro/schema.h"
30 #include "avro/value.h"
31 #include "st.h"
32 
33 #ifndef AVRO_RESOLVER_DEBUG
34 #define AVRO_RESOLVER_DEBUG 0
35 #endif
36 
37 #if AVRO_RESOLVER_DEBUG
38 #include <stdio.h>
39 #define DEBUG(...) \
40 	do { \
41 		fprintf(stderr, __VA_ARGS__); \
42 		fprintf(stderr, "\n"); \
43 	} while (0)
44 #else
45 #define DEBUG(...)  /* don't print messages */
46 #endif
47 
48 
49 typedef struct avro_resolved_writer  avro_resolved_writer_t;
50 
51 struct avro_resolved_writer {
52 	avro_value_iface_t  parent;
53 
54 	/** The reference count for this interface. */
55 	volatile int  refcount;
56 
57 	/** The writer schema. */
58 	avro_schema_t  wschema;
59 
60 	/** The reader schema. */
61 	avro_schema_t  rschema;
62 
63 	/* If the reader schema is a union, but the writer schema is
64 	 * not, this field indicates which branch of the reader union
65 	 * should be selected. */
66 	int  reader_union_branch;
67 
68 	/* The size of the value instances for this resolver. */
69 	size_t  instance_size;
70 
71 	/* A function to calculate the instance size once the overall
72 	 * top-level resolver (and all of its children) have been
73 	 * constructed. */
74 	void
75 	(*calculate_size)(avro_resolved_writer_t *iface);
76 
77 	/* A free function for this resolver interface */
78 	void
79 	(*free_iface)(avro_resolved_writer_t *iface, st_table *freeing);
80 
81 	/* An initialization function for instances of this resolver. */
82 	int
83 	(*init)(const avro_resolved_writer_t *iface, void *self);
84 
85 	/* A finalization function for instances of this resolver. */
86 	void
87 	(*done)(const avro_resolved_writer_t *iface, void *self);
88 
89 	/* Clear out any existing wrappers, if any */
90 	int
91 	(*reset_wrappers)(const avro_resolved_writer_t *iface, void *self);
92 };
93 
94 #define avro_resolved_writer_calculate_size(iface) \
95 	do { \
96 		if ((iface)->calculate_size != NULL) { \
97 			(iface)->calculate_size((iface)); \
98 		} \
99 	} while (0)
100 #define avro_resolved_writer_init(iface, self) \
101 	((iface)->init == NULL? 0: (iface)->init((iface), (self)))
102 #define avro_resolved_writer_done(iface, self) \
103 	((iface)->done == NULL? (void) 0: (iface)->done((iface), (self)))
104 #define avro_resolved_writer_reset_wrappers(iface, self) \
105 	((iface)->reset_wrappers == NULL? 0: \
106 	 (iface)->reset_wrappers((iface), (self)))
107 
108 
109 /*
110  * We assume that each instance type in this value contains an an
111  * avro_value_t as its first element, which is the current wrapped
112  * value.
113  */
114 
115 void
116 avro_resolved_writer_set_dest(avro_value_t *resolved,
117 			      avro_value_t *dest)
118 {
119 	avro_value_t  *self = (avro_value_t *) resolved->self;
120 	if (self->self != NULL) {
121 		avro_value_decref(self);
122 	}
123 	avro_value_copy_ref(self, dest);
124 }
125 
126 void
127 avro_resolved_writer_clear_dest(avro_value_t *resolved)
128 {
129 	avro_value_t  *self = (avro_value_t *) resolved->self;
130 	if (self->self != NULL) {
131 		avro_value_decref(self);
132 	}
133 	self->iface = NULL;
134 	self->self = NULL;
135 }
136 
137 int
138 avro_resolved_writer_new_value(avro_value_iface_t *viface,
139 			       avro_value_t *value)
140 {
141 	int  rval;
142 	avro_resolved_writer_t  *iface =
143 	    container_of(viface, avro_resolved_writer_t, parent);
144 	void  *self = avro_malloc(iface->instance_size + sizeof(volatile int));
145 	if (self == NULL) {
146 		value->iface = NULL;
147 		value->self = NULL;
148 		return ENOMEM;
149 	}
150 
151 	memset(self, 0, iface->instance_size + sizeof(volatile int));
152 	volatile int  *refcount = (volatile int *) self;
153 	self = (char *) self + sizeof(volatile int);
154 
155 	rval = avro_resolved_writer_init(iface, self);
156 	if (rval != 0) {
157 		avro_free(self, iface->instance_size + sizeof(volatile int));
158 		value->iface = NULL;
159 		value->self = NULL;
160 		return rval;
161 	}
162 
163 	*refcount = 1;
164 	value->iface = avro_value_iface_incref(viface);
165 	value->self = self;
166 	return 0;
167 }
168 
169 static void
170 avro_resolved_writer_free_value(const avro_value_iface_t *viface, void *vself)
171 {
172 	avro_resolved_writer_t  *iface =
173 	    container_of(viface, avro_resolved_writer_t, parent);
174 	avro_value_t  *self = (avro_value_t *) vself;
175 
176 	avro_resolved_writer_done(iface, vself);
177 	if (self->self != NULL) {
178 		avro_value_decref(self);
179 	}
180 
181 	vself = (char *) vself - sizeof(volatile int);
182 	avro_free(vself, iface->instance_size + sizeof(volatile int));
183 }
184 
185 static void
186 avro_resolved_writer_incref(avro_value_t *value)
187 {
188 	/*
189 	 * This only works if you pass in the top-level value.
190 	 */
191 
192 	volatile int  *refcount = (volatile int *) ((char *) value->self - sizeof(volatile int));
193 	avro_refcount_inc(refcount);
194 }
195 
196 static void
197 avro_resolved_writer_decref(avro_value_t *value)
198 {
199 	/*
200 	 * This only works if you pass in the top-level value.
201 	 */
202 
203 	volatile int  *refcount = (volatile int *) ((char *) value->self - sizeof(volatile int));
204 	if (avro_refcount_dec(refcount)) {
205 		avro_resolved_writer_free_value(value->iface, value->self);
206 	}
207 }
208 
209 
210 static avro_value_iface_t *
211 avro_resolved_writer_incref_iface(avro_value_iface_t *viface)
212 {
213 	avro_resolved_writer_t  *iface =
214 	    container_of(viface, avro_resolved_writer_t, parent);
215 	avro_refcount_inc(&iface->refcount);
216 	return viface;
217 }
218 
219 static void
220 free_resolver(avro_resolved_writer_t *iface, st_table *freeing)
221 {
222 	/* First check if we've already started freeing this resolver. */
223 	if (st_lookup(freeing, (st_data_t) iface, NULL)) {
224 		DEBUG("Already freed %p", iface);
225 		return;
226 	}
227 
228 	/* Otherwise add this resolver to the freeing set, then free it. */
229 	st_insert(freeing, (st_data_t) iface, (st_data_t) NULL);
230 	DEBUG("Freeing resolver %p (%s->%s)", iface,
231 	      avro_schema_type_name(iface->wschema),
232 	      avro_schema_type_name(iface->rschema));
233 
234 	iface->free_iface(iface, freeing);
235 }
236 
237 static void
238 avro_resolved_writer_calculate_size_(avro_resolved_writer_t *iface)
239 {
240 	/* Only calculate the size for any resolver once */
241 	iface->calculate_size = NULL;
242 
243 	DEBUG("Calculating size for %s->%s",
244 	      avro_schema_type_name((iface)->wschema),
245 	      avro_schema_type_name((iface)->rschema));
246 	iface->instance_size = sizeof(avro_value_t);
247 }
248 
249 static void
250 avro_resolved_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing)
251 {
252 	AVRO_UNUSED(freeing);
253 	avro_schema_decref(iface->wschema);
254 	avro_schema_decref(iface->rschema);
255 	avro_freet(avro_resolved_writer_t, iface);
256 }
257 
258 static void
259 avro_resolved_writer_decref_iface(avro_value_iface_t *viface)
260 {
261 	avro_resolved_writer_t  *iface =
262 	    container_of(viface, avro_resolved_writer_t, parent);
263 	DEBUG("Decref resolver %p (before=%d)", iface, iface->refcount);
264 	if (avro_refcount_dec(&iface->refcount)) {
265 		avro_resolved_writer_t  *iface =
266 		    container_of(viface, avro_resolved_writer_t, parent);
267 
268 		st_table  *freeing = st_init_numtable();
269 		free_resolver(iface, freeing);
270 		st_free_table(freeing);
271 	}
272 }
273 
274 
275 static int
276 avro_resolved_writer_reset(const avro_value_iface_t *viface, void *vself)
277 {
278 	/*
279 	 * To reset a wrapped value, we first clear out any wrappers,
280 	 * and then have the wrapped value reset itself.
281 	 */
282 
283 	int  rval;
284 	avro_resolved_writer_t  *iface =
285 	    container_of(viface, avro_resolved_writer_t, parent);
286 	avro_value_t  *self = (avro_value_t *) vself;
287 	check(rval, avro_resolved_writer_reset_wrappers(iface, vself));
288 	return avro_value_reset(self);
289 }
290 
291 static avro_type_t
292 avro_resolved_writer_get_type(const avro_value_iface_t *viface, const void *vself)
293 {
294 	AVRO_UNUSED(vself);
295 	const avro_resolved_writer_t  *iface =
296 	    container_of(viface, avro_resolved_writer_t, parent);
297 	return avro_typeof(iface->wschema);
298 }
299 
300 static avro_schema_t
301 avro_resolved_writer_get_schema(const avro_value_iface_t *viface, const void *vself)
302 {
303 	AVRO_UNUSED(vself);
304 	avro_resolved_writer_t  *iface =
305 	    container_of(viface, avro_resolved_writer_t, parent);
306 	return iface->wschema;
307 }
308 
309 
310 static avro_resolved_writer_t *
311 avro_resolved_writer_create(avro_schema_t wschema, avro_schema_t rschema)
312 {
313 	avro_resolved_writer_t  *self = (avro_resolved_writer_t *) avro_new(avro_resolved_writer_t);
314 	memset(self, 0, sizeof(avro_resolved_writer_t));
315 
316 	self->parent.incref_iface = avro_resolved_writer_incref_iface;
317 	self->parent.decref_iface = avro_resolved_writer_decref_iface;
318 	self->parent.incref = avro_resolved_writer_incref;
319 	self->parent.decref = avro_resolved_writer_decref;
320 	self->parent.reset = avro_resolved_writer_reset;
321 	self->parent.get_type = avro_resolved_writer_get_type;
322 	self->parent.get_schema = avro_resolved_writer_get_schema;
323 
324 	self->refcount = 1;
325 	self->wschema = avro_schema_incref(wschema);
326 	self->rschema = avro_schema_incref(rschema);
327 	self->reader_union_branch = -1;
328 	self->calculate_size = avro_resolved_writer_calculate_size_;
329 	self->free_iface = avro_resolved_writer_free_iface;
330 	self->reset_wrappers = NULL;
331 	return self;
332 }
333 
334 static inline int
335 avro_resolved_writer_get_real_dest(const avro_resolved_writer_t *iface,
336 				   const avro_value_t *dest, avro_value_t *real_dest)
337 {
338 	if (iface->reader_union_branch < 0) {
339 		/*
340 		 * The reader schema isn't a union, so use the dest
341 		 * field as-is.
342 		 */
343 
344 		*real_dest = *dest;
345 		return 0;
346 	}
347 
348 	DEBUG("Retrieving union branch %d for %s value",
349 	      iface->reader_union_branch,
350 	      avro_schema_type_name(iface->wschema));
351 
352 	return avro_value_set_branch(dest, iface->reader_union_branch, real_dest);
353 }
354 
355 
356 #define skip_links(schema)					\
357 	while (is_avro_link(schema)) {				\
358 		schema = avro_schema_link_target(schema);	\
359 	}
360 
361 
362 /*-----------------------------------------------------------------------
363  * Memoized resolvers
364  */
365 
366 typedef struct avro_resolved_link_writer  avro_resolved_link_writer_t;
367 
368 typedef struct memoize_state_t {
369 	avro_memoize_t  mem;
370 	avro_resolved_link_writer_t  *links;
371 } memoize_state_t;
372 
373 static avro_resolved_writer_t *
374 avro_resolved_writer_new_memoized(memoize_state_t *state,
375 				  avro_schema_t wschema, avro_schema_t rschema);
376 
377 
378 /*-----------------------------------------------------------------------
379  * Reader unions
380  */
381 
382 /*
383  * For each Avro type, we have to check whether the reader schema on its
384  * own is compatible, and also whether the reader is a union that
385  * contains a compatible type.  The macros in this section help us
386  * perform both of these checks with less code.
387  */
388 
389 
390 /**
391  * A helper macro that handles the case where neither writer nor reader
392  * are unions.  Uses @ref check_func to see if the two schemas are
393  * compatible.
394  */
395 
396 #define check_non_union(saved, wschema, rschema, check_func) \
397 do {								\
398 	avro_resolved_writer_t  *self = NULL;			\
399 	int  rc = check_func(saved, &self, wschema, rschema,	\
400 			     rschema);				\
401 	if (self) {						\
402 		DEBUG("Non-union schemas %s (writer) "		\
403 		      "and %s (reader) match",			\
404 		      avro_schema_type_name(wschema),		\
405 		      avro_schema_type_name(rschema));		\
406 								\
407 		self->reader_union_branch = -1;			\
408 		return self;					\
409         }							\
410 								\
411         if (rc) {						\
412 		return NULL;					\
413 	}							\
414 } while (0)
415 
416 
417 /**
418  * Helper macro that handles the case where the reader is a union, and
419  * the writer is not.  Checks each branch of the reader union schema,
420  * looking for the first branch that is compatible with the writer
421  * schema.  The @ref check_func argument should be a function that can
422  * check the compatiblity of each branch schema.
423  */
424 
425 #define check_reader_union(saved, wschema, rschema, check_func)		\
426 do {									\
427 	if (!is_avro_union(rschema)) {					\
428 		break;							\
429 	}								\
430 									\
431 	DEBUG("Checking reader union schema");				\
432 	size_t  num_branches = avro_schema_union_size(rschema);		\
433 	unsigned int  i;						\
434 									\
435 	for (i = 0; i < num_branches; i++) {				\
436 		avro_schema_t  branch_schema =				\
437 		    avro_schema_union_branch(rschema, i);		\
438 		skip_links(branch_schema);				\
439 									\
440 		DEBUG("Trying branch %u %s%s%s->%s", i, \
441 		      is_avro_link(wschema)? "[": "", \
442 		      avro_schema_type_name(wschema), \
443 		      is_avro_link(wschema)? "]": "", \
444 		      avro_schema_type_name(branch_schema)); \
445 									\
446 		avro_resolved_writer_t  *self = NULL;			\
447 		int  rc = check_func(saved, &self,			\
448 				     wschema, branch_schema, rschema);	\
449 		if (self) {						\
450 			DEBUG("Reader union branch %d (%s) "		\
451 			      "and writer %s match",			\
452 			      i, avro_schema_type_name(branch_schema),	\
453 			      avro_schema_type_name(wschema));		\
454 			self->reader_union_branch = i;			\
455 			return self;					\
456 		} else {						\
457 			DEBUG("Reader union branch %d (%s) "		\
458 			      "doesn't match",				\
459 			      i, avro_schema_type_name(branch_schema));	\
460 		}							\
461 									\
462 		if (rc) {						\
463 			return NULL;					\
464 		}							\
465 	}								\
466 									\
467 	DEBUG("No reader union branches match");			\
468 } while (0)
469 
470 /**
471  * A helper macro that wraps together check_non_union and
472  * check_reader_union for a simple (non-union) writer schema type.
473  */
474 
475 #define check_simple_writer(saved, wschema, rschema, type_name)		\
476 do {									\
477 	check_non_union(saved, wschema, rschema, try_##type_name);	\
478 	check_reader_union(saved, wschema, rschema, try_##type_name);	\
479 	DEBUG("Writer %s doesn't match reader %s",			\
480 	      avro_schema_type_name(wschema),				\
481 	      avro_schema_type_name(rschema));				\
482 	avro_set_error("Cannot store " #type_name " into %s",		\
483 		       avro_schema_type_name(rschema));			\
484 	return NULL;							\
485 } while (0)
486 
487 
488 /*-----------------------------------------------------------------------
489  * Recursive schemas
490  */
491 
492 /*
493  * Recursive schemas are handled specially; the value implementation for
494  * an AVRO_LINK schema is simply a wrapper around the value
495  * implementation for the link's target schema.  The value methods all
496  * delegate to the wrapped implementation.
497  */
498 
499 struct avro_resolved_link_writer {
500 	avro_resolved_writer_t  parent;
501 
502 	/**
503 	 * A pointer to the “next” link resolver that we've had to
504 	 * create.  We use this as we're creating the overall top-level
505 	 * resolver to keep track of which ones we have to fix up
506 	 * afterwards.
507 	 */
508 	avro_resolved_link_writer_t  *next;
509 
510 	/** The target's implementation. */
511 	avro_resolved_writer_t  *target_resolver;
512 };
513 
514 typedef struct avro_resolved_link_value {
515 	avro_value_t  wrapped;
516 	avro_value_t  target;
517 } avro_resolved_link_value_t;
518 
519 static void
520 avro_resolved_link_writer_calculate_size(avro_resolved_writer_t *iface)
521 {
522 	/* Only calculate the size for any resolver once */
523 	iface->calculate_size = NULL;
524 
525 	DEBUG("Calculating size for [%s]->%s",
526 	      avro_schema_type_name((iface)->wschema),
527 	      avro_schema_type_name((iface)->rschema));
528 	iface->instance_size = sizeof(avro_resolved_link_value_t);
529 }
530 
531 static void
532 avro_resolved_link_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing)
533 {
534 	avro_resolved_link_writer_t  *liface =
535 	    container_of(iface, avro_resolved_link_writer_t, parent);
536 	if (liface->target_resolver != NULL) {
537 		free_resolver(liface->target_resolver, freeing);
538 	}
539 	avro_schema_decref(iface->wschema);
540 	avro_schema_decref(iface->rschema);
541 	avro_freet(avro_resolved_link_writer_t, iface);
542 }
543 
544 static int
545 avro_resolved_link_writer_init(const avro_resolved_writer_t *iface, void *vself)
546 {
547 	int  rval;
548 	const avro_resolved_link_writer_t  *liface =
549 	    container_of(iface, avro_resolved_link_writer_t, parent);
550 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
551 	size_t  target_instance_size = liface->target_resolver->instance_size;
552 
553 	self->target.iface = &liface->target_resolver->parent;
554 	self->target.self = avro_malloc(target_instance_size);
555 	if (self->target.self == NULL) {
556 		return ENOMEM;
557 	}
558 	DEBUG("Allocated <%p:%" PRIsz "> for link", self->target.self, target_instance_size);
559 
560 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
561 	*target_vself = self->wrapped;
562 
563 	rval = avro_resolved_writer_init(liface->target_resolver, self->target.self);
564 	if (rval != 0) {
565 		avro_free(self->target.self, target_instance_size);
566 	}
567 	return rval;
568 }
569 
570 static void
571 avro_resolved_link_writer_done(const avro_resolved_writer_t *iface, void *vself)
572 {
573 	const avro_resolved_link_writer_t  *liface =
574 	    container_of(iface, avro_resolved_link_writer_t, parent);
575 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
576 	size_t  target_instance_size = liface->target_resolver->instance_size;
577 	DEBUG("Freeing <%p:%" PRIsz "> for link", self->target.self, target_instance_size);
578 	avro_resolved_writer_done(liface->target_resolver, self->target.self);
579 	avro_free(self->target.self, target_instance_size);
580 	self->target.iface = NULL;
581 	self->target.self = NULL;
582 }
583 
584 static int
585 avro_resolved_link_writer_reset(const avro_resolved_writer_t *iface, void *vself)
586 {
587 	const avro_resolved_link_writer_t  *liface =
588 	    container_of(iface, avro_resolved_link_writer_t, parent);
589 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
590 	return avro_resolved_writer_reset_wrappers
591 	    (liface->target_resolver, self->target.self);
592 }
593 
594 static avro_type_t
595 avro_resolved_link_writer_get_type(const avro_value_iface_t *iface, const void *vself)
596 {
597 	AVRO_UNUSED(iface);
598 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
599 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
600 	*target_vself = self->wrapped;
601 	return avro_value_get_type(&self->target);
602 }
603 
604 static avro_schema_t
605 avro_resolved_link_writer_get_schema(const avro_value_iface_t *iface, const void *vself)
606 {
607 	AVRO_UNUSED(iface);
608 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
609 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
610 	*target_vself = self->wrapped;
611 	return avro_value_get_schema(&self->target);
612 }
613 
614 static int
615 avro_resolved_link_writer_get_boolean(const avro_value_iface_t *iface,
616 				      const void *vself, int *out)
617 {
618 	AVRO_UNUSED(iface);
619 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
620 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
621 	*target_vself = self->wrapped;
622 	return avro_value_get_boolean(&self->target, out);
623 }
624 
625 static int
626 avro_resolved_link_writer_get_bytes(const avro_value_iface_t *iface,
627 				    const void *vself, const void **buf, size_t *size)
628 {
629 	AVRO_UNUSED(iface);
630 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
631 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
632 	*target_vself = self->wrapped;
633 	return avro_value_get_bytes(&self->target, buf, size);
634 }
635 
636 static int
637 avro_resolved_link_writer_grab_bytes(const avro_value_iface_t *iface,
638 				     const void *vself, avro_wrapped_buffer_t *dest)
639 {
640 	AVRO_UNUSED(iface);
641 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
642 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
643 	*target_vself = self->wrapped;
644 	return avro_value_grab_bytes(&self->target, dest);
645 }
646 
647 static int
648 avro_resolved_link_writer_get_double(const avro_value_iface_t *iface,
649 				     const void *vself, double *out)
650 {
651 	AVRO_UNUSED(iface);
652 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
653 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
654 	*target_vself = self->wrapped;
655 	return avro_value_get_double(&self->target, out);
656 }
657 
658 static int
659 avro_resolved_link_writer_get_float(const avro_value_iface_t *iface,
660 				    const void *vself, float *out)
661 {
662 	AVRO_UNUSED(iface);
663 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
664 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
665 	*target_vself = self->wrapped;
666 	return avro_value_get_float(&self->target, out);
667 }
668 
669 static int
670 avro_resolved_link_writer_get_int(const avro_value_iface_t *iface,
671 				  const void *vself, int32_t *out)
672 {
673 	AVRO_UNUSED(iface);
674 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
675 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
676 	*target_vself = self->wrapped;
677 	return avro_value_get_int(&self->target, out);
678 }
679 
680 static int
681 avro_resolved_link_writer_get_long(const avro_value_iface_t *iface,
682 				   const void *vself, int64_t *out)
683 {
684 	AVRO_UNUSED(iface);
685 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
686 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
687 	*target_vself = self->wrapped;
688 	return avro_value_get_long(&self->target, out);
689 }
690 
691 static int
692 avro_resolved_link_writer_get_null(const avro_value_iface_t *iface, const void *vself)
693 {
694 	AVRO_UNUSED(iface);
695 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
696 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
697 	*target_vself = self->wrapped;
698 	return avro_value_get_null(&self->target);
699 }
700 
701 static int
702 avro_resolved_link_writer_get_string(const avro_value_iface_t *iface,
703 				     const void *vself, const char **str, size_t *size)
704 {
705 	AVRO_UNUSED(iface);
706 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
707 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
708 	*target_vself = self->wrapped;
709 	return avro_value_get_string(&self->target, str, size);
710 }
711 
712 static int
713 avro_resolved_link_writer_grab_string(const avro_value_iface_t *iface,
714 				      const void *vself, avro_wrapped_buffer_t *dest)
715 {
716 	AVRO_UNUSED(iface);
717 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
718 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
719 	*target_vself = self->wrapped;
720 	return avro_value_grab_string(&self->target, dest);
721 }
722 
723 static int
724 avro_resolved_link_writer_get_enum(const avro_value_iface_t *iface,
725 				   const void *vself, int *out)
726 {
727 	AVRO_UNUSED(iface);
728 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
729 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
730 	*target_vself = self->wrapped;
731 	return avro_value_get_enum(&self->target, out);
732 }
733 
734 static int
735 avro_resolved_link_writer_get_fixed(const avro_value_iface_t *iface,
736 				    const void *vself, const void **buf, size_t *size)
737 {
738 	AVRO_UNUSED(iface);
739 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
740 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
741 	*target_vself = self->wrapped;
742 	return avro_value_get_fixed(&self->target, buf, size);
743 }
744 
745 static int
746 avro_resolved_link_writer_grab_fixed(const avro_value_iface_t *iface,
747 				     const void *vself, avro_wrapped_buffer_t *dest)
748 {
749 	AVRO_UNUSED(iface);
750 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
751 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
752 	*target_vself = self->wrapped;
753 	return avro_value_grab_fixed(&self->target, dest);
754 }
755 
756 static int
757 avro_resolved_link_writer_set_boolean(const avro_value_iface_t *iface,
758 				      void *vself, int val)
759 {
760 	AVRO_UNUSED(iface);
761 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
762 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
763 	*target_vself = self->wrapped;
764 	return avro_value_set_boolean(&self->target, val);
765 }
766 
767 static int
768 avro_resolved_link_writer_set_bytes(const avro_value_iface_t *iface,
769 				    void *vself, void *buf, size_t size)
770 {
771 	AVRO_UNUSED(iface);
772 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
773 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
774 	*target_vself = self->wrapped;
775 	return avro_value_set_bytes(&self->target, buf, size);
776 }
777 
778 static int
779 avro_resolved_link_writer_give_bytes(const avro_value_iface_t *iface,
780 				     void *vself, avro_wrapped_buffer_t *buf)
781 {
782 	AVRO_UNUSED(iface);
783 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
784 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
785 	*target_vself = self->wrapped;
786 	return avro_value_give_bytes(&self->target, buf);
787 }
788 
789 static int
790 avro_resolved_link_writer_set_double(const avro_value_iface_t *iface,
791 				     void *vself, double val)
792 {
793 	AVRO_UNUSED(iface);
794 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
795 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
796 	*target_vself = self->wrapped;
797 	return avro_value_set_double(&self->target, val);
798 }
799 
800 static int
801 avro_resolved_link_writer_set_float(const avro_value_iface_t *iface,
802 				    void *vself, float val)
803 {
804 	AVRO_UNUSED(iface);
805 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
806 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
807 	*target_vself = self->wrapped;
808 	return avro_value_set_float(&self->target, val);
809 }
810 
811 static int
812 avro_resolved_link_writer_set_int(const avro_value_iface_t *iface,
813 				  void *vself, int32_t val)
814 {
815 	AVRO_UNUSED(iface);
816 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
817 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
818 	*target_vself = self->wrapped;
819 	return avro_value_set_int(&self->target, val);
820 }
821 
822 static int
823 avro_resolved_link_writer_set_long(const avro_value_iface_t *iface,
824 				   void *vself, int64_t val)
825 {
826 	AVRO_UNUSED(iface);
827 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
828 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
829 	*target_vself = self->wrapped;
830 	return avro_value_set_long(&self->target, val);
831 }
832 
833 static int
834 avro_resolved_link_writer_set_null(const avro_value_iface_t *iface, void *vself)
835 {
836 	AVRO_UNUSED(iface);
837 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
838 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
839 	*target_vself = self->wrapped;
840 	return avro_value_set_null(&self->target);
841 }
842 
843 static int
844 avro_resolved_link_writer_set_string(const avro_value_iface_t *iface,
845 				     void *vself, const char *str)
846 {
847 	AVRO_UNUSED(iface);
848 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
849 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
850 	*target_vself = self->wrapped;
851 	return avro_value_set_string(&self->target, str);
852 }
853 
854 static int
855 avro_resolved_link_writer_set_string_len(const avro_value_iface_t *iface,
856 					 void *vself, const char *str, size_t size)
857 {
858 	AVRO_UNUSED(iface);
859 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
860 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
861 	*target_vself = self->wrapped;
862 	return avro_value_set_string_len(&self->target, str, size);
863 }
864 
865 static int
866 avro_resolved_link_writer_give_string_len(const avro_value_iface_t *iface,
867 					  void *vself, avro_wrapped_buffer_t *buf)
868 {
869 	AVRO_UNUSED(iface);
870 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
871 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
872 	*target_vself = self->wrapped;
873 	return avro_value_give_string_len(&self->target, buf);
874 }
875 
876 static int
877 avro_resolved_link_writer_set_enum(const avro_value_iface_t *iface,
878 				   void *vself, int val)
879 {
880 	AVRO_UNUSED(iface);
881 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
882 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
883 	*target_vself = self->wrapped;
884 	return avro_value_set_enum(&self->target, val);
885 }
886 
887 static int
888 avro_resolved_link_writer_set_fixed(const avro_value_iface_t *iface,
889 				    void *vself, void *buf, size_t size)
890 {
891 	AVRO_UNUSED(iface);
892 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
893 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
894 	*target_vself = self->wrapped;
895 	return avro_value_set_fixed(&self->target, buf, size);
896 }
897 
898 static int
899 avro_resolved_link_writer_give_fixed(const avro_value_iface_t *iface,
900 				     void *vself, avro_wrapped_buffer_t *buf)
901 {
902 	AVRO_UNUSED(iface);
903 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
904 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
905 	*target_vself = self->wrapped;
906 	return avro_value_give_fixed(&self->target, buf);
907 }
908 
909 static int
910 avro_resolved_link_writer_get_size(const avro_value_iface_t *iface,
911 				   const void *vself, size_t *size)
912 {
913 	AVRO_UNUSED(iface);
914 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
915 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
916 	*target_vself = self->wrapped;
917 	return avro_value_get_size(&self->target, size);
918 }
919 
920 static int
921 avro_resolved_link_writer_get_by_index(const avro_value_iface_t *iface,
922 				       const void *vself, size_t index,
923 				       avro_value_t *child, const char **name)
924 {
925 	AVRO_UNUSED(iface);
926 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
927 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
928 	*target_vself = self->wrapped;
929 	return avro_value_get_by_index(&self->target, index, child, name);
930 }
931 
932 static int
933 avro_resolved_link_writer_get_by_name(const avro_value_iface_t *iface,
934 				      const void *vself, const char *name,
935 				      avro_value_t *child, size_t *index)
936 {
937 	AVRO_UNUSED(iface);
938 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
939 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
940 	*target_vself = self->wrapped;
941 	return avro_value_get_by_name(&self->target, name, child, index);
942 }
943 
944 static int
945 avro_resolved_link_writer_get_discriminant(const avro_value_iface_t *iface,
946 					   const void *vself, int *out)
947 {
948 	AVRO_UNUSED(iface);
949 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
950 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
951 	*target_vself = self->wrapped;
952 	return avro_value_get_discriminant(&self->target, out);
953 }
954 
955 static int
956 avro_resolved_link_writer_get_current_branch(const avro_value_iface_t *iface,
957 					     const void *vself, avro_value_t *branch)
958 {
959 	AVRO_UNUSED(iface);
960 	const avro_resolved_link_value_t  *self = (const avro_resolved_link_value_t *) vself;
961 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
962 	*target_vself = self->wrapped;
963 	return avro_value_get_current_branch(&self->target, branch);
964 }
965 
966 static int
967 avro_resolved_link_writer_append(const avro_value_iface_t *iface,
968 				 void *vself, avro_value_t *child_out,
969 				 size_t *new_index)
970 {
971 	AVRO_UNUSED(iface);
972 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
973 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
974 	*target_vself = self->wrapped;
975 	return avro_value_append(&self->target, child_out, new_index);
976 }
977 
978 static int
979 avro_resolved_link_writer_add(const avro_value_iface_t *iface,
980 			      void *vself, const char *key,
981 			      avro_value_t *child, size_t *index, int *is_new)
982 {
983 	AVRO_UNUSED(iface);
984 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
985 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
986 	*target_vself = self->wrapped;
987 	return avro_value_add(&self->target, key, child, index, is_new);
988 }
989 
990 static int
991 avro_resolved_link_writer_set_branch(const avro_value_iface_t *iface,
992 				     void *vself, int discriminant,
993 				     avro_value_t *branch)
994 {
995 	AVRO_UNUSED(iface);
996 	avro_resolved_link_value_t  *self = (avro_resolved_link_value_t *) vself;
997 	avro_value_t  *target_vself = (avro_value_t *) self->target.self;
998 	*target_vself = self->wrapped;
999 	return avro_value_set_branch(&self->target, discriminant, branch);
1000 }
1001 
1002 static avro_resolved_link_writer_t *
1003 avro_resolved_link_writer_create(avro_schema_t wschema, avro_schema_t rschema)
1004 {
1005 	avro_resolved_writer_t  *self = (avro_resolved_writer_t *) avro_new(avro_resolved_link_writer_t);
1006 	memset(self, 0, sizeof(avro_resolved_link_writer_t));
1007 
1008 	self->parent.incref_iface = avro_resolved_writer_incref_iface;
1009 	self->parent.decref_iface = avro_resolved_writer_decref_iface;
1010 	self->parent.incref = avro_resolved_writer_incref;
1011 	self->parent.decref = avro_resolved_writer_decref;
1012 	self->parent.reset = avro_resolved_writer_reset;
1013 	self->parent.get_type = avro_resolved_link_writer_get_type;
1014 	self->parent.get_schema = avro_resolved_link_writer_get_schema;
1015 	self->parent.get_size = avro_resolved_link_writer_get_size;
1016 	self->parent.get_by_index = avro_resolved_link_writer_get_by_index;
1017 	self->parent.get_by_name = avro_resolved_link_writer_get_by_name;
1018 
1019 	self->refcount = 1;
1020 	self->wschema = avro_schema_incref(wschema);
1021 	self->rschema = avro_schema_incref(rschema);
1022 	self->reader_union_branch = -1;
1023 	self->calculate_size = avro_resolved_link_writer_calculate_size;
1024 	self->free_iface = avro_resolved_link_writer_free_iface;
1025 	self->init = avro_resolved_link_writer_init;
1026 	self->done = avro_resolved_link_writer_done;
1027 	self->reset_wrappers = avro_resolved_link_writer_reset;
1028 
1029 	self->parent.get_boolean = avro_resolved_link_writer_get_boolean;
1030 	self->parent.get_bytes = avro_resolved_link_writer_get_bytes;
1031 	self->parent.grab_bytes = avro_resolved_link_writer_grab_bytes;
1032 	self->parent.get_double = avro_resolved_link_writer_get_double;
1033 	self->parent.get_float = avro_resolved_link_writer_get_float;
1034 	self->parent.get_int = avro_resolved_link_writer_get_int;
1035 	self->parent.get_long = avro_resolved_link_writer_get_long;
1036 	self->parent.get_null = avro_resolved_link_writer_get_null;
1037 	self->parent.get_string = avro_resolved_link_writer_get_string;
1038 	self->parent.grab_string = avro_resolved_link_writer_grab_string;
1039 	self->parent.get_enum = avro_resolved_link_writer_get_enum;
1040 	self->parent.get_fixed = avro_resolved_link_writer_get_fixed;
1041 	self->parent.grab_fixed = avro_resolved_link_writer_grab_fixed;
1042 
1043 	self->parent.set_boolean = avro_resolved_link_writer_set_boolean;
1044 	self->parent.set_bytes = avro_resolved_link_writer_set_bytes;
1045 	self->parent.give_bytes = avro_resolved_link_writer_give_bytes;
1046 	self->parent.set_double = avro_resolved_link_writer_set_double;
1047 	self->parent.set_float = avro_resolved_link_writer_set_float;
1048 	self->parent.set_int = avro_resolved_link_writer_set_int;
1049 	self->parent.set_long = avro_resolved_link_writer_set_long;
1050 	self->parent.set_null = avro_resolved_link_writer_set_null;
1051 	self->parent.set_string = avro_resolved_link_writer_set_string;
1052 	self->parent.set_string_len = avro_resolved_link_writer_set_string_len;
1053 	self->parent.give_string_len = avro_resolved_link_writer_give_string_len;
1054 	self->parent.set_enum = avro_resolved_link_writer_set_enum;
1055 	self->parent.set_fixed = avro_resolved_link_writer_set_fixed;
1056 	self->parent.give_fixed = avro_resolved_link_writer_give_fixed;
1057 
1058 	self->parent.get_size = avro_resolved_link_writer_get_size;
1059 	self->parent.get_by_index = avro_resolved_link_writer_get_by_index;
1060 	self->parent.get_by_name = avro_resolved_link_writer_get_by_name;
1061 	self->parent.get_discriminant = avro_resolved_link_writer_get_discriminant;
1062 	self->parent.get_current_branch = avro_resolved_link_writer_get_current_branch;
1063 
1064 	self->parent.append = avro_resolved_link_writer_append;
1065 	self->parent.add = avro_resolved_link_writer_add;
1066 	self->parent.set_branch = avro_resolved_link_writer_set_branch;
1067 
1068 	return container_of(self, avro_resolved_link_writer_t, parent);
1069 }
1070 
1071 static int
1072 try_link(memoize_state_t *state, avro_resolved_writer_t **self,
1073 	 avro_schema_t wschema, avro_schema_t rschema,
1074 	 avro_schema_t root_rschema)
1075 {
1076 	AVRO_UNUSED(rschema);
1077 
1078 	/*
1079 	 * For link schemas, we create a special value implementation
1080 	 * that allocates space for its wrapped value at runtime.  This
1081 	 * lets us handle recursive types without having to instantiate
1082 	 * in infinite-size value.
1083 	 */
1084 
1085 	avro_schema_t  wtarget = avro_schema_link_target(wschema);
1086 	avro_resolved_link_writer_t  *lself =
1087 	    avro_resolved_link_writer_create(wtarget, root_rschema);
1088 	avro_memoize_set(&state->mem, wschema, root_rschema, lself);
1089 
1090 	avro_resolved_writer_t  *target_resolver =
1091 	    avro_resolved_writer_new_memoized(state, wtarget, rschema);
1092 	if (target_resolver == NULL) {
1093 		avro_memoize_delete(&state->mem, wschema, root_rschema);
1094 		avro_value_iface_decref(&lself->parent.parent);
1095 		avro_prefix_error("Link target isn't compatible: ");
1096 		DEBUG("%s", avro_strerror());
1097 		return EINVAL;
1098 	}
1099 
1100 	lself->target_resolver = target_resolver;
1101 	lself->next = state->links;
1102 	state->links = lself;
1103 
1104 	*self = &lself->parent;
1105 	return 0;
1106 }
1107 
1108 
1109 /*-----------------------------------------------------------------------
1110  * boolean
1111  */
1112 
1113 static int
1114 avro_resolved_writer_set_boolean(const avro_value_iface_t *viface,
1115 				 void *vself, int val)
1116 {
1117 	int  rval;
1118 	const avro_resolved_writer_t  *iface =
1119 	    container_of(viface, avro_resolved_writer_t, parent);
1120 	avro_value_t  *self = (avro_value_t *) vself;
1121 	avro_value_t  dest;
1122 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1123 	DEBUG("Storing %s into %p", val? "TRUE": "FALSE", dest.self);
1124 	return avro_value_set_boolean(&dest, val);
1125 }
1126 
1127 static int
1128 try_boolean(memoize_state_t *state, avro_resolved_writer_t **self,
1129 	    avro_schema_t wschema, avro_schema_t rschema,
1130 	    avro_schema_t root_rschema)
1131 {
1132 	if (is_avro_boolean(rschema)) {
1133 		*self = avro_resolved_writer_create(wschema, root_rschema);
1134 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1135 		(*self)->parent.set_boolean = avro_resolved_writer_set_boolean;
1136 	}
1137 	return 0;
1138 }
1139 
1140 
1141 /*-----------------------------------------------------------------------
1142  * bytes
1143  */
1144 
1145 static int
1146 avro_resolved_writer_set_bytes(const avro_value_iface_t *viface,
1147 			       void *vself, void *buf, size_t size)
1148 {
1149 	int  rval;
1150 	const avro_resolved_writer_t  *iface =
1151 	    container_of(viface, avro_resolved_writer_t, parent);
1152 	avro_value_t  *self = (avro_value_t *) vself;
1153 	avro_value_t  dest;
1154 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1155 	DEBUG("Storing <%p:%" PRIsz "> into %p", buf, size, dest.self);
1156 	return avro_value_set_bytes(&dest, buf, size);
1157 }
1158 
1159 static int
1160 avro_resolved_writer_give_bytes(const avro_value_iface_t *viface,
1161 				void *vself, avro_wrapped_buffer_t *buf)
1162 {
1163 	int  rval;
1164 	const avro_resolved_writer_t  *iface =
1165 	    container_of(viface, avro_resolved_writer_t, parent);
1166 	avro_value_t  *self = (avro_value_t *) vself;
1167 	avro_value_t  dest;
1168 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1169 	DEBUG("Storing [%p] into %p", buf, dest.self);
1170 	return avro_value_give_bytes(&dest, buf);
1171 }
1172 
1173 static int
1174 try_bytes(memoize_state_t *state, avro_resolved_writer_t **self,
1175 	  avro_schema_t wschema, avro_schema_t rschema,
1176 	  avro_schema_t root_rschema)
1177 {
1178 	if (is_avro_bytes(rschema)) {
1179 		*self = avro_resolved_writer_create(wschema, root_rschema);
1180 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1181 		(*self)->parent.set_bytes = avro_resolved_writer_set_bytes;
1182 		(*self)->parent.give_bytes = avro_resolved_writer_give_bytes;
1183 	}
1184 	return 0;
1185 }
1186 
1187 
1188 /*-----------------------------------------------------------------------
1189  * double
1190  */
1191 
1192 static int
1193 avro_resolved_writer_set_double(const avro_value_iface_t *viface,
1194 				void *vself, double val)
1195 {
1196 	int  rval;
1197 	const avro_resolved_writer_t  *iface =
1198 	    container_of(viface, avro_resolved_writer_t, parent);
1199 	avro_value_t  *self = (avro_value_t *) vself;
1200 	avro_value_t  dest;
1201 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1202 	DEBUG("Storing %le into %p", val, dest.self);
1203 	return avro_value_set_double(&dest, val);
1204 }
1205 
1206 static int
1207 try_double(memoize_state_t *state, avro_resolved_writer_t **self,
1208 	   avro_schema_t wschema, avro_schema_t rschema,
1209 	   avro_schema_t root_rschema)
1210 {
1211 	if (is_avro_double(rschema)) {
1212 		*self = avro_resolved_writer_create(wschema, root_rschema);
1213 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1214 		(*self)->parent.set_double = avro_resolved_writer_set_double;
1215 	}
1216 	return 0;
1217 }
1218 
1219 
1220 /*-----------------------------------------------------------------------
1221  * float
1222  */
1223 
1224 static int
1225 avro_resolved_writer_set_float(const avro_value_iface_t *viface,
1226 			       void *vself, float val)
1227 {
1228 	int  rval;
1229 	const avro_resolved_writer_t  *iface =
1230 	    container_of(viface, avro_resolved_writer_t, parent);
1231 	avro_value_t  *self = (avro_value_t *) vself;
1232 	avro_value_t  dest;
1233 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1234 	DEBUG("Storing %e into %p", val, dest.self);
1235 	return avro_value_set_float(&dest, val);
1236 }
1237 
1238 static int
1239 avro_resolved_writer_set_float_double(const avro_value_iface_t *viface,
1240 				      void *vself, float val)
1241 {
1242 	int  rval;
1243 	const avro_resolved_writer_t  *iface =
1244 	    container_of(viface, avro_resolved_writer_t, parent);
1245 	avro_value_t  *self = (avro_value_t *) vself;
1246 	avro_value_t  dest;
1247 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1248 	DEBUG("Promoting float %e into double %p", val, dest.self);
1249 	return avro_value_set_double(&dest, val);
1250 }
1251 
1252 static int
1253 try_float(memoize_state_t *state, avro_resolved_writer_t **self,
1254 	  avro_schema_t wschema, avro_schema_t rschema,
1255 	  avro_schema_t root_rschema)
1256 {
1257 	if (is_avro_float(rschema)) {
1258 		*self = avro_resolved_writer_create(wschema, root_rschema);
1259 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1260 		(*self)->parent.set_float = avro_resolved_writer_set_float;
1261 	}
1262 
1263 	else if (is_avro_double(rschema)) {
1264 		*self = avro_resolved_writer_create(wschema, root_rschema);
1265 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1266 		(*self)->parent.set_float = avro_resolved_writer_set_float_double;
1267 	}
1268 
1269 	return 0;
1270 }
1271 
1272 
1273 /*-----------------------------------------------------------------------
1274  * int
1275  */
1276 
1277 static int
1278 avro_resolved_writer_set_int(const avro_value_iface_t *viface,
1279 			     void *vself, int32_t val)
1280 {
1281 	int  rval;
1282 	const avro_resolved_writer_t  *iface =
1283 	    container_of(viface, avro_resolved_writer_t, parent);
1284 	avro_value_t  *self = (avro_value_t *) vself;
1285 	avro_value_t  dest;
1286 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1287 	DEBUG("Storing %" PRId32 " into %p", val, dest.self);
1288 	return avro_value_set_int(&dest, val);
1289 }
1290 
1291 static int
1292 avro_resolved_writer_set_int_double(const avro_value_iface_t *viface,
1293 				    void *vself, int32_t val)
1294 {
1295 	int  rval;
1296 	const avro_resolved_writer_t  *iface =
1297 	    container_of(viface, avro_resolved_writer_t, parent);
1298 	avro_value_t  *self = (avro_value_t *) vself;
1299 	avro_value_t  dest;
1300 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1301 	DEBUG("Promoting int %" PRId32 " into double %p", val, dest.self);
1302 	return avro_value_set_double(&dest, val);
1303 }
1304 
1305 static int
1306 avro_resolved_writer_set_int_float(const avro_value_iface_t *viface,
1307 				   void *vself, int32_t val)
1308 {
1309 	int  rval;
1310 	const avro_resolved_writer_t  *iface =
1311 	    container_of(viface, avro_resolved_writer_t, parent);
1312 	avro_value_t  *self = (avro_value_t *) vself;
1313 	avro_value_t  dest;
1314 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1315 	DEBUG("Promoting int %" PRId32 " into float %p", val, dest.self);
1316 	return avro_value_set_float(&dest, (float) val);
1317 }
1318 
1319 static int
1320 avro_resolved_writer_set_int_long(const avro_value_iface_t *viface,
1321 				  void *vself, int32_t val)
1322 {
1323 	int  rval;
1324 	const avro_resolved_writer_t  *iface =
1325 	    container_of(viface, avro_resolved_writer_t, parent);
1326 	avro_value_t  *self = (avro_value_t *) vself;
1327 	avro_value_t  dest;
1328 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1329 	DEBUG("Promoting int %" PRId32 " into long %p", val, dest.self);
1330 	return avro_value_set_long(&dest, val);
1331 }
1332 
1333 static int
1334 try_int(memoize_state_t *state, avro_resolved_writer_t **self,
1335 	avro_schema_t wschema, avro_schema_t rschema,
1336 	avro_schema_t root_rschema)
1337 {
1338 	if (is_avro_int32(rschema)) {
1339 		*self = avro_resolved_writer_create(wschema, root_rschema);
1340 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1341 		(*self)->parent.set_int = avro_resolved_writer_set_int;
1342 	}
1343 
1344 	else if (is_avro_int64(rschema)) {
1345 		*self = avro_resolved_writer_create(wschema, root_rschema);
1346 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1347 		(*self)->parent.set_int = avro_resolved_writer_set_int_long;
1348 	}
1349 
1350 	else if (is_avro_double(rschema)) {
1351 		*self = avro_resolved_writer_create(wschema, root_rschema);
1352 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1353 		(*self)->parent.set_int = avro_resolved_writer_set_int_double;
1354 	}
1355 
1356 	else if (is_avro_float(rschema)) {
1357 		*self = avro_resolved_writer_create(wschema, root_rschema);
1358 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1359 		(*self)->parent.set_int = avro_resolved_writer_set_int_float;
1360 	}
1361 
1362 	return 0;
1363 }
1364 
1365 
1366 /*-----------------------------------------------------------------------
1367  * long
1368  */
1369 
1370 static int
1371 avro_resolved_writer_set_long(const avro_value_iface_t *viface,
1372 			      void *vself, int64_t val)
1373 {
1374 	int  rval;
1375 	const avro_resolved_writer_t  *iface =
1376 	    container_of(viface, avro_resolved_writer_t, parent);
1377 	avro_value_t  *self = (avro_value_t *) vself;
1378 	avro_value_t  dest;
1379 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1380 	DEBUG("Storing %" PRId64 " into %p", val, dest.self);
1381 	return avro_value_set_long(&dest, val);
1382 }
1383 
1384 static int
1385 avro_resolved_writer_set_long_double(const avro_value_iface_t *viface,
1386 				     void *vself, int64_t val)
1387 {
1388 	int  rval;
1389 	const avro_resolved_writer_t  *iface =
1390 	    container_of(viface, avro_resolved_writer_t, parent);
1391 	avro_value_t  *self = (avro_value_t *) vself;
1392 	avro_value_t  dest;
1393 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1394 	DEBUG("Promoting long %" PRId64 " into double %p", val, dest.self);
1395 	return avro_value_set_double(&dest, (double) val);
1396 }
1397 
1398 static int
1399 avro_resolved_writer_set_long_float(const avro_value_iface_t *viface,
1400 				    void *vself, int64_t val)
1401 {
1402 	int  rval;
1403 	const avro_resolved_writer_t  *iface =
1404 	    container_of(viface, avro_resolved_writer_t, parent);
1405 	avro_value_t  *self = (avro_value_t *) vself;
1406 	avro_value_t  dest;
1407 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1408 	DEBUG("Promoting long %" PRId64 " into float %p", val, dest.self);
1409 	return avro_value_set_float(&dest, (float) val);
1410 }
1411 
1412 static int
1413 try_long(memoize_state_t *state, avro_resolved_writer_t **self,
1414 	 avro_schema_t wschema, avro_schema_t rschema,
1415 	 avro_schema_t root_rschema)
1416 {
1417 	if (is_avro_int64(rschema)) {
1418 		*self = avro_resolved_writer_create(wschema, root_rschema);
1419 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1420 		(*self)->parent.set_long = avro_resolved_writer_set_long;
1421 	}
1422 
1423 	else if (is_avro_double(rschema)) {
1424 		*self = avro_resolved_writer_create(wschema, root_rschema);
1425 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1426 		(*self)->parent.set_long = avro_resolved_writer_set_long_double;
1427 	}
1428 
1429 	else if (is_avro_float(rschema)) {
1430 		*self = avro_resolved_writer_create(wschema, root_rschema);
1431 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1432 		(*self)->parent.set_long = avro_resolved_writer_set_long_float;
1433 	}
1434 
1435 	return 0;
1436 }
1437 
1438 
1439 /*-----------------------------------------------------------------------
1440  * null
1441  */
1442 
1443 static int
1444 avro_resolved_writer_set_null(const avro_value_iface_t *viface,
1445 			      void *vself)
1446 {
1447 	int  rval;
1448 	const avro_resolved_writer_t  *iface =
1449 	    container_of(viface, avro_resolved_writer_t, parent);
1450 	avro_value_t  *self = (avro_value_t *) vself;
1451 	avro_value_t  dest;
1452 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1453 	DEBUG("Storing NULL into %p", dest.self);
1454 	return avro_value_set_null(&dest);
1455 }
1456 
1457 static int
1458 try_null(memoize_state_t *state, avro_resolved_writer_t **self,
1459 	 avro_schema_t wschema, avro_schema_t rschema,
1460 	 avro_schema_t root_rschema)
1461 {
1462 	if (is_avro_null(rschema)) {
1463 		*self = avro_resolved_writer_create(wschema, root_rschema);
1464 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1465 		(*self)->parent.set_null = avro_resolved_writer_set_null;
1466 	}
1467 	return 0;
1468 }
1469 
1470 
1471 /*-----------------------------------------------------------------------
1472  * string
1473  */
1474 
1475 static int
1476 avro_resolved_writer_set_string(const avro_value_iface_t *viface,
1477 				void *vself, const char *str)
1478 {
1479 	int  rval;
1480 	const avro_resolved_writer_t  *iface =
1481 	    container_of(viface, avro_resolved_writer_t, parent);
1482 	avro_value_t  *self = (avro_value_t *) vself;
1483 	avro_value_t  dest;
1484 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1485 	DEBUG("Storing \"%s\" into %p", str, dest.self);
1486 	return avro_value_set_string(&dest, str);
1487 }
1488 
1489 static int
1490 avro_resolved_writer_set_string_len(const avro_value_iface_t *viface,
1491 				    void *vself, const char *str, size_t size)
1492 {
1493 	int  rval;
1494 	const avro_resolved_writer_t  *iface =
1495 	    container_of(viface, avro_resolved_writer_t, parent);
1496 	avro_value_t  *self = (avro_value_t *) vself;
1497 	avro_value_t  dest;
1498 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1499 	DEBUG("Storing <%p:%" PRIsz "> into %p", str, size, dest.self);
1500 	return avro_value_set_string_len(&dest, str, size);
1501 }
1502 
1503 static int
1504 avro_resolved_writer_give_string_len(const avro_value_iface_t *viface,
1505 				     void *vself, avro_wrapped_buffer_t *buf)
1506 {
1507 	int  rval;
1508 	const avro_resolved_writer_t  *iface =
1509 	    container_of(viface, avro_resolved_writer_t, parent);
1510 	avro_value_t  *self = (avro_value_t *) vself;
1511 	avro_value_t  dest;
1512 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1513 	DEBUG("Storing [%p] into %p", buf, dest.self);
1514 	return avro_value_give_string_len(&dest, buf);
1515 }
1516 
1517 static int
1518 try_string(memoize_state_t *state, avro_resolved_writer_t **self,
1519 	   avro_schema_t wschema, avro_schema_t rschema,
1520 	   avro_schema_t root_rschema)
1521 {
1522 	if (is_avro_string(rschema)) {
1523 		*self = avro_resolved_writer_create(wschema, root_rschema);
1524 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1525 		(*self)->parent.set_string = avro_resolved_writer_set_string;
1526 		(*self)->parent.set_string_len = avro_resolved_writer_set_string_len;
1527 		(*self)->parent.give_string_len = avro_resolved_writer_give_string_len;
1528 	}
1529 	return 0;
1530 }
1531 
1532 
1533 /*-----------------------------------------------------------------------
1534  * array
1535  */
1536 
1537 typedef struct avro_resolved_array_writer {
1538 	avro_resolved_writer_t  parent;
1539 	avro_resolved_writer_t  *child_resolver;
1540 } avro_resolved_array_writer_t;
1541 
1542 typedef struct avro_resolved_array_value {
1543 	avro_value_t  wrapped;
1544 	avro_raw_array_t  children;
1545 } avro_resolved_array_value_t;
1546 
1547 static void
1548 avro_resolved_array_writer_calculate_size(avro_resolved_writer_t *iface)
1549 {
1550 	avro_resolved_array_writer_t  *aiface =
1551 	    container_of(iface, avro_resolved_array_writer_t, parent);
1552 
1553 	/* Only calculate the size for any resolver once */
1554 	iface->calculate_size = NULL;
1555 
1556 	DEBUG("Calculating size for %s->%s",
1557 	      avro_schema_type_name((iface)->wschema),
1558 	      avro_schema_type_name((iface)->rschema));
1559 	iface->instance_size = sizeof(avro_resolved_array_value_t);
1560 
1561 	avro_resolved_writer_calculate_size(aiface->child_resolver);
1562 }
1563 
1564 static void
1565 avro_resolved_array_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing)
1566 {
1567 	avro_resolved_array_writer_t  *aiface =
1568 	    container_of(iface, avro_resolved_array_writer_t, parent);
1569 	free_resolver(aiface->child_resolver, freeing);
1570 	avro_schema_decref(iface->wschema);
1571 	avro_schema_decref(iface->rschema);
1572 	avro_freet(avro_resolved_array_writer_t, iface);
1573 }
1574 
1575 static int
1576 avro_resolved_array_writer_init(const avro_resolved_writer_t *iface, void *vself)
1577 {
1578 	const avro_resolved_array_writer_t  *aiface =
1579 	    container_of(iface, avro_resolved_array_writer_t, parent);
1580 	avro_resolved_array_value_t  *self = (avro_resolved_array_value_t *) vself;
1581 	size_t  child_instance_size = aiface->child_resolver->instance_size;
1582 	DEBUG("Initializing child array (child_size=%" PRIsz ")", child_instance_size);
1583 	avro_raw_array_init(&self->children, child_instance_size);
1584 	return 0;
1585 }
1586 
1587 static void
1588 avro_resolved_array_writer_free_elements(const avro_resolved_writer_t *child_iface,
1589 					 avro_resolved_array_value_t *self)
1590 {
1591 	size_t  i;
1592 	for (i = 0; i < avro_raw_array_size(&self->children); i++) {
1593 		void  *child_self = avro_raw_array_get_raw(&self->children, i);
1594 		avro_resolved_writer_done(child_iface, child_self);
1595 	}
1596 }
1597 
1598 static void
1599 avro_resolved_array_writer_done(const avro_resolved_writer_t *iface, void *vself)
1600 {
1601 	const avro_resolved_array_writer_t  *aiface =
1602 	    container_of(iface, avro_resolved_array_writer_t, parent);
1603 	avro_resolved_array_value_t  *self = (avro_resolved_array_value_t *) vself;
1604 	avro_resolved_array_writer_free_elements(aiface->child_resolver, self);
1605 	avro_raw_array_done(&self->children);
1606 }
1607 
1608 static int
1609 avro_resolved_array_writer_reset(const avro_resolved_writer_t *iface, void *vself)
1610 {
1611 	const avro_resolved_array_writer_t  *aiface =
1612 	    container_of(iface, avro_resolved_array_writer_t, parent);
1613 	avro_resolved_array_value_t  *self = (avro_resolved_array_value_t *) vself;
1614 
1615 	/* Clear out our cache of wrapped children */
1616 	avro_resolved_array_writer_free_elements(aiface->child_resolver, self);
1617 	avro_raw_array_clear(&self->children);
1618 	return 0;
1619 }
1620 
1621 static int
1622 avro_resolved_array_writer_get_size(const avro_value_iface_t *viface,
1623 				    const void *vself, size_t *size)
1624 {
1625 	int  rval;
1626 	const avro_resolved_writer_t  *iface =
1627 	    container_of(viface, avro_resolved_writer_t, parent);
1628 	const avro_resolved_array_value_t  *self = (const avro_resolved_array_value_t *) vself;
1629 	avro_value_t  dest;
1630 	check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest));
1631 	return avro_value_get_size(&dest, size);
1632 }
1633 
1634 static int
1635 avro_resolved_array_writer_append(const avro_value_iface_t *viface,
1636 				  void *vself, avro_value_t *child_out,
1637 				  size_t *new_index)
1638 {
1639 	int  rval;
1640 	const avro_resolved_writer_t  *iface =
1641 	    container_of(viface, avro_resolved_writer_t, parent);
1642 	const avro_resolved_array_writer_t  *aiface =
1643 	    container_of(iface, avro_resolved_array_writer_t, parent);
1644 	avro_resolved_array_value_t  *self = (avro_resolved_array_value_t *) vself;
1645 	avro_value_t  dest;
1646 	check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest));
1647 
1648 	child_out->iface = &aiface->child_resolver->parent;
1649 	child_out->self = avro_raw_array_append(&self->children);
1650 	if (child_out->self == NULL) {
1651 		avro_set_error("Couldn't expand array");
1652 		return ENOMEM;
1653 	}
1654 
1655 	DEBUG("Appending to array %p", dest.self);
1656 	check(rval, avro_value_append(&dest, (avro_value_t *) child_out->self, new_index));
1657 	return avro_resolved_writer_init(aiface->child_resolver, child_out->self);
1658 }
1659 
1660 static avro_resolved_array_writer_t *
1661 avro_resolved_array_writer_create(avro_schema_t wschema, avro_schema_t rschema)
1662 {
1663 	avro_resolved_writer_t  *self = (avro_resolved_writer_t *) avro_new(avro_resolved_array_writer_t);
1664 	memset(self, 0, sizeof(avro_resolved_array_writer_t));
1665 
1666 	self->parent.incref_iface = avro_resolved_writer_incref_iface;
1667 	self->parent.decref_iface = avro_resolved_writer_decref_iface;
1668 	self->parent.incref = avro_resolved_writer_incref;
1669 	self->parent.decref = avro_resolved_writer_decref;
1670 	self->parent.reset = avro_resolved_writer_reset;
1671 	self->parent.get_type = avro_resolved_writer_get_type;
1672 	self->parent.get_schema = avro_resolved_writer_get_schema;
1673 	self->parent.get_size = avro_resolved_array_writer_get_size;
1674 	self->parent.append = avro_resolved_array_writer_append;
1675 
1676 	self->refcount = 1;
1677 	self->wschema = avro_schema_incref(wschema);
1678 	self->rschema = avro_schema_incref(rschema);
1679 	self->reader_union_branch = -1;
1680 	self->calculate_size = avro_resolved_array_writer_calculate_size;
1681 	self->free_iface = avro_resolved_array_writer_free_iface;
1682 	self->init = avro_resolved_array_writer_init;
1683 	self->done = avro_resolved_array_writer_done;
1684 	self->reset_wrappers = avro_resolved_array_writer_reset;
1685 	return container_of(self, avro_resolved_array_writer_t, parent);
1686 }
1687 
1688 static int
1689 try_array(memoize_state_t *state, avro_resolved_writer_t **self,
1690 	  avro_schema_t wschema, avro_schema_t rschema,
1691 	  avro_schema_t root_rschema)
1692 {
1693 	/*
1694 	 * First verify that the reader is an array.
1695 	 */
1696 
1697 	if (!is_avro_array(rschema)) {
1698 		return 0;
1699 	}
1700 
1701 	/*
1702 	 * Array schemas have to have compatible element schemas to be
1703 	 * compatible themselves.  Try to create an resolver to check
1704 	 * the compatibility.
1705 	 */
1706 
1707 	avro_resolved_array_writer_t  *aself =
1708 	    avro_resolved_array_writer_create(wschema, root_rschema);
1709 	avro_memoize_set(&state->mem, wschema, root_rschema, aself);
1710 
1711 	avro_schema_t  witems = avro_schema_array_items(wschema);
1712 	avro_schema_t  ritems = avro_schema_array_items(rschema);
1713 
1714 	avro_resolved_writer_t  *item_resolver =
1715 	    avro_resolved_writer_new_memoized(state, witems, ritems);
1716 	if (item_resolver == NULL) {
1717 		avro_memoize_delete(&state->mem, wschema, root_rschema);
1718 		avro_value_iface_decref(&aself->parent.parent);
1719 		avro_prefix_error("Array values aren't compatible: ");
1720 		return EINVAL;
1721 	}
1722 
1723 	/*
1724 	 * The two schemas are compatible.  Store the item schema's
1725 	 * resolver into the child_resolver field.
1726 	 */
1727 
1728 	aself->child_resolver = item_resolver;
1729 	*self = &aself->parent;
1730 	return 0;
1731 }
1732 
1733 
1734 /*-----------------------------------------------------------------------
1735  * enum
1736  */
1737 
1738 static int
1739 avro_resolved_writer_set_enum(const avro_value_iface_t *viface,
1740 			      void *vself, int val)
1741 {
1742 	int  rval;
1743 	const avro_resolved_writer_t  *iface =
1744 	    container_of(viface, avro_resolved_writer_t, parent);
1745 	avro_value_t  *self = (avro_value_t *) vself;
1746 	avro_value_t  dest;
1747 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1748 	DEBUG("Storing %d into %p", val, dest.self);
1749 	return avro_value_set_enum(&dest, val);
1750 }
1751 
1752 static int
1753 try_enum(memoize_state_t *state, avro_resolved_writer_t **self,
1754 	 avro_schema_t wschema, avro_schema_t rschema,
1755 	 avro_schema_t root_rschema)
1756 {
1757 	/*
1758 	 * Enum schemas have to have the same name — but not the same
1759 	 * list of symbols — to be compatible.
1760 	 */
1761 
1762 	if (is_avro_enum(rschema)) {
1763 		const char  *wname = avro_schema_name(wschema);
1764 		const char  *rname = avro_schema_name(rschema);
1765 
1766 		if (strcmp(wname, rname) == 0) {
1767 			*self = avro_resolved_writer_create(wschema, root_rschema);
1768 			avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1769 			(*self)->parent.set_enum = avro_resolved_writer_set_enum;
1770 		}
1771 	}
1772 	return 0;
1773 }
1774 
1775 
1776 /*-----------------------------------------------------------------------
1777  * fixed
1778  */
1779 
1780 static int
1781 avro_resolved_writer_set_fixed(const avro_value_iface_t *viface,
1782 			       void *vself, void *buf, size_t size)
1783 {
1784 	int  rval;
1785 	const avro_resolved_writer_t  *iface =
1786 	    container_of(viface, avro_resolved_writer_t, parent);
1787 	avro_value_t  *self = (avro_value_t *) vself;
1788 	avro_value_t  dest;
1789 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1790 	DEBUG("Storing <%p:%" PRIsz "> into (fixed) %p", buf, size, dest.self);
1791 	return avro_value_set_fixed(&dest, buf, size);
1792 }
1793 
1794 static int
1795 avro_resolved_writer_give_fixed(const avro_value_iface_t *viface,
1796 				void *vself, avro_wrapped_buffer_t *buf)
1797 {
1798 	int  rval;
1799 	const avro_resolved_writer_t  *iface =
1800 	    container_of(viface, avro_resolved_writer_t, parent);
1801 	avro_value_t  *self = (avro_value_t *) vself;
1802 	avro_value_t  dest;
1803 	check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest));
1804 	DEBUG("Storing [%p] into (fixed) %p", buf, dest.self);
1805 	return avro_value_give_fixed(&dest, buf);
1806 }
1807 
1808 static int
1809 try_fixed(memoize_state_t *state, avro_resolved_writer_t **self,
1810 	  avro_schema_t wschema, avro_schema_t rschema,
1811 	  avro_schema_t root_rschema)
1812 {
1813 	/*
1814 	 * Fixed schemas need the same name and size to be compatible.
1815 	 */
1816 
1817 	if (avro_schema_equal(wschema, rschema)) {
1818 		*self = avro_resolved_writer_create(wschema, root_rschema);
1819 		avro_memoize_set(&state->mem, wschema, root_rschema, *self);
1820 		(*self)->parent.set_fixed = avro_resolved_writer_set_fixed;
1821 		(*self)->parent.give_fixed = avro_resolved_writer_give_fixed;
1822 	}
1823 	return 0;
1824 }
1825 
1826 
1827 /*-----------------------------------------------------------------------
1828  * map
1829  */
1830 
1831 typedef struct avro_resolved_map_writer {
1832 	avro_resolved_writer_t  parent;
1833 	avro_resolved_writer_t  *child_resolver;
1834 } avro_resolved_map_writer_t;
1835 
1836 typedef struct avro_resolved_map_value {
1837 	avro_value_t  wrapped;
1838 	avro_raw_array_t  children;
1839 } avro_resolved_map_value_t;
1840 
1841 static void
1842 avro_resolved_map_writer_calculate_size(avro_resolved_writer_t *iface)
1843 {
1844 	avro_resolved_map_writer_t  *miface =
1845 	    container_of(iface, avro_resolved_map_writer_t, parent);
1846 
1847 	/* Only calculate the size for any resolver once */
1848 	iface->calculate_size = NULL;
1849 
1850 	DEBUG("Calculating size for %s->%s",
1851 	      avro_schema_type_name((iface)->wschema),
1852 	      avro_schema_type_name((iface)->rschema));
1853 	iface->instance_size = sizeof(avro_resolved_map_value_t);
1854 
1855 	avro_resolved_writer_calculate_size(miface->child_resolver);
1856 }
1857 
1858 static void
1859 avro_resolved_map_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing)
1860 {
1861 	avro_resolved_map_writer_t  *miface =
1862 	    container_of(iface, avro_resolved_map_writer_t, parent);
1863 	free_resolver(miface->child_resolver, freeing);
1864 	avro_schema_decref(iface->wschema);
1865 	avro_schema_decref(iface->rschema);
1866 	avro_freet(avro_resolved_map_writer_t, iface);
1867 }
1868 
1869 static int
1870 avro_resolved_map_writer_init(const avro_resolved_writer_t *iface, void *vself)
1871 {
1872 	const avro_resolved_map_writer_t  *miface =
1873 	    container_of(iface, avro_resolved_map_writer_t, parent);
1874 	avro_resolved_map_value_t  *self = (avro_resolved_map_value_t *) vself;
1875 	size_t  child_instance_size = miface->child_resolver->instance_size;
1876 	DEBUG("Initializing child array for map (child_size=%" PRIsz ")", child_instance_size);
1877 	avro_raw_array_init(&self->children, child_instance_size);
1878 	return 0;
1879 }
1880 
1881 static void
1882 avro_resolved_map_writer_free_elements(const avro_resolved_writer_t *child_iface,
1883 				       avro_resolved_map_value_t *self)
1884 {
1885 	size_t  i;
1886 	for (i = 0; i < avro_raw_array_size(&self->children); i++) {
1887 		void  *child_self = avro_raw_array_get_raw(&self->children, i);
1888 		avro_resolved_writer_done(child_iface, child_self);
1889 	}
1890 }
1891 
1892 static void
1893 avro_resolved_map_writer_done(const avro_resolved_writer_t *iface, void *vself)
1894 {
1895 	const avro_resolved_map_writer_t  *miface =
1896 	    container_of(iface, avro_resolved_map_writer_t, parent);
1897 	avro_resolved_map_value_t  *self = (avro_resolved_map_value_t *) vself;
1898 	avro_resolved_map_writer_free_elements(miface->child_resolver, self);
1899 	avro_raw_array_done(&self->children);
1900 }
1901 
1902 static int
1903 avro_resolved_map_writer_reset(const avro_resolved_writer_t *iface, void *vself)
1904 {
1905 	const avro_resolved_map_writer_t  *miface =
1906 	    container_of(iface, avro_resolved_map_writer_t, parent);
1907 	avro_resolved_map_value_t  *self = (avro_resolved_map_value_t *) vself;
1908 
1909 	/* Clear out our cache of wrapped children */
1910 	avro_resolved_map_writer_free_elements(miface->child_resolver, self);
1911 	return 0;
1912 }
1913 
1914 static int
1915 avro_resolved_map_writer_get_size(const avro_value_iface_t *viface,
1916 				  const void *vself, size_t *size)
1917 {
1918 	int  rval;
1919 	const avro_resolved_writer_t  *iface =
1920 	    container_of(viface, avro_resolved_writer_t, parent);
1921 	const avro_resolved_map_value_t  *self = (const avro_resolved_map_value_t *) vself;
1922 	avro_value_t  dest;
1923 	check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest));
1924 	return avro_value_get_size(&dest, size);
1925 }
1926 
1927 static int
1928 avro_resolved_map_writer_add(const avro_value_iface_t *viface,
1929 			     void *vself, const char *key,
1930 			     avro_value_t *child, size_t *index, int *is_new)
1931 {
1932 	int  rval;
1933 	const avro_resolved_writer_t  *iface =
1934 	    container_of(viface, avro_resolved_writer_t, parent);
1935 	const avro_resolved_map_writer_t  *miface =
1936 	    container_of(iface, avro_resolved_map_writer_t, parent);
1937 	avro_resolved_map_value_t  *self = (avro_resolved_map_value_t *) vself;
1938 	avro_value_t  dest;
1939 	check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest));
1940 
1941 	/*
1942 	 * This is a bit convoluted.  We need to stash the wrapped child
1943 	 * value somewhere in our children array.  But we don't know
1944 	 * where to put it until the wrapped map tells us whether this
1945 	 * is a new value, and if not, which index the value should go
1946 	 * in.
1947 	 */
1948 
1949 	avro_value_t  real_child;
1950 	size_t  real_index;
1951 	int  real_is_new;
1952 
1953 	DEBUG("Adding %s to map %p", key, dest.self);
1954 	check(rval, avro_value_add(&dest, key, &real_child, &real_index, &real_is_new));
1955 
1956 	child->iface = &miface->child_resolver->parent;
1957 	if (real_is_new) {
1958 		child->self = avro_raw_array_append(&self->children);
1959 		DEBUG("Element is new (child resolver=%p)", child->self);
1960 		if (child->self == NULL) {
1961 			avro_set_error("Couldn't expand map");
1962 			return ENOMEM;
1963 		}
1964 		check(rval, avro_resolved_writer_init
1965 		      (miface->child_resolver, child->self));
1966 	} else {
1967 		child->self = avro_raw_array_get_raw(&self->children, real_index);
1968 		DEBUG("Element is old (child resolver=%p)", child->self);
1969 	}
1970 	avro_value_t  *child_vself = (avro_value_t *) child->self;
1971 	*child_vself = real_child;
1972 
1973 	if (index != NULL) {
1974 		*index = real_index;
1975 	}
1976 	if (is_new != NULL) {
1977 		*is_new = real_is_new;
1978 	}
1979 	return 0;
1980 }
1981 
1982 static avro_resolved_map_writer_t *
1983 avro_resolved_map_writer_create(avro_schema_t wschema, avro_schema_t rschema)
1984 {
1985 	avro_resolved_writer_t  *self = (avro_resolved_writer_t *) avro_new(avro_resolved_map_writer_t);
1986 	memset(self, 0, sizeof(avro_resolved_map_writer_t));
1987 
1988 	self->parent.incref_iface = avro_resolved_writer_incref_iface;
1989 	self->parent.decref_iface = avro_resolved_writer_decref_iface;
1990 	self->parent.incref = avro_resolved_writer_incref;
1991 	self->parent.decref = avro_resolved_writer_decref;
1992 	self->parent.reset = avro_resolved_writer_reset;
1993 	self->parent.get_type = avro_resolved_writer_get_type;
1994 	self->parent.get_schema = avro_resolved_writer_get_schema;
1995 	self->parent.get_size = avro_resolved_map_writer_get_size;
1996 	self->parent.add = avro_resolved_map_writer_add;
1997 
1998 	self->refcount = 1;
1999 	self->wschema = avro_schema_incref(wschema);
2000 	self->rschema = avro_schema_incref(rschema);
2001 	self->reader_union_branch = -1;
2002 	self->calculate_size = avro_resolved_map_writer_calculate_size;
2003 	self->free_iface = avro_resolved_map_writer_free_iface;
2004 	self->init = avro_resolved_map_writer_init;
2005 	self->done = avro_resolved_map_writer_done;
2006 	self->reset_wrappers = avro_resolved_map_writer_reset;
2007 	return container_of(self, avro_resolved_map_writer_t, parent);
2008 }
2009 
2010 static int
2011 try_map(memoize_state_t *state, avro_resolved_writer_t **self,
2012 	avro_schema_t wschema, avro_schema_t rschema,
2013 	avro_schema_t root_rschema)
2014 {
2015 	/*
2016 	 * First verify that the reader is an map.
2017 	 */
2018 
2019 	if (!is_avro_map(rschema)) {
2020 		return 0;
2021 	}
2022 
2023 	/*
2024 	 * Map schemas have to have compatible element schemas to be
2025 	 * compatible themselves.  Try to create an resolver to check
2026 	 * the compatibility.
2027 	 */
2028 
2029 	avro_resolved_map_writer_t  *mself =
2030 	    avro_resolved_map_writer_create(wschema, root_rschema);
2031 	avro_memoize_set(&state->mem, wschema, root_rschema, mself);
2032 
2033 	avro_schema_t  witems = avro_schema_map_values(wschema);
2034 	avro_schema_t  ritems = avro_schema_map_values(rschema);
2035 
2036 	avro_resolved_writer_t  *item_resolver =
2037 	    avro_resolved_writer_new_memoized(state, witems, ritems);
2038 	if (item_resolver == NULL) {
2039 		avro_memoize_delete(&state->mem, wschema, root_rschema);
2040 		avro_value_iface_decref(&mself->parent.parent);
2041 		avro_prefix_error("Map values aren't compatible: ");
2042 		return EINVAL;
2043 	}
2044 
2045 	/*
2046 	 * The two schemas are compatible.  Store the item schema's
2047 	 * resolver into the child_resolver field.
2048 	 */
2049 
2050 	mself->child_resolver = item_resolver;
2051 	*self = &mself->parent;
2052 	return 0;
2053 }
2054 
2055 
2056 /*-----------------------------------------------------------------------
2057  * record
2058  */
2059 
2060 typedef struct avro_resolved_record_writer {
2061 	avro_resolved_writer_t  parent;
2062 	size_t  field_count;
2063 	size_t  *field_offsets;
2064 	avro_resolved_writer_t  **field_resolvers;
2065 	size_t  *index_mapping;
2066 } avro_resolved_record_writer_t;
2067 
2068 typedef struct avro_resolved_record_value {
2069 	avro_value_t  wrapped;
2070 	/* The rest of the struct is taken up by the inline storage
2071 	 * needed for each field. */
2072 } avro_resolved_record_value_t;
2073 
2074 /** Return a pointer to the given field within a record struct. */
2075 #define avro_resolved_record_field(iface, rec, index) \
2076 	(((char *) (rec)) + (iface)->field_offsets[(index)])
2077 
2078 
2079 static void
2080 avro_resolved_record_writer_calculate_size(avro_resolved_writer_t *iface)
2081 {
2082 	avro_resolved_record_writer_t  *riface =
2083 	    container_of(iface, avro_resolved_record_writer_t, parent);
2084 
2085 	/* Only calculate the size for any resolver once */
2086 	iface->calculate_size = NULL;
2087 
2088 	DEBUG("Calculating size for %s->%s",
2089 	      avro_schema_type_name((iface)->wschema),
2090 	      avro_schema_type_name((iface)->rschema));
2091 
2092 	/*
2093 	 * Once we've figured out which writer fields we actually need,
2094 	 * calculate an offset for each one.
2095 	 */
2096 
2097 	size_t  wi;
2098 	size_t  next_offset = sizeof(avro_resolved_record_value_t);
2099 	for (wi = 0; wi < riface->field_count; wi++) {
2100 		riface->field_offsets[wi] = next_offset;
2101 		if (riface->field_resolvers[wi] != NULL) {
2102 			avro_resolved_writer_calculate_size
2103 			    (riface->field_resolvers[wi]);
2104 			size_t  field_size =
2105 			    riface->field_resolvers[wi]->instance_size;
2106 			DEBUG("Field %" PRIsz " has size %" PRIsz, wi, field_size);
2107 			next_offset += field_size;
2108 		} else {
2109 			DEBUG("Field %" PRIsz " is being skipped", wi);
2110 		}
2111 	}
2112 
2113 	DEBUG("Record has size %" PRIsz, next_offset);
2114 	iface->instance_size = next_offset;
2115 }
2116 
2117 static void
2118 avro_resolved_record_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing)
2119 {
2120 	avro_resolved_record_writer_t  *riface =
2121 	    container_of(iface, avro_resolved_record_writer_t, parent);
2122 
2123 	if (riface->field_offsets != NULL) {
2124 		avro_free(riface->field_offsets,
2125 			  riface->field_count * sizeof(size_t));
2126 	}
2127 
2128 	if (riface->field_resolvers != NULL) {
2129 		size_t  i;
2130 		for (i = 0; i < riface->field_count; i++) {
2131 			if (riface->field_resolvers[i] != NULL) {
2132 				DEBUG("Freeing field %" PRIsz " %p", i,
2133 				      riface->field_resolvers[i]);
2134 				free_resolver(riface->field_resolvers[i], freeing);
2135 			}
2136 		}
2137 		avro_free(riface->field_resolvers,
2138 			  riface->field_count * sizeof(avro_resolved_writer_t *));
2139 	}
2140 
2141 	if (riface->index_mapping != NULL) {
2142 		avro_free(riface->index_mapping,
2143 			  riface->field_count * sizeof(size_t));
2144 	}
2145 
2146 	avro_schema_decref(iface->wschema);
2147 	avro_schema_decref(iface->rschema);
2148 	avro_freet(avro_resolved_record_writer_t, iface);
2149 }
2150 
2151 static int
2152 avro_resolved_record_writer_init(const avro_resolved_writer_t *iface, void *vself)
2153 {
2154 	int  rval;
2155 	const avro_resolved_record_writer_t  *riface =
2156 	    container_of(iface, avro_resolved_record_writer_t, parent);
2157 	avro_resolved_record_value_t  *self = (avro_resolved_record_value_t *) vself;
2158 
2159 	/* Initialize each field */
2160 	size_t  i;
2161 	for (i = 0; i < riface->field_count; i++) {
2162 		if (riface->field_resolvers[i] != NULL) {
2163 			check(rval, avro_resolved_writer_init
2164 			      (riface->field_resolvers[i],
2165 			       avro_resolved_record_field(riface, self, i)));
2166 		}
2167 	}
2168 
2169 	return 0;
2170 }
2171 
2172 static void
2173 avro_resolved_record_writer_done(const avro_resolved_writer_t *iface, void *vself)
2174 {
2175 	const avro_resolved_record_writer_t  *riface =
2176 	    container_of(iface, avro_resolved_record_writer_t, parent);
2177 	avro_resolved_record_value_t  *self = (avro_resolved_record_value_t *) vself;
2178 
2179 	/* Finalize each field */
2180 	size_t  i;
2181 	for (i = 0; i < riface->field_count; i++) {
2182 		if (riface->field_resolvers[i] != NULL) {
2183 			avro_resolved_writer_done
2184 			    (riface->field_resolvers[i],
2185 			     avro_resolved_record_field(riface, self, i));
2186 		}
2187 	}
2188 }
2189 
2190 static int
2191 avro_resolved_record_writer_reset(const avro_resolved_writer_t *iface, void *vself)
2192 {
2193 	int  rval;
2194 	const avro_resolved_record_writer_t  *riface =
2195 	    container_of(iface, avro_resolved_record_writer_t, parent);
2196 	avro_resolved_record_value_t  *self = (avro_resolved_record_value_t *) vself;
2197 
2198 	/* Reset each field */
2199 	size_t  i;
2200 	for (i = 0; i < riface->field_count; i++) {
2201 		if (riface->field_resolvers[i] != NULL) {
2202 			check(rval, avro_resolved_writer_reset_wrappers
2203 			      (riface->field_resolvers[i],
2204 			       avro_resolved_record_field(riface, self, i)));
2205 		}
2206 	}
2207 
2208 	return 0;
2209 }
2210 
2211 static int
2212 avro_resolved_record_writer_get_size(const avro_value_iface_t *viface,
2213 				     const void *vself, size_t *size)
2214 {
2215 	AVRO_UNUSED(vself);
2216 	const avro_resolved_writer_t  *iface =
2217 	    container_of(viface, avro_resolved_writer_t, parent);
2218 	const avro_resolved_record_writer_t  *riface =
2219 	    container_of(iface, avro_resolved_record_writer_t, parent);
2220 	*size = riface->field_count;
2221 	return 0;
2222 }
2223 
2224 static int
2225 avro_resolved_record_writer_get_by_index(const avro_value_iface_t *viface,
2226 					 const void *vself, size_t index,
2227 					 avro_value_t *child, const char **name)
2228 {
2229 	int  rval;
2230 	const avro_resolved_writer_t  *iface =
2231 	    container_of(viface, avro_resolved_writer_t, parent);
2232 	const avro_resolved_record_writer_t  *riface =
2233 	    container_of(iface, avro_resolved_record_writer_t, parent);
2234 	const avro_resolved_record_value_t  *self = (const avro_resolved_record_value_t *) vself;
2235 	avro_value_t  dest;
2236 
2237 	DEBUG("Getting writer field %" PRIsz " from record %p", index, self);
2238 	if (riface->field_resolvers[index] == NULL) {
2239 		DEBUG("Reader doesn't have field, skipping");
2240 		child->iface = NULL;
2241 		child->self = NULL;
2242 		return 0;
2243 	}
2244 
2245 	check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest));
2246 	size_t  reader_index = riface->index_mapping[index];
2247 	DEBUG("  Reader field is %" PRIsz, reader_index);
2248 	child->iface = &riface->field_resolvers[index]->parent;
2249 	child->self = avro_resolved_record_field(riface, self, index);
2250 
2251 	return avro_value_get_by_index(&dest, reader_index, (avro_value_t *) child->self, name);
2252 }
2253 
2254 static int
2255 avro_resolved_record_writer_get_by_name(const avro_value_iface_t *viface,
2256 					const void *vself, const char *name,
2257 					avro_value_t *child, size_t *index)
2258 {
2259 	const avro_resolved_writer_t  *iface =
2260 	    container_of(viface, avro_resolved_writer_t, parent);
2261 
2262 	int  wi = avro_schema_record_field_get_index(iface->wschema, name);
2263 	if (wi == -1) {
2264 		avro_set_error("Record doesn't have field named %s", name);
2265 		return EINVAL;
2266 	}
2267 
2268 	DEBUG("Writer field %s is at index %d", name, wi);
2269 	if (index != NULL) {
2270 		*index = wi;
2271 	}
2272 	return avro_resolved_record_writer_get_by_index(viface, vself, wi, child, NULL);
2273 }
2274 
2275 static avro_resolved_record_writer_t *
2276 avro_resolved_record_writer_create(avro_schema_t wschema, avro_schema_t rschema)
2277 {
2278 	avro_resolved_writer_t  *self = (avro_resolved_writer_t *) avro_new(avro_resolved_record_writer_t);
2279 	memset(self, 0, sizeof(avro_resolved_record_writer_t));
2280 
2281 	self->parent.incref_iface = avro_resolved_writer_incref_iface;
2282 	self->parent.decref_iface = avro_resolved_writer_decref_iface;
2283 	self->parent.incref = avro_resolved_writer_incref;
2284 	self->parent.decref = avro_resolved_writer_decref;
2285 	self->parent.reset = avro_resolved_writer_reset;
2286 	self->parent.get_type = avro_resolved_writer_get_type;
2287 	self->parent.get_schema = avro_resolved_writer_get_schema;
2288 	self->parent.get_size = avro_resolved_record_writer_get_size;
2289 	self->parent.get_by_index = avro_resolved_record_writer_get_by_index;
2290 	self->parent.get_by_name = avro_resolved_record_writer_get_by_name;
2291 
2292 	self->refcount = 1;
2293 	self->wschema = avro_schema_incref(wschema);
2294 	self->rschema = avro_schema_incref(rschema);
2295 	self->reader_union_branch = -1;
2296 	self->calculate_size = avro_resolved_record_writer_calculate_size;
2297 	self->free_iface = avro_resolved_record_writer_free_iface;
2298 	self->init = avro_resolved_record_writer_init;
2299 	self->done = avro_resolved_record_writer_done;
2300 	self->reset_wrappers = avro_resolved_record_writer_reset;
2301 	return container_of(self, avro_resolved_record_writer_t, parent);
2302 }
2303 
2304 static int
2305 try_record(memoize_state_t *state, avro_resolved_writer_t **self,
2306 	   avro_schema_t wschema, avro_schema_t rschema,
2307 	   avro_schema_t root_rschema)
2308 {
2309 	/*
2310 	 * First verify that the reader is also a record, and has the
2311 	 * same name as the writer.
2312 	 */
2313 
2314 	if (!is_avro_record(rschema)) {
2315 		return 0;
2316 	}
2317 
2318 	const char  *wname = avro_schema_name(wschema);
2319 	const char  *rname = avro_schema_name(rschema);
2320 
2321 	if (strcmp(wname, rname) != 0) {
2322 		return 0;
2323 	}
2324 
2325 	/*
2326 	 * Categorize the fields in the record schemas.  Fields that are
2327 	 * only in the writer are ignored.  Fields that are only in the
2328 	 * reader raise a schema mismatch error, unless the field has a
2329 	 * default value.  Fields that are in both are resolved
2330 	 * recursively.
2331 	 *
2332 	 * The field_resolvers array will contain an avro_value_iface_t
2333 	 * for each field in the writer schema.  To build this array, we
2334 	 * loop through the fields of the reader schema.  If that field
2335 	 * is also in the writer schema, we resolve them recursively,
2336 	 * and store the resolver into the array.  If the field isn't in
2337 	 * the writer schema, we raise an error.  (TODO: Eventually,
2338 	 * we'll handle default values here.)  After this loop finishes,
2339 	 * any NULLs in the field_resolvers array will represent fields
2340 	 * in the writer but not the reader; these fields will be
2341 	 * skipped when processing the input.
2342 	 */
2343 
2344 	avro_resolved_record_writer_t  *rself =
2345 	    avro_resolved_record_writer_create(wschema, root_rschema);
2346 	avro_memoize_set(&state->mem, wschema, root_rschema, rself);
2347 
2348 	size_t  wfields = avro_schema_record_size(wschema);
2349 	size_t  rfields = avro_schema_record_size(rschema);
2350 
2351 	DEBUG("Checking writer record schema %s", wname);
2352 
2353 	avro_resolved_writer_t  **field_resolvers =
2354 	    (avro_resolved_writer_t **) avro_calloc(wfields, sizeof(avro_resolved_writer_t *));
2355 	size_t  *field_offsets = (size_t *) avro_calloc(wfields, sizeof(size_t));
2356 	size_t  *index_mapping = (size_t *) avro_calloc(wfields, sizeof(size_t));
2357 
2358 	size_t  ri;
2359 	for (ri = 0; ri < rfields; ri++) {
2360 		avro_schema_t  rfield =
2361 		    avro_schema_record_field_get_by_index(rschema, ri);
2362 		const char  *field_name =
2363 		    avro_schema_record_field_name(rschema, ri);
2364 
2365 		DEBUG("Resolving reader record field %" PRIsz " (%s)", ri, field_name);
2366 
2367 		/*
2368 		 * See if this field is also in the writer schema.
2369 		 */
2370 
2371 		int  wi = avro_schema_record_field_get_index(wschema, field_name);
2372 
2373 		if (wi == -1) {
2374 			/*
2375 			 * This field isn't in the writer schema —
2376 			 * that's an error!  TODO: Handle default
2377 			 * values!
2378 			 */
2379 
2380 			DEBUG("Field %s isn't in writer", field_name);
2381 
2382 			/* Allow missing fields in the writer. They
2383 			 * will default to zero. So skip over the
2384 			 * missing field, and continue building the
2385 			 * resolver. Note also that all missing values
2386 			 * are zero because avro_generic_value_new()
2387 			 * initializes all values of the reader to 0
2388 			 * on creation. This is a work-around because
2389 			 * default values are not implemented yet.
2390 			 */
2391 			#ifdef AVRO_ALLOW_MISSING_FIELDS_IN_RESOLVED_WRITER
2392 			continue;
2393 			#else
2394 			avro_set_error("Reader field %s doesn't appear in writer",
2395 				       field_name);
2396 			goto error;
2397 			#endif
2398 		}
2399 
2400 		/*
2401 		 * Try to recursively resolve the schemas for this
2402 		 * field.  If they're not compatible, that's an error.
2403 		 */
2404 
2405 		avro_schema_t  wfield =
2406 		    avro_schema_record_field_get_by_index(wschema, wi);
2407 		avro_resolved_writer_t  *field_resolver =
2408 		    avro_resolved_writer_new_memoized(state, wfield, rfield);
2409 
2410 		if (field_resolver == NULL) {
2411 			avro_prefix_error("Field %s isn't compatible: ", field_name);
2412 			goto error;
2413 		}
2414 
2415 		/*
2416 		 * Save the details for this field.
2417 		 */
2418 
2419 		DEBUG("Found match for field %s (%" PRIsz " in reader, %d in writer)",
2420 		      field_name, ri, wi);
2421 		field_resolvers[wi] = field_resolver;
2422 		index_mapping[wi] = ri;
2423 	}
2424 
2425 	/*
2426 	 * We might not have found matches for all of the writer fields,
2427 	 * but that's okay — any extras will be ignored.
2428 	 */
2429 
2430 	rself->field_count = wfields;
2431 	rself->field_offsets = field_offsets;
2432 	rself->field_resolvers = field_resolvers;
2433 	rself->index_mapping = index_mapping;
2434 	*self = &rself->parent;
2435 	return 0;
2436 
2437 error:
2438 	/*
2439 	 * Clean up any resolver we might have already created.
2440 	 */
2441 
2442 	avro_memoize_delete(&state->mem, wschema, root_rschema);
2443 	avro_value_iface_decref(&rself->parent.parent);
2444 
2445 	{
2446 		unsigned int  i;
2447 		for (i = 0; i < wfields; i++) {
2448 			if (field_resolvers[i]) {
2449 				avro_value_iface_decref(&field_resolvers[i]->parent);
2450 			}
2451 		}
2452 	}
2453 
2454 	avro_free(field_resolvers, wfields * sizeof(avro_resolved_writer_t *));
2455 	avro_free(field_offsets, wfields * sizeof(size_t));
2456 	avro_free(index_mapping, wfields * sizeof(size_t));
2457 	return EINVAL;
2458 }
2459 
2460 
2461 /*-----------------------------------------------------------------------
2462  * union
2463  */
2464 
2465 typedef struct avro_resolved_union_writer {
2466 	avro_resolved_writer_t  parent;
2467 	size_t  branch_count;
2468 	avro_resolved_writer_t  **branch_resolvers;
2469 } avro_resolved_union_writer_t;
2470 
2471 typedef struct avro_resolved_union_value {
2472 	avro_value_t  wrapped;
2473 
2474 	/** The currently active branch of the union.  -1 if no branch
2475 	 * is selected. */
2476 	int  discriminant;
2477 
2478 	/* The rest of the struct is taken up by the inline storage
2479 	 * needed for the active branch. */
2480 } avro_resolved_union_value_t;
2481 
2482 /** Return a pointer to the active branch within a union struct. */
2483 #define avro_resolved_union_branch(_union) \
2484 	(((char *) (_union)) + sizeof(avro_resolved_union_value_t))
2485 
2486 
2487 static void
2488 avro_resolved_union_writer_calculate_size(avro_resolved_writer_t *iface)
2489 {
2490 	avro_resolved_union_writer_t  *uiface =
2491 	    container_of(iface, avro_resolved_union_writer_t, parent);
2492 
2493 	/* Only calculate the size for any resolver once */
2494 	iface->calculate_size = NULL;
2495 
2496 	DEBUG("Calculating size for %s->%s",
2497 	      avro_schema_type_name((iface)->wschema),
2498 	      avro_schema_type_name((iface)->rschema));
2499 
2500 	size_t  i;
2501 	size_t  max_branch_size = 0;
2502 	for (i = 0; i < uiface->branch_count; i++) {
2503 		if (uiface->branch_resolvers[i] == NULL) {
2504 			DEBUG("No match for writer union branch %" PRIsz, i);
2505 		} else {
2506 			avro_resolved_writer_calculate_size
2507 			    (uiface->branch_resolvers[i]);
2508 			size_t  branch_size =
2509 			    uiface->branch_resolvers[i]->instance_size;
2510 			DEBUG("Writer branch %" PRIsz " has size %" PRIsz, i, branch_size);
2511 			if (branch_size > max_branch_size) {
2512 				max_branch_size = branch_size;
2513 			}
2514 		}
2515 	}
2516 
2517 	DEBUG("Maximum branch size is %" PRIsz, max_branch_size);
2518 	iface->instance_size =
2519 	    sizeof(avro_resolved_union_value_t) + max_branch_size;
2520 	DEBUG("Total union size is %" PRIsz, iface->instance_size);
2521 }
2522 
2523 static void
2524 avro_resolved_union_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing)
2525 {
2526 	avro_resolved_union_writer_t  *uiface =
2527 	    container_of(iface, avro_resolved_union_writer_t, parent);
2528 
2529 	if (uiface->branch_resolvers != NULL) {
2530 		size_t  i;
2531 		for (i = 0; i < uiface->branch_count; i++) {
2532 			if (uiface->branch_resolvers[i] != NULL) {
2533 				free_resolver(uiface->branch_resolvers[i], freeing);
2534 			}
2535 		}
2536 		avro_free(uiface->branch_resolvers,
2537 			  uiface->branch_count * sizeof(avro_resolved_writer_t *));
2538 	}
2539 
2540 	avro_schema_decref(iface->wschema);
2541 	avro_schema_decref(iface->rschema);
2542 	avro_freet(avro_resolved_union_writer_t, iface);
2543 }
2544 
2545 static int
2546 avro_resolved_union_writer_init(const avro_resolved_writer_t *iface, void *vself)
2547 {
2548 	AVRO_UNUSED(iface);
2549 	avro_resolved_union_value_t  *self = (avro_resolved_union_value_t *) vself;
2550 	self->discriminant = -1;
2551 	return 0;
2552 }
2553 
2554 static void
2555 avro_resolved_union_writer_done(const avro_resolved_writer_t *iface, void *vself)
2556 {
2557 	const avro_resolved_union_writer_t  *uiface =
2558 	    container_of(iface, avro_resolved_union_writer_t, parent);
2559 	avro_resolved_union_value_t  *self = (avro_resolved_union_value_t *) vself;
2560 	if (self->discriminant >= 0) {
2561 		avro_resolved_writer_done
2562 		    (uiface->branch_resolvers[self->discriminant],
2563 		     avro_resolved_union_branch(self));
2564 		self->discriminant = -1;
2565 	}
2566 }
2567 
2568 static int
2569 avro_resolved_union_writer_reset(const avro_resolved_writer_t *iface, void *vself)
2570 {
2571 	const avro_resolved_union_writer_t  *uiface =
2572 	    container_of(iface, avro_resolved_union_writer_t, parent);
2573 	avro_resolved_union_value_t  *self = (avro_resolved_union_value_t *) vself;
2574 
2575 	/* Keep the same branch selected, for the common case that we're
2576 	 * about to reuse it. */
2577 	if (self->discriminant >= 0) {
2578 		return avro_resolved_writer_reset_wrappers
2579 		    (uiface->branch_resolvers[self->discriminant],
2580 		     avro_resolved_union_branch(self));
2581 	}
2582 
2583 	return 0;
2584 }
2585 
2586 static int
2587 avro_resolved_union_writer_set_branch(const avro_value_iface_t *viface,
2588 				      void *vself, int discriminant,
2589 				      avro_value_t *branch)
2590 {
2591 	int  rval;
2592 	const avro_resolved_writer_t  *iface =
2593 	    container_of(viface, avro_resolved_writer_t, parent);
2594 	const avro_resolved_union_writer_t  *uiface =
2595 	    container_of(iface, avro_resolved_union_writer_t, parent);
2596 	avro_resolved_union_value_t  *self = (avro_resolved_union_value_t *) vself;
2597 
2598 	DEBUG("Getting writer branch %d from union %p", discriminant, vself);
2599 	avro_resolved_writer_t  *branch_resolver =
2600 	    uiface->branch_resolvers[discriminant];
2601 	if (branch_resolver == NULL) {
2602 		DEBUG("Reader doesn't have branch, skipping");
2603 		avro_set_error("Writer union branch %d is incompatible "
2604 			       "with reader schema \"%s\"",
2605 			       discriminant, avro_schema_type_name(iface->rschema));
2606 		return EINVAL;
2607 	}
2608 
2609 	if (self->discriminant == discriminant) {
2610 		DEBUG("Writer branch %d already selected", discriminant);
2611 	} else {
2612 		if (self->discriminant >= 0) {
2613 			DEBUG("Finalizing old writer branch %d", self->discriminant);
2614 			avro_resolved_writer_done
2615 			    (uiface->branch_resolvers[self->discriminant],
2616 			     avro_resolved_union_branch(self));
2617 		}
2618 		DEBUG("Initializing writer branch %d", discriminant);
2619 		check(rval, avro_resolved_writer_init
2620 		      (uiface->branch_resolvers[discriminant],
2621 		       avro_resolved_union_branch(self)));
2622 		self->discriminant = discriminant;
2623 	}
2624 
2625 	branch->iface = &branch_resolver->parent;
2626 	branch->self = avro_resolved_union_branch(self);
2627 	avro_value_t  *branch_vself = (avro_value_t *) branch->self;
2628 	*branch_vself = self->wrapped;
2629 	return 0;
2630 }
2631 
2632 static avro_resolved_union_writer_t *
2633 avro_resolved_union_writer_create(avro_schema_t wschema, avro_schema_t rschema)
2634 {
2635 	avro_resolved_writer_t  *self = (avro_resolved_writer_t *) avro_new(avro_resolved_union_writer_t);
2636 	memset(self, 0, sizeof(avro_resolved_union_writer_t));
2637 
2638 	self->parent.incref_iface = avro_resolved_writer_incref_iface;
2639 	self->parent.decref_iface = avro_resolved_writer_decref_iface;
2640 	self->parent.incref = avro_resolved_writer_incref;
2641 	self->parent.decref = avro_resolved_writer_decref;
2642 	self->parent.reset = avro_resolved_writer_reset;
2643 	self->parent.get_type = avro_resolved_writer_get_type;
2644 	self->parent.get_schema = avro_resolved_writer_get_schema;
2645 	self->parent.set_branch = avro_resolved_union_writer_set_branch;
2646 
2647 	self->refcount = 1;
2648 	self->wschema = avro_schema_incref(wschema);
2649 	self->rschema = avro_schema_incref(rschema);
2650 	self->reader_union_branch = -1;
2651 	self->calculate_size = avro_resolved_union_writer_calculate_size;
2652 	self->free_iface = avro_resolved_union_writer_free_iface;
2653 	self->init = avro_resolved_union_writer_init;
2654 	self->done = avro_resolved_union_writer_done;
2655 	self->reset_wrappers = avro_resolved_union_writer_reset;
2656 	return container_of(self, avro_resolved_union_writer_t, parent);
2657 }
2658 
2659 static avro_resolved_writer_t *
2660 try_union(memoize_state_t *state,
2661 	  avro_schema_t wschema, avro_schema_t rschema)
2662 {
2663 	/*
2664 	 * For a writer union, we recursively try to resolve each branch
2665 	 * against the reader schema.  This will work correctly whether
2666 	 * or not the reader is also a union — if the reader is a union,
2667 	 * then we'll resolve each (non-union) writer branch against the
2668 	 * reader union, which will be checked in our calls to
2669 	 * check_simple_writer below.  The net result is that we might
2670 	 * end up trying every combination of writer and reader
2671 	 * branches, when looking for compatible schemas.
2672 	 *
2673 	 * Regardless of what the reader schema is, for each writer
2674 	 * branch, we stash away the recursive resolver into the
2675 	 * branch_resolvers array.  A NULL entry in this array means
2676 	 * that that branch isn't compatible with the reader.  This
2677 	 * isn't an immediate schema resolution error, since we allow
2678 	 * incompatible branches in the types as long as that branch
2679 	 * never appears in the actual data.  We only return an error if
2680 	 * there are *no* branches that are compatible.
2681 	 */
2682 
2683 	size_t  branch_count = avro_schema_union_size(wschema);
2684 	DEBUG("Checking %" PRIsz "-branch writer union schema", branch_count);
2685 
2686 	avro_resolved_union_writer_t  *uself =
2687 	    avro_resolved_union_writer_create(wschema, rschema);
2688 	avro_memoize_set(&state->mem, wschema, rschema, uself);
2689 
2690 	avro_resolved_writer_t  **branch_resolvers =
2691 	    (avro_resolved_writer_t **) avro_calloc(branch_count, sizeof(avro_resolved_writer_t *));
2692 	int  some_branch_compatible = 0;
2693 
2694 	size_t  i;
2695 	for (i = 0; i < branch_count; i++) {
2696 		avro_schema_t  branch_schema =
2697 		    avro_schema_union_branch(wschema, i);
2698 
2699 		DEBUG("Resolving writer union branch %" PRIsz " (%s)", i,
2700 		      avro_schema_type_name(branch_schema));
2701 
2702 		/*
2703 		 * Try to recursively resolve this branch of the writer
2704 		 * union.  Don't raise an error if this fails — it's
2705 		 * okay for some of the branches to not be compatible
2706 		 * with the reader, as long as those branches never
2707 		 * appear in the input.
2708 		 */
2709 
2710 		branch_resolvers[i] =
2711 		    avro_resolved_writer_new_memoized(state, branch_schema, rschema);
2712 		if (branch_resolvers[i] == NULL) {
2713 			DEBUG("No match for writer union branch %" PRIsz, i);
2714 		} else {
2715 			DEBUG("Found match for writer union branch %" PRIsz, i);
2716 			some_branch_compatible = 1;
2717 		}
2718 	}
2719 
2720 	/*
2721 	 * As long as there's at least one branch that's compatible with
2722 	 * the reader, then we consider this schema resolution a
2723 	 * success.
2724 	 */
2725 
2726 	if (!some_branch_compatible) {
2727 		DEBUG("No writer union branches match");
2728 		avro_set_error("No branches in the writer are compatible "
2729 			       "with reader schema %s",
2730 			       avro_schema_type_name(rschema));
2731 		goto error;
2732 	}
2733 
2734 	uself->branch_count = branch_count;
2735 	uself->branch_resolvers = branch_resolvers;
2736 	return &uself->parent;
2737 
2738 error:
2739 	/*
2740 	 * Clean up any resolver we might have already created.
2741 	 */
2742 
2743 	avro_memoize_delete(&state->mem, wschema, rschema);
2744 	avro_value_iface_decref(&uself->parent.parent);
2745 
2746 	{
2747 		unsigned int  i;
2748 		for (i = 0; i < branch_count; i++) {
2749 			if (branch_resolvers[i]) {
2750 				avro_value_iface_decref(&branch_resolvers[i]->parent);
2751 			}
2752 		}
2753 	}
2754 
2755 	avro_free(branch_resolvers, branch_count * sizeof(avro_resolved_writer_t *));
2756 	return NULL;
2757 }
2758 
2759 
2760 /*-----------------------------------------------------------------------
2761  * Schema type dispatcher
2762  */
2763 
2764 static avro_resolved_writer_t *
2765 avro_resolved_writer_new_memoized(memoize_state_t *state,
2766 				  avro_schema_t wschema, avro_schema_t rschema)
2767 {
2768 	check_param(NULL, is_avro_schema(wschema), "writer schema");
2769 	check_param(NULL, is_avro_schema(rschema), "reader schema");
2770 
2771 	skip_links(rschema);
2772 
2773 	/*
2774 	 * First see if we've already matched these two schemas.  If so,
2775 	 * just return that resolver.
2776 	 */
2777 
2778 	avro_resolved_writer_t  *saved = NULL;
2779 	if (avro_memoize_get(&state->mem, wschema, rschema, (void **) &saved)) {
2780 		DEBUG("Already resolved %s%s%s->%s",
2781 		      is_avro_link(wschema)? "[": "",
2782 		      avro_schema_type_name(wschema),
2783 		      is_avro_link(wschema)? "]": "",
2784 		      avro_schema_type_name(rschema));
2785 		avro_value_iface_incref(&saved->parent);
2786 		return saved;
2787 	} else {
2788 		DEBUG("Resolving %s%s%s->%s",
2789 		      is_avro_link(wschema)? "[": "",
2790 		      avro_schema_type_name(wschema),
2791 		      is_avro_link(wschema)? "]": "",
2792 		      avro_schema_type_name(rschema));
2793 	}
2794 
2795 	/*
2796 	 * Otherwise we have some work to do.
2797 	 */
2798 
2799 	switch (avro_typeof(wschema))
2800 	{
2801 		case AVRO_BOOLEAN:
2802 			check_simple_writer(state, wschema, rschema, boolean);
2803 			return NULL;
2804 
2805 		case AVRO_BYTES:
2806 			check_simple_writer(state, wschema, rschema, bytes);
2807 			return NULL;
2808 
2809 		case AVRO_DOUBLE:
2810 			check_simple_writer(state, wschema, rschema, double);
2811 			return NULL;
2812 
2813 		case AVRO_FLOAT:
2814 			check_simple_writer(state, wschema, rschema, float);
2815 			return NULL;
2816 
2817 		case AVRO_INT32:
2818 			check_simple_writer(state, wschema, rschema, int);
2819 			return NULL;
2820 
2821 		case AVRO_INT64:
2822 			check_simple_writer(state, wschema, rschema, long);
2823 			return NULL;
2824 
2825 		case AVRO_NULL:
2826 			check_simple_writer(state, wschema, rschema, null);
2827 			return NULL;
2828 
2829 		case AVRO_STRING:
2830 			check_simple_writer(state, wschema, rschema, string);
2831 			return NULL;
2832 
2833 		case AVRO_ARRAY:
2834 			check_simple_writer(state, wschema, rschema, array);
2835 			return NULL;
2836 
2837 		case AVRO_ENUM:
2838 			check_simple_writer(state, wschema, rschema, enum);
2839 			return NULL;
2840 
2841 		case AVRO_FIXED:
2842 			check_simple_writer(state, wschema, rschema, fixed);
2843 			return NULL;
2844 
2845 		case AVRO_MAP:
2846 			check_simple_writer(state, wschema, rschema, map);
2847 			return NULL;
2848 
2849 		case AVRO_RECORD:
2850 			check_simple_writer(state, wschema, rschema, record);
2851 			return NULL;
2852 
2853 		case AVRO_UNION:
2854 			return try_union(state, wschema, rschema);
2855 
2856 		case AVRO_LINK:
2857 			check_simple_writer(state, wschema, rschema, link);
2858 			return NULL;
2859 
2860 		default:
2861 			avro_set_error("Unknown schema type");
2862 			return NULL;
2863 	}
2864 
2865 	return NULL;
2866 }
2867 
2868 
2869 avro_value_iface_t *
2870 avro_resolved_writer_new(avro_schema_t wschema, avro_schema_t rschema)
2871 {
2872 	/*
2873 	 * Create a state to keep track of the value implementations
2874 	 * that we create for each subschema.
2875 	 */
2876 
2877 	memoize_state_t  state;
2878 	avro_memoize_init(&state.mem);
2879 	state.links = NULL;
2880 
2881 	/*
2882 	 * Create the value implementations.
2883 	 */
2884 
2885 	avro_resolved_writer_t  *result =
2886 	    avro_resolved_writer_new_memoized(&state, wschema, rschema);
2887 	if (result == NULL) {
2888 		avro_memoize_done(&state.mem);
2889 		return NULL;
2890 	}
2891 
2892 	/*
2893 	 * Fix up any link schemas so that their value implementations
2894 	 * point to their target schemas' implementations.
2895 	 */
2896 
2897 	avro_resolved_writer_calculate_size(result);
2898 	while (state.links != NULL) {
2899 		avro_resolved_link_writer_t  *liface = state.links;
2900 		avro_resolved_writer_calculate_size(liface->target_resolver);
2901 		state.links = liface->next;
2902 		liface->next = NULL;
2903 	}
2904 
2905 	/*
2906 	 * And now we can return.
2907 	 */
2908 
2909 	avro_memoize_done(&state.mem);
2910 	return &result->parent;
2911 }
2912