1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  * https://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.  See the License for the specific language governing
15  * permissions and limitations under the License.
16  */
17 
#include "avro_private.h"
#include "avro/allocation.h"
#include "avro/generic.h"
#include "avro/errors.h"
#include "avro/value.h"
#include "encoding.h"
#include "codec.h"
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
31 
/*
 * State for reading an Avro object container file.
 */
struct avro_file_reader_t_ {
	/* Schema parsed from the header's "avro.schema" metadata entry. */
	avro_schema_t writers_schema;
	/* Reader over the underlying file stream. */
	avro_reader_t reader;
	/* Memory reader pointed at the codec's decoded data for the current block. */
	avro_reader_t block_reader;
	/* Codec selected by the header's "avro.codec" entry (null codec if absent). */
	avro_codec_t codec;
	/* Sync marker from the header; compared with the bytes after each block. */
	char sync[16];
	/* Number of records consumed from the current block (despite the name). */
	int64_t blocks_read;
	/* Number of records in the current block; 0 when the file has no blocks. */
	int64_t blocks_total;
	/* Allocated size, in bytes, of current_blockdata. */
	int64_t current_blocklen;
	/* Buffer holding the raw, still-encoded bytes of the current block. */
	char * current_blockdata;
};
43 
/*
 * State for writing (creating or appending to) an Avro object container file.
 */
struct avro_file_writer_t_ {
	/* Schema written to / read from the header; used to encode datums. */
	avro_schema_t writers_schema;
	/* Writer over the underlying file stream. */
	avro_writer_t writer;
	/* Codec applied to each block before it is written. */
	avro_codec_t codec;
	/* Sync marker written in the header and after every block. */
	char sync[16];
	/* Number of records buffered in the pending block. */
	int block_count;
	/* Bytes of datum_buffer occupied by complete records, updated after
	 * each successful append; only this prefix is flushed. */
	size_t block_size;
	/* Memory writer over datum_buffer. */
	avro_writer_t datum_writer;
	/* Buffer accumulating encoded records for the pending block. */
	char* datum_buffer;
	/* Allocated size of datum_buffer in bytes. */
	size_t datum_buffer_size;
	/* Scratch space for the JSON schema written into a new file's header. */
	char schema_buf[64 * 1024];
};
56 
/* Size, in bytes, of the block buffer used when the caller passes a
 * block_size of 0. Parenthesized so the expansion is safe inside larger
 * expressions (e.g. `x % DEFAULT_BLOCK_SIZE`). */
#define DEFAULT_BLOCK_SIZE (16 * 1024)
58 
59 /* Note: We should not just read /dev/random here, because it may not
60  * exist on all platforms e.g. Win32.
61  */
generate_sync(avro_file_writer_t w)62 static void generate_sync(avro_file_writer_t w)
63 {
64 	unsigned int i;
65 	srand(time(NULL));
66 	for (i = 0; i < sizeof(w->sync); i++) {
67 		w->sync[i] = ((double)rand() / (RAND_MAX + 1.0)) * 255;
68 	}
69 }
70 
write_sync(avro_file_writer_t w)71 static int write_sync(avro_file_writer_t w)
72 {
73 	return avro_write(w->writer, w->sync, sizeof(w->sync));
74 }
75 
write_header(avro_file_writer_t w)76 static int write_header(avro_file_writer_t w)
77 {
78 	int rval;
79 	uint8_t version = 1;
80 	/* TODO: remove this static buffer */
81 	avro_writer_t schema_writer;
82 	const avro_encoding_t *enc = &avro_binary_encoding;
83 	int64_t schema_len;
84 
85 	/* Generate random sync */
86 	generate_sync(w);
87 
88 	check(rval, avro_write(w->writer, "Obj", 3));
89 	check(rval, avro_write(w->writer, &version, 1));
90 
91 	check(rval, enc->write_long(w->writer, 2));
92 	check(rval, enc->write_string(w->writer, "avro.codec"));
93 	check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
94 	check(rval, enc->write_string(w->writer, "avro.schema"));
95 	schema_writer =
96 	    avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
97 	rval = avro_schema_to_json(w->writers_schema, schema_writer);
98 	if (rval) {
99 		avro_writer_free(schema_writer);
100 		return rval;
101 	}
102 	schema_len = avro_writer_tell(schema_writer);
103 	avro_writer_free(schema_writer);
104 	check(rval,
105 	      enc->write_bytes(w->writer, w->schema_buf, schema_len));
106 	check(rval, enc->write_long(w->writer, 0));
107 	return write_sync(w);
108 }
109 
110 static int
file_writer_init_fp(FILE * fp,const char * path,int should_close,const char * mode,avro_file_writer_t w)111 file_writer_init_fp(FILE *fp, const char *path, int should_close, const char *mode, avro_file_writer_t w)
112 {
113 	if (!fp) {
114 		fp = fopen(path, mode);
115 	}
116 
117 	if (!fp) {
118 		avro_set_error("Cannot open file for %s", path);
119 		return ENOMEM;
120 	}
121 	w->writer = avro_writer_file_fp(fp, should_close);
122 	if (!w->writer) {
123 		if (should_close) {
124 			fclose(fp);
125 		}
126 		avro_set_error("Cannot create file writer for %s", path);
127 		return ENOMEM;
128 	}
129 	return 0;
130 }
131 
/* fopen() mode for the first attempt at creating a new container file.
 *
 * The "x" suffix (standardized in C11, supported by glibc) makes fopen()
 * fail if the file already exists. Win32 does not support exclusive file
 * writing, so there we fall back to plain non-exclusive "wb".
 */
#if defined(_WIN32)
#  define EXCLUSIVE_WRITE_MODE "wb"
#else
#  define EXCLUSIVE_WRITE_MODE "wbx"
#endif
141 
142 static int
file_writer_create(FILE * fp,const char * path,int should_close,avro_schema_t schema,avro_file_writer_t w,size_t block_size)143 file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t schema, avro_file_writer_t w, size_t block_size)
144 {
145 	int rval;
146 
147 	w->block_count = 0;
148 	rval = file_writer_init_fp(fp, path, should_close, EXCLUSIVE_WRITE_MODE, w);
149 	if (rval) {
150 		check(rval, file_writer_init_fp(fp, path, should_close, "wb", w));
151 	}
152 
153 	w->datum_buffer_size = block_size;
154 	w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
155 
156 	if(!w->datum_buffer) {
157 		avro_set_error("Could not allocate datum buffer\n");
158 		avro_writer_free(w->writer);
159 		return ENOMEM;
160 	}
161 
162 	w->datum_writer =
163 	    avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
164 	if (!w->datum_writer) {
165 		avro_set_error("Cannot create datum writer for file %s", path);
166 		avro_writer_free(w->writer);
167 		avro_free(w->datum_buffer, w->datum_buffer_size);
168 		return ENOMEM;
169 	}
170 
171 	w->writers_schema = avro_schema_incref(schema);
172 	return write_header(w);
173 }
174 
175 int
avro_file_writer_create(const char * path,avro_schema_t schema,avro_file_writer_t * writer)176 avro_file_writer_create(const char *path, avro_schema_t schema,
177 			avro_file_writer_t * writer)
178 {
179 	return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, "null", 0);
180 }
181 
182 int
avro_file_writer_create_fp(FILE * fp,const char * path,int should_close,avro_schema_t schema,avro_file_writer_t * writer)183 avro_file_writer_create_fp(FILE *fp, const char *path, int should_close, avro_schema_t schema,
184 			avro_file_writer_t * writer)
185 {
186 	return avro_file_writer_create_with_codec_fp(fp, path, should_close, schema, writer, "null", 0);
187 }
188 
avro_file_writer_create_with_codec(const char * path,avro_schema_t schema,avro_file_writer_t * writer,const char * codec,size_t block_size)189 int avro_file_writer_create_with_codec(const char *path,
190 			avro_schema_t schema, avro_file_writer_t * writer,
191 			const char *codec, size_t block_size)
192 {
193 	return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, codec, block_size);
194 }
195 
avro_file_writer_create_with_codec_fp(FILE * fp,const char * path,int should_close,avro_schema_t schema,avro_file_writer_t * writer,const char * codec,size_t block_size)196 int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should_close,
197 			avro_schema_t schema, avro_file_writer_t * writer,
198 			const char *codec, size_t block_size)
199 {
200 	avro_file_writer_t w;
201 	int rval;
202 	check_param(EINVAL, path, "path");
203 	check_param(EINVAL, is_avro_schema(schema), "schema");
204 	check_param(EINVAL, writer, "writer");
205 	check_param(EINVAL, codec, "codec");
206 
207 	if (block_size == 0) {
208 		block_size = DEFAULT_BLOCK_SIZE;
209 	}
210 
211 	w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
212 	if (!w) {
213 		avro_set_error("Cannot allocate new file writer");
214 		return ENOMEM;
215 	}
216 	w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
217 	if (!w->codec) {
218 		avro_set_error("Cannot allocate new codec");
219 		avro_freet(struct avro_file_writer_t_, w);
220 		return ENOMEM;
221 	}
222 	rval = avro_codec(w->codec, codec);
223 	if (rval) {
224 		avro_codec_reset(w->codec);
225 		avro_freet(struct avro_codec_t_, w->codec);
226 		avro_freet(struct avro_file_writer_t_, w);
227 		return rval;
228 	}
229 	rval = file_writer_create(fp, path, should_close, schema, w, block_size);
230 	if (rval) {
231 		avro_codec_reset(w->codec);
232 		avro_freet(struct avro_codec_t_, w->codec);
233 		avro_freet(struct avro_file_writer_t_, w);
234 		return rval;
235 	}
236 	*writer = w;
237 
238 	return 0;
239 }
240 
file_read_header(avro_reader_t reader,avro_schema_t * writers_schema,avro_codec_t codec,char * sync,int synclen)241 static int file_read_header(avro_reader_t reader,
242 			    avro_schema_t * writers_schema, avro_codec_t codec,
243 			    char *sync, int synclen)
244 {
245 	int rval;
246 	avro_schema_t meta_schema;
247 	avro_schema_t meta_values_schema;
248 	avro_value_iface_t *meta_iface;
249 	avro_value_t meta;
250 	char magic[4];
251 	avro_value_t codec_val;
252 	avro_value_t schema_bytes;
253 	const void *p;
254 	size_t len;
255 
256 	check(rval, avro_read(reader, magic, sizeof(magic)));
257 	if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j'
258 	    || magic[3] != 1) {
259 		avro_set_error("Incorrect Avro container file magic number");
260 		return EILSEQ;
261 	}
262 
263 	meta_values_schema = avro_schema_bytes();
264 	meta_schema = avro_schema_map(meta_values_schema);
265 	meta_iface = avro_generic_class_from_schema(meta_schema);
266 	if (meta_iface == NULL) {
267 		return EILSEQ;
268 	}
269 	check(rval, avro_generic_value_new(meta_iface, &meta));
270 	rval = avro_value_read(reader, &meta);
271 	if (rval) {
272 		avro_prefix_error("Cannot read file header: ");
273 		return EILSEQ;
274 	}
275 	avro_schema_decref(meta_schema);
276 
277 	rval = avro_value_get_by_name(&meta, "avro.codec", &codec_val, NULL);
278 	if (rval) {
279 		if (avro_codec(codec, NULL) != 0) {
280 			avro_set_error("Codec not specified in header and unable to set 'null' codec");
281 			avro_value_decref(&meta);
282 			return EILSEQ;
283 		}
284 	} else {
285 		const void *buf;
286 		size_t size;
287 		char codec_name[11];
288 
289 		avro_type_t type = avro_value_get_type(&codec_val);
290 
291 		if (type != AVRO_BYTES) {
292 			avro_set_error("Value type of codec is unexpected");
293 			avro_value_decref(&meta);
294 			return EILSEQ;
295 		}
296 
297 		avro_value_get_bytes(&codec_val, &buf, &size);
298 		memset(codec_name, 0, sizeof(codec_name));
299 		strncpy(codec_name, (const char *) buf, size < 10 ? size : 10);
300 
301 		if (avro_codec(codec, codec_name) != 0) {
302 			avro_set_error("File header contains an unknown codec");
303 			avro_value_decref(&meta);
304 			return EILSEQ;
305 		}
306 	}
307 
308 	rval = avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL);
309 	if (rval) {
310 		avro_set_error("File header doesn't contain a schema");
311 		avro_value_decref(&meta);
312 		return EILSEQ;
313 	}
314 
315 	avro_value_get_bytes(&schema_bytes, &p, &len);
316 	rval = avro_schema_from_json_length((const char *) p, len, writers_schema);
317 	if (rval) {
318 		avro_prefix_error("Cannot parse file header: ");
319 		avro_value_decref(&meta);
320 		return rval;
321 	}
322 
323 	avro_value_decref(&meta);
324 	avro_value_iface_decref(meta_iface);
325 	return avro_read(reader, sync, synclen);
326 }
327 
328 static int
file_writer_open(const char * path,avro_file_writer_t w,size_t block_size)329 file_writer_open(const char *path, avro_file_writer_t w, size_t block_size)
330 {
331 	int rval;
332 	FILE *fp;
333 	avro_reader_t reader;
334 
335 	/* Open for read AND write */
336 	fp = fopen(path, "r+b");
337 	if (!fp) {
338 		avro_set_error("Error opening file: %s",
339 			       strerror(errno));
340 		return errno;
341 	}
342 
343 	/* Don`t close the underlying file descriptor, logrotate can
344 	 * vanish it from sight. */
345 	reader = avro_reader_file_fp(fp, 0);
346 	if (!reader) {
347 		fclose(fp);
348 		avro_set_error("Cannot create file reader for %s", path);
349 		return ENOMEM;
350 	}
351 	rval =
352 	    file_read_header(reader, &w->writers_schema, w->codec, w->sync,
353 			     sizeof(w->sync));
354 
355 	avro_reader_free(reader);
356 	if (rval) {
357 		fclose(fp);
358 		return rval;
359 	}
360 
361 	w->block_count = 0;
362 
363 	/* Position to end of file and get ready to write */
364 	fseek(fp, 0, SEEK_END);
365 
366 	w->writer = avro_writer_file(fp);
367 	if (!w->writer) {
368 		fclose(fp);
369 		avro_set_error("Cannot create file writer for %s", path);
370 		return ENOMEM;
371 	}
372 
373 	if (block_size == 0) {
374 		block_size = DEFAULT_BLOCK_SIZE;
375 	}
376 
377 	w->datum_buffer_size = block_size;
378 	w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
379 
380 	if(!w->datum_buffer) {
381 		avro_set_error("Could not allocate datum buffer\n");
382 		avro_writer_free(w->writer);
383 		return ENOMEM;
384 	}
385 
386 	w->datum_writer =
387 	    avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
388 	if (!w->datum_writer) {
389 		avro_set_error("Cannot create datum writer for file %s", path);
390 		avro_writer_free(w->writer);
391 		avro_free(w->datum_buffer, w->datum_buffer_size);
392 		return ENOMEM;
393 	}
394 
395 	return 0;
396 }
397 
398 int
avro_file_writer_open_bs(const char * path,avro_file_writer_t * writer,size_t block_size)399 avro_file_writer_open_bs(const char *path, avro_file_writer_t * writer,
400 			 size_t block_size)
401 {
402 	avro_file_writer_t w;
403 	int rval;
404 	check_param(EINVAL, path, "path");
405 	check_param(EINVAL, writer, "writer");
406 
407 	w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
408 	if (!w) {
409 		avro_set_error("Cannot create new file writer for %s", path);
410 		return ENOMEM;
411 	}
412 	w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
413 	if (!w->codec) {
414 		avro_set_error("Cannot allocate new codec");
415 		avro_freet(struct avro_file_writer_t_, w);
416 		return ENOMEM;
417 	}
418 	avro_codec(w->codec, NULL);
419 	rval = file_writer_open(path, w, block_size);
420 	if (rval) {
421 		avro_codec_reset(w->codec);
422 		avro_freet(struct avro_codec_t_, w->codec);
423 		avro_freet(struct avro_file_writer_t_, w);
424 		return rval;
425 	}
426 
427 	*writer = w;
428 	return 0;
429 }
430 
431 int
avro_file_writer_open(const char * path,avro_file_writer_t * writer)432 avro_file_writer_open(const char *path, avro_file_writer_t * writer)
433 {
434 	return avro_file_writer_open_bs(path, writer, 0);
435 }
436 
file_read_block_count(avro_file_reader_t r)437 static int file_read_block_count(avro_file_reader_t r)
438 {
439 	int rval;
440 	int64_t len;
441 	const avro_encoding_t *enc = &avro_binary_encoding;
442 
443 	/* For a correctly formatted file, EOF will occur here */
444 	rval = enc->read_long(r->reader, &r->blocks_total);
445 
446 	if (rval == EILSEQ && avro_reader_is_eof(r->reader)) {
447 		return EOF;
448 	}
449 
450 	check_prefix(rval, rval,
451 		     "Cannot read file block count: ");
452 	check_prefix(rval, enc->read_long(r->reader, &len),
453 		     "Cannot read file block size: ");
454 
455 	if (r->current_blockdata && len > r->current_blocklen) {
456 		r->current_blockdata = (char *) avro_realloc(r->current_blockdata, r->current_blocklen, len);
457 		r->current_blocklen = len;
458 	} else if (!r->current_blockdata) {
459 		r->current_blockdata = (char *) avro_malloc(len);
460 		r->current_blocklen = len;
461 	}
462 
463 	if (len > 0) {
464 		check_prefix(rval, avro_read(r->reader, r->current_blockdata, len),
465 			     "Cannot read file block: ");
466 
467 		check_prefix(rval, avro_codec_decode(r->codec, r->current_blockdata, len),
468 			     "Cannot decode file block: ");
469 	}
470 
471 	avro_reader_memory_set_source(r->block_reader, (const char *) r->codec->block_data, r->codec->used_size);
472 
473 	r->blocks_read = 0;
474 	return 0;
475 }
476 
avro_file_reader_fp(FILE * fp,const char * path,int should_close,avro_file_reader_t * reader)477 int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
478 			avro_file_reader_t * reader)
479 {
480 	int rval;
481 	avro_file_reader_t r = (avro_file_reader_t) avro_new(struct avro_file_reader_t_);
482 	if (!r) {
483 		if (should_close) {
484 			fclose(fp);
485 		}
486 		avro_set_error("Cannot allocate file reader for %s", path);
487 		return ENOMEM;
488 	}
489 
490 	r->reader = avro_reader_file_fp(fp, should_close);
491 	if (!r->reader) {
492 		if (should_close) {
493 			fclose(fp);
494 		}
495 		avro_set_error("Cannot allocate reader for file %s", path);
496 		avro_freet(struct avro_file_reader_t_, r);
497 		return ENOMEM;
498 	}
499 	r->block_reader = avro_reader_memory(0, 0);
500 	if (!r->block_reader) {
501 		avro_set_error("Cannot allocate block reader for file %s", path);
502 		avro_reader_free(r->reader);
503 		avro_freet(struct avro_file_reader_t_, r);
504 		return ENOMEM;
505 	}
506 
507 	r->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
508 	if (!r->codec) {
509 		avro_set_error("Could not allocate codec for file %s", path);
510 		avro_reader_free(r->reader);
511 		avro_freet(struct avro_file_reader_t_, r);
512 		return ENOMEM;
513 	}
514 	avro_codec(r->codec, NULL);
515 
516 	rval = file_read_header(r->reader, &r->writers_schema, r->codec,
517 				r->sync, sizeof(r->sync));
518 	if (rval) {
519 		avro_reader_free(r->reader);
520 		avro_codec_reset(r->codec);
521 		avro_freet(struct avro_codec_t_, r->codec);
522 		avro_freet(struct avro_file_reader_t_, r);
523 		return rval;
524 	}
525 
526 	r->current_blockdata = NULL;
527 	r->current_blocklen = 0;
528 
529 	rval = file_read_block_count(r);
530 	if (rval == EOF) {
531 		r->blocks_total = 0;
532 	} else if (rval) {
533 		avro_reader_free(r->reader);
534 		avro_codec_reset(r->codec);
535 		avro_freet(struct avro_codec_t_, r->codec);
536 		avro_freet(struct avro_file_reader_t_, r);
537 		return rval;
538 	}
539 
540 	*reader = r;
541 	return 0;
542 }
543 
avro_file_reader(const char * path,avro_file_reader_t * reader)544 int avro_file_reader(const char *path, avro_file_reader_t * reader)
545 {
546 	FILE *fp;
547 
548 	fp = fopen(path, "rb");
549 	if (!fp) {
550 		return errno;
551 	}
552 
553 	return avro_file_reader_fp(fp, path, 1, reader);
554 }
555 
556 avro_schema_t
avro_file_reader_get_writer_schema(avro_file_reader_t r)557 avro_file_reader_get_writer_schema(avro_file_reader_t r)
558 {
559 	check_param(NULL, r, "reader");
560 	return avro_schema_incref(r->writers_schema);
561 }
562 
file_write_block(avro_file_writer_t w)563 static int file_write_block(avro_file_writer_t w)
564 {
565 	const avro_encoding_t *enc = &avro_binary_encoding;
566 	int rval;
567 
568 	if (w->block_count) {
569 		/* Write the block count */
570 		check_prefix(rval, enc->write_long(w->writer, w->block_count),
571 			     "Cannot write file block count: ");
572 		/* Encode the block */
573 		check_prefix(rval, avro_codec_encode(w->codec, w->datum_buffer, w->block_size),
574 			     "Cannot encode file block: ");
575 		/* Write the block length */
576 		check_prefix(rval, enc->write_long(w->writer, w->codec->used_size),
577 			     "Cannot write file block size: ");
578 		/* Write the block */
579 		check_prefix(rval, avro_write(w->writer, w->codec->block_data, w->codec->used_size),
580 			     "Cannot write file block: ");
581 		/* Write the sync marker */
582 		check_prefix(rval, write_sync(w),
583 			     "Cannot write sync marker: ");
584 		/* Reset the datum writer */
585 		avro_writer_reset(w->datum_writer);
586 		w->block_count = 0;
587 		w->block_size = 0;
588 	}
589 	return 0;
590 }
591 
avro_file_writer_append(avro_file_writer_t w,avro_datum_t datum)592 int avro_file_writer_append(avro_file_writer_t w, avro_datum_t datum)
593 {
594 	int rval;
595 	check_param(EINVAL, w, "writer");
596 	check_param(EINVAL, datum, "datum");
597 
598 	rval = avro_write_data(w->datum_writer, w->writers_schema, datum);
599 	if (rval) {
600 		check(rval, file_write_block(w));
601 		rval =
602 		    avro_write_data(w->datum_writer, w->writers_schema, datum);
603 		if (rval) {
604 			avro_set_error("Datum too large for file block size");
605 			/* TODO: if the datum encoder larger than our buffer,
606 			   just write a single large datum */
607 			return rval;
608 		}
609 	}
610 	w->block_count++;
611 	w->block_size = avro_writer_tell(w->datum_writer);
612 	return 0;
613 }
614 
615 int
avro_file_writer_append_value(avro_file_writer_t w,avro_value_t * value)616 avro_file_writer_append_value(avro_file_writer_t w, avro_value_t *value)
617 {
618 	int rval;
619 	check_param(EINVAL, w, "writer");
620 	check_param(EINVAL, value, "value");
621 
622 	rval = avro_value_write(w->datum_writer, value);
623 	if (rval) {
624 		check(rval, file_write_block(w));
625 		rval = avro_value_write(w->datum_writer, value);
626 		if (rval) {
627 			avro_set_error("Value too large for file block size");
628 			/* TODO: if the value encoder larger than our buffer,
629 			   just write a single large datum */
630 			return rval;
631 		}
632 	}
633 	w->block_count++;
634 	w->block_size = avro_writer_tell(w->datum_writer);
635 	return 0;
636 }
637 
638 int
avro_file_writer_append_encoded(avro_file_writer_t w,const void * buf,int64_t len)639 avro_file_writer_append_encoded(avro_file_writer_t w,
640 				const void *buf, int64_t len)
641 {
642 	int rval;
643 	check_param(EINVAL, w, "writer");
644 
645 	rval = avro_write(w->datum_writer, (void *) buf, len);
646 	if (rval) {
647 		check(rval, file_write_block(w));
648 		rval = avro_write(w->datum_writer, (void *) buf, len);
649 		if (rval) {
650 			avro_set_error("Value too large for file block size");
651 			/* TODO: if the value encoder larger than our buffer,
652 			   just write a single large datum */
653 			return rval;
654 		}
655 	}
656 	w->block_count++;
657 	w->block_size = avro_writer_tell(w->datum_writer);
658 	return 0;
659 }
660 
avro_file_writer_sync(avro_file_writer_t w)661 int avro_file_writer_sync(avro_file_writer_t w)
662 {
663 	return file_write_block(w);
664 }
665 
avro_file_writer_flush(avro_file_writer_t w)666 int avro_file_writer_flush(avro_file_writer_t w)
667 {
668 	int rval;
669 	check(rval, file_write_block(w));
670 	avro_writer_flush(w->writer);
671 	return 0;
672 }
673 
avro_file_writer_close(avro_file_writer_t w)674 int avro_file_writer_close(avro_file_writer_t w)
675 {
676 	int rval;
677 	check(rval, avro_file_writer_flush(w));
678 	avro_schema_decref(w->writers_schema);
679 	avro_writer_free(w->datum_writer);
680 	avro_writer_free(w->writer);
681 	avro_free(w->datum_buffer, w->datum_buffer_size);
682 	avro_codec_reset(w->codec);
683 	avro_freet(struct avro_codec_t_, w->codec);
684 	avro_freet(struct avro_file_writer_t_, w);
685 	return 0;
686 }
687 
avro_file_reader_read(avro_file_reader_t r,avro_schema_t readers_schema,avro_datum_t * datum)688 int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
689 			  avro_datum_t * datum)
690 {
691 	int rval;
692 	char sync[16];
693 
694 	check_param(EINVAL, r, "reader");
695 	check_param(EINVAL, datum, "datum");
696 
697 	/* This will be set to zero when an empty file is opened.
698 	 * Return EOF here when the user attempts to read. */
699 	if (r->blocks_total == 0) {
700 		return EOF;
701 	}
702 
703 	if (r->blocks_read == r->blocks_total) {
704 		check(rval, avro_read(r->reader, sync, sizeof(sync)));
705 		if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
706 			/* wrong sync bytes */
707 			avro_set_error("Incorrect sync bytes");
708 			return EILSEQ;
709 		}
710 		check(rval, file_read_block_count(r));
711 	}
712 
713 	check(rval,
714 	      avro_read_data(r->block_reader, r->writers_schema, readers_schema,
715 			     datum));
716 	r->blocks_read++;
717 
718 	return 0;
719 }
720 
721 int
avro_file_reader_read_value(avro_file_reader_t r,avro_value_t * value)722 avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
723 {
724 	int rval;
725 	char sync[16];
726 
727 	check_param(EINVAL, r, "reader");
728 	check_param(EINVAL, value, "value");
729 
730 	/* This will be set to zero when an empty file is opened.
731 	 * Return EOF here when the user attempts to read. */
732 	if (r->blocks_total == 0) {
733 		return EOF;
734 	}
735 
736 	if (r->blocks_read == r->blocks_total) {
737 		/* reads sync bytes and buffers further bytes */
738 		check(rval, avro_read(r->reader, sync, sizeof(sync)));
739 		if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
740 			/* wrong sync bytes */
741 			avro_set_error("Incorrect sync bytes");
742 			return EILSEQ;
743 		}
744 
745 		check(rval, file_read_block_count(r));
746 	}
747 
748 	check(rval, avro_value_read(r->block_reader, value));
749 	r->blocks_read++;
750 
751 	return 0;
752 }
753 
avro_file_reader_close(avro_file_reader_t reader)754 int avro_file_reader_close(avro_file_reader_t reader)
755 {
756 	avro_schema_decref(reader->writers_schema);
757 	avro_reader_free(reader->reader);
758 	avro_reader_free(reader->block_reader);
759 	avro_codec_reset(reader->codec);
760 	avro_freet(struct avro_codec_t_, reader->codec);
761 	if (reader->current_blockdata) {
762 		avro_free(reader->current_blockdata, reader->current_blocklen);
763 	}
764 	avro_freet(struct avro_file_reader_t_, reader);
765 	return 0;
766 }
767