1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to you under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 * implied. See the License for the specific language governing
15 * permissions and limitations under the License.
16 */
17
18 #include "avro_private.h"
19 #include "avro/allocation.h"
20 #include "avro/generic.h"
21 #include "avro/errors.h"
22 #include "avro/value.h"
23 #include "encoding.h"
24 #include "codec.h"
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <time.h>
30 #include <string.h>
31
32 struct avro_file_reader_t_ {
33 avro_schema_t writers_schema;
34 avro_reader_t reader;
35 avro_reader_t block_reader;
36 avro_codec_t codec;
37 char sync[16];
38 int64_t blocks_read;
39 int64_t blocks_total;
40 int64_t current_blocklen;
41 char * current_blockdata;
42 };
43
44 struct avro_file_writer_t_ {
45 avro_schema_t writers_schema;
46 avro_writer_t writer;
47 avro_codec_t codec;
48 char sync[16];
49 int block_count;
50 size_t block_size;
51 avro_writer_t datum_writer;
52 char* datum_buffer;
53 size_t datum_buffer_size;
54 char schema_buf[64 * 1024];
55 };
56
57 #define DEFAULT_BLOCK_SIZE 16 * 1024
58
59 /* Note: We should not just read /dev/random here, because it may not
60 * exist on all platforms e.g. Win32.
61 */
generate_sync(avro_file_writer_t w)62 static void generate_sync(avro_file_writer_t w)
63 {
64 unsigned int i;
65 srand(time(NULL));
66 for (i = 0; i < sizeof(w->sync); i++) {
67 w->sync[i] = ((double)rand() / (RAND_MAX + 1.0)) * 255;
68 }
69 }
70
write_sync(avro_file_writer_t w)71 static int write_sync(avro_file_writer_t w)
72 {
73 return avro_write(w->writer, w->sync, sizeof(w->sync));
74 }
75
write_header(avro_file_writer_t w)76 static int write_header(avro_file_writer_t w)
77 {
78 int rval;
79 uint8_t version = 1;
80 /* TODO: remove this static buffer */
81 avro_writer_t schema_writer;
82 const avro_encoding_t *enc = &avro_binary_encoding;
83 int64_t schema_len;
84
85 /* Generate random sync */
86 generate_sync(w);
87
88 check(rval, avro_write(w->writer, "Obj", 3));
89 check(rval, avro_write(w->writer, &version, 1));
90
91 check(rval, enc->write_long(w->writer, 2));
92 check(rval, enc->write_string(w->writer, "avro.codec"));
93 check(rval, enc->write_bytes(w->writer, w->codec->name, strlen(w->codec->name)));
94 check(rval, enc->write_string(w->writer, "avro.schema"));
95 schema_writer =
96 avro_writer_memory(&w->schema_buf[0], sizeof(w->schema_buf));
97 rval = avro_schema_to_json(w->writers_schema, schema_writer);
98 if (rval) {
99 avro_writer_free(schema_writer);
100 return rval;
101 }
102 schema_len = avro_writer_tell(schema_writer);
103 avro_writer_free(schema_writer);
104 check(rval,
105 enc->write_bytes(w->writer, w->schema_buf, schema_len));
106 check(rval, enc->write_long(w->writer, 0));
107 return write_sync(w);
108 }
109
110 static int
file_writer_init_fp(FILE * fp,const char * path,int should_close,const char * mode,avro_file_writer_t w)111 file_writer_init_fp(FILE *fp, const char *path, int should_close, const char *mode, avro_file_writer_t w)
112 {
113 if (!fp) {
114 fp = fopen(path, mode);
115 }
116
117 if (!fp) {
118 avro_set_error("Cannot open file for %s", path);
119 return ENOMEM;
120 }
121 w->writer = avro_writer_file_fp(fp, should_close);
122 if (!w->writer) {
123 if (should_close) {
124 fclose(fp);
125 }
126 avro_set_error("Cannot create file writer for %s", path);
127 return ENOMEM;
128 }
129 return 0;
130 }
131
132 /* Exclusive file writing is supported by GCC using the mode
133 * "wx". Win32 does not support exclusive file writing, so for win32
134 * fall back to the non-exclusive file writing.
135 */
136 #ifdef _WIN32
137 #define EXCLUSIVE_WRITE_MODE "wb"
138 #else
139 #define EXCLUSIVE_WRITE_MODE "wbx"
140 #endif
141
142 static int
file_writer_create(FILE * fp,const char * path,int should_close,avro_schema_t schema,avro_file_writer_t w,size_t block_size)143 file_writer_create(FILE *fp, const char *path, int should_close, avro_schema_t schema, avro_file_writer_t w, size_t block_size)
144 {
145 int rval;
146
147 w->block_count = 0;
148 rval = file_writer_init_fp(fp, path, should_close, EXCLUSIVE_WRITE_MODE, w);
149 if (rval) {
150 check(rval, file_writer_init_fp(fp, path, should_close, "wb", w));
151 }
152
153 w->datum_buffer_size = block_size;
154 w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
155
156 if(!w->datum_buffer) {
157 avro_set_error("Could not allocate datum buffer\n");
158 avro_writer_free(w->writer);
159 return ENOMEM;
160 }
161
162 w->datum_writer =
163 avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
164 if (!w->datum_writer) {
165 avro_set_error("Cannot create datum writer for file %s", path);
166 avro_writer_free(w->writer);
167 avro_free(w->datum_buffer, w->datum_buffer_size);
168 return ENOMEM;
169 }
170
171 w->writers_schema = avro_schema_incref(schema);
172 return write_header(w);
173 }
174
175 int
avro_file_writer_create(const char * path,avro_schema_t schema,avro_file_writer_t * writer)176 avro_file_writer_create(const char *path, avro_schema_t schema,
177 avro_file_writer_t * writer)
178 {
179 return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, "null", 0);
180 }
181
182 int
avro_file_writer_create_fp(FILE * fp,const char * path,int should_close,avro_schema_t schema,avro_file_writer_t * writer)183 avro_file_writer_create_fp(FILE *fp, const char *path, int should_close, avro_schema_t schema,
184 avro_file_writer_t * writer)
185 {
186 return avro_file_writer_create_with_codec_fp(fp, path, should_close, schema, writer, "null", 0);
187 }
188
avro_file_writer_create_with_codec(const char * path,avro_schema_t schema,avro_file_writer_t * writer,const char * codec,size_t block_size)189 int avro_file_writer_create_with_codec(const char *path,
190 avro_schema_t schema, avro_file_writer_t * writer,
191 const char *codec, size_t block_size)
192 {
193 return avro_file_writer_create_with_codec_fp(NULL, path, 1, schema, writer, codec, block_size);
194 }
195
avro_file_writer_create_with_codec_fp(FILE * fp,const char * path,int should_close,avro_schema_t schema,avro_file_writer_t * writer,const char * codec,size_t block_size)196 int avro_file_writer_create_with_codec_fp(FILE *fp, const char *path, int should_close,
197 avro_schema_t schema, avro_file_writer_t * writer,
198 const char *codec, size_t block_size)
199 {
200 avro_file_writer_t w;
201 int rval;
202 check_param(EINVAL, path, "path");
203 check_param(EINVAL, is_avro_schema(schema), "schema");
204 check_param(EINVAL, writer, "writer");
205 check_param(EINVAL, codec, "codec");
206
207 if (block_size == 0) {
208 block_size = DEFAULT_BLOCK_SIZE;
209 }
210
211 w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
212 if (!w) {
213 avro_set_error("Cannot allocate new file writer");
214 return ENOMEM;
215 }
216 w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
217 if (!w->codec) {
218 avro_set_error("Cannot allocate new codec");
219 avro_freet(struct avro_file_writer_t_, w);
220 return ENOMEM;
221 }
222 rval = avro_codec(w->codec, codec);
223 if (rval) {
224 avro_codec_reset(w->codec);
225 avro_freet(struct avro_codec_t_, w->codec);
226 avro_freet(struct avro_file_writer_t_, w);
227 return rval;
228 }
229 rval = file_writer_create(fp, path, should_close, schema, w, block_size);
230 if (rval) {
231 avro_codec_reset(w->codec);
232 avro_freet(struct avro_codec_t_, w->codec);
233 avro_freet(struct avro_file_writer_t_, w);
234 return rval;
235 }
236 *writer = w;
237
238 return 0;
239 }
240
file_read_header(avro_reader_t reader,avro_schema_t * writers_schema,avro_codec_t codec,char * sync,int synclen)241 static int file_read_header(avro_reader_t reader,
242 avro_schema_t * writers_schema, avro_codec_t codec,
243 char *sync, int synclen)
244 {
245 int rval;
246 avro_schema_t meta_schema;
247 avro_schema_t meta_values_schema;
248 avro_value_iface_t *meta_iface;
249 avro_value_t meta;
250 char magic[4];
251 avro_value_t codec_val;
252 avro_value_t schema_bytes;
253 const void *p;
254 size_t len;
255
256 check(rval, avro_read(reader, magic, sizeof(magic)));
257 if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j'
258 || magic[3] != 1) {
259 avro_set_error("Incorrect Avro container file magic number");
260 return EILSEQ;
261 }
262
263 meta_values_schema = avro_schema_bytes();
264 meta_schema = avro_schema_map(meta_values_schema);
265 meta_iface = avro_generic_class_from_schema(meta_schema);
266 if (meta_iface == NULL) {
267 return EILSEQ;
268 }
269 check(rval, avro_generic_value_new(meta_iface, &meta));
270 rval = avro_value_read(reader, &meta);
271 if (rval) {
272 avro_prefix_error("Cannot read file header: ");
273 return EILSEQ;
274 }
275 avro_schema_decref(meta_schema);
276
277 rval = avro_value_get_by_name(&meta, "avro.codec", &codec_val, NULL);
278 if (rval) {
279 if (avro_codec(codec, NULL) != 0) {
280 avro_set_error("Codec not specified in header and unable to set 'null' codec");
281 avro_value_decref(&meta);
282 return EILSEQ;
283 }
284 } else {
285 const void *buf;
286 size_t size;
287 char codec_name[11];
288
289 avro_type_t type = avro_value_get_type(&codec_val);
290
291 if (type != AVRO_BYTES) {
292 avro_set_error("Value type of codec is unexpected");
293 avro_value_decref(&meta);
294 return EILSEQ;
295 }
296
297 avro_value_get_bytes(&codec_val, &buf, &size);
298 memset(codec_name, 0, sizeof(codec_name));
299 strncpy(codec_name, (const char *) buf, size < 10 ? size : 10);
300
301 if (avro_codec(codec, codec_name) != 0) {
302 avro_set_error("File header contains an unknown codec");
303 avro_value_decref(&meta);
304 return EILSEQ;
305 }
306 }
307
308 rval = avro_value_get_by_name(&meta, "avro.schema", &schema_bytes, NULL);
309 if (rval) {
310 avro_set_error("File header doesn't contain a schema");
311 avro_value_decref(&meta);
312 return EILSEQ;
313 }
314
315 avro_value_get_bytes(&schema_bytes, &p, &len);
316 rval = avro_schema_from_json_length((const char *) p, len, writers_schema);
317 if (rval) {
318 avro_prefix_error("Cannot parse file header: ");
319 avro_value_decref(&meta);
320 return rval;
321 }
322
323 avro_value_decref(&meta);
324 avro_value_iface_decref(meta_iface);
325 return avro_read(reader, sync, synclen);
326 }
327
328 static int
file_writer_open(const char * path,avro_file_writer_t w,size_t block_size)329 file_writer_open(const char *path, avro_file_writer_t w, size_t block_size)
330 {
331 int rval;
332 FILE *fp;
333 avro_reader_t reader;
334
335 /* Open for read AND write */
336 fp = fopen(path, "r+b");
337 if (!fp) {
338 avro_set_error("Error opening file: %s",
339 strerror(errno));
340 return errno;
341 }
342
343 /* Don`t close the underlying file descriptor, logrotate can
344 * vanish it from sight. */
345 reader = avro_reader_file_fp(fp, 0);
346 if (!reader) {
347 fclose(fp);
348 avro_set_error("Cannot create file reader for %s", path);
349 return ENOMEM;
350 }
351 rval =
352 file_read_header(reader, &w->writers_schema, w->codec, w->sync,
353 sizeof(w->sync));
354
355 avro_reader_free(reader);
356 if (rval) {
357 fclose(fp);
358 return rval;
359 }
360
361 w->block_count = 0;
362
363 /* Position to end of file and get ready to write */
364 fseek(fp, 0, SEEK_END);
365
366 w->writer = avro_writer_file(fp);
367 if (!w->writer) {
368 fclose(fp);
369 avro_set_error("Cannot create file writer for %s", path);
370 return ENOMEM;
371 }
372
373 if (block_size == 0) {
374 block_size = DEFAULT_BLOCK_SIZE;
375 }
376
377 w->datum_buffer_size = block_size;
378 w->datum_buffer = (char *) avro_malloc(w->datum_buffer_size);
379
380 if(!w->datum_buffer) {
381 avro_set_error("Could not allocate datum buffer\n");
382 avro_writer_free(w->writer);
383 return ENOMEM;
384 }
385
386 w->datum_writer =
387 avro_writer_memory(w->datum_buffer, w->datum_buffer_size);
388 if (!w->datum_writer) {
389 avro_set_error("Cannot create datum writer for file %s", path);
390 avro_writer_free(w->writer);
391 avro_free(w->datum_buffer, w->datum_buffer_size);
392 return ENOMEM;
393 }
394
395 return 0;
396 }
397
398 int
avro_file_writer_open_bs(const char * path,avro_file_writer_t * writer,size_t block_size)399 avro_file_writer_open_bs(const char *path, avro_file_writer_t * writer,
400 size_t block_size)
401 {
402 avro_file_writer_t w;
403 int rval;
404 check_param(EINVAL, path, "path");
405 check_param(EINVAL, writer, "writer");
406
407 w = (avro_file_writer_t) avro_new(struct avro_file_writer_t_);
408 if (!w) {
409 avro_set_error("Cannot create new file writer for %s", path);
410 return ENOMEM;
411 }
412 w->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
413 if (!w->codec) {
414 avro_set_error("Cannot allocate new codec");
415 avro_freet(struct avro_file_writer_t_, w);
416 return ENOMEM;
417 }
418 avro_codec(w->codec, NULL);
419 rval = file_writer_open(path, w, block_size);
420 if (rval) {
421 avro_codec_reset(w->codec);
422 avro_freet(struct avro_codec_t_, w->codec);
423 avro_freet(struct avro_file_writer_t_, w);
424 return rval;
425 }
426
427 *writer = w;
428 return 0;
429 }
430
431 int
avro_file_writer_open(const char * path,avro_file_writer_t * writer)432 avro_file_writer_open(const char *path, avro_file_writer_t * writer)
433 {
434 return avro_file_writer_open_bs(path, writer, 0);
435 }
436
file_read_block_count(avro_file_reader_t r)437 static int file_read_block_count(avro_file_reader_t r)
438 {
439 int rval;
440 int64_t len;
441 const avro_encoding_t *enc = &avro_binary_encoding;
442
443 /* For a correctly formatted file, EOF will occur here */
444 rval = enc->read_long(r->reader, &r->blocks_total);
445
446 if (rval == EILSEQ && avro_reader_is_eof(r->reader)) {
447 return EOF;
448 }
449
450 check_prefix(rval, rval,
451 "Cannot read file block count: ");
452 check_prefix(rval, enc->read_long(r->reader, &len),
453 "Cannot read file block size: ");
454
455 if (r->current_blockdata && len > r->current_blocklen) {
456 r->current_blockdata = (char *) avro_realloc(r->current_blockdata, r->current_blocklen, len);
457 r->current_blocklen = len;
458 } else if (!r->current_blockdata) {
459 r->current_blockdata = (char *) avro_malloc(len);
460 r->current_blocklen = len;
461 }
462
463 if (len > 0) {
464 check_prefix(rval, avro_read(r->reader, r->current_blockdata, len),
465 "Cannot read file block: ");
466
467 check_prefix(rval, avro_codec_decode(r->codec, r->current_blockdata, len),
468 "Cannot decode file block: ");
469 }
470
471 avro_reader_memory_set_source(r->block_reader, (const char *) r->codec->block_data, r->codec->used_size);
472
473 r->blocks_read = 0;
474 return 0;
475 }
476
avro_file_reader_fp(FILE * fp,const char * path,int should_close,avro_file_reader_t * reader)477 int avro_file_reader_fp(FILE *fp, const char *path, int should_close,
478 avro_file_reader_t * reader)
479 {
480 int rval;
481 avro_file_reader_t r = (avro_file_reader_t) avro_new(struct avro_file_reader_t_);
482 if (!r) {
483 if (should_close) {
484 fclose(fp);
485 }
486 avro_set_error("Cannot allocate file reader for %s", path);
487 return ENOMEM;
488 }
489
490 r->reader = avro_reader_file_fp(fp, should_close);
491 if (!r->reader) {
492 if (should_close) {
493 fclose(fp);
494 }
495 avro_set_error("Cannot allocate reader for file %s", path);
496 avro_freet(struct avro_file_reader_t_, r);
497 return ENOMEM;
498 }
499 r->block_reader = avro_reader_memory(0, 0);
500 if (!r->block_reader) {
501 avro_set_error("Cannot allocate block reader for file %s", path);
502 avro_reader_free(r->reader);
503 avro_freet(struct avro_file_reader_t_, r);
504 return ENOMEM;
505 }
506
507 r->codec = (avro_codec_t) avro_new(struct avro_codec_t_);
508 if (!r->codec) {
509 avro_set_error("Could not allocate codec for file %s", path);
510 avro_reader_free(r->reader);
511 avro_freet(struct avro_file_reader_t_, r);
512 return ENOMEM;
513 }
514 avro_codec(r->codec, NULL);
515
516 rval = file_read_header(r->reader, &r->writers_schema, r->codec,
517 r->sync, sizeof(r->sync));
518 if (rval) {
519 avro_reader_free(r->reader);
520 avro_codec_reset(r->codec);
521 avro_freet(struct avro_codec_t_, r->codec);
522 avro_freet(struct avro_file_reader_t_, r);
523 return rval;
524 }
525
526 r->current_blockdata = NULL;
527 r->current_blocklen = 0;
528
529 rval = file_read_block_count(r);
530 if (rval == EOF) {
531 r->blocks_total = 0;
532 } else if (rval) {
533 avro_reader_free(r->reader);
534 avro_codec_reset(r->codec);
535 avro_freet(struct avro_codec_t_, r->codec);
536 avro_freet(struct avro_file_reader_t_, r);
537 return rval;
538 }
539
540 *reader = r;
541 return 0;
542 }
543
avro_file_reader(const char * path,avro_file_reader_t * reader)544 int avro_file_reader(const char *path, avro_file_reader_t * reader)
545 {
546 FILE *fp;
547
548 fp = fopen(path, "rb");
549 if (!fp) {
550 return errno;
551 }
552
553 return avro_file_reader_fp(fp, path, 1, reader);
554 }
555
556 avro_schema_t
avro_file_reader_get_writer_schema(avro_file_reader_t r)557 avro_file_reader_get_writer_schema(avro_file_reader_t r)
558 {
559 check_param(NULL, r, "reader");
560 return avro_schema_incref(r->writers_schema);
561 }
562
file_write_block(avro_file_writer_t w)563 static int file_write_block(avro_file_writer_t w)
564 {
565 const avro_encoding_t *enc = &avro_binary_encoding;
566 int rval;
567
568 if (w->block_count) {
569 /* Write the block count */
570 check_prefix(rval, enc->write_long(w->writer, w->block_count),
571 "Cannot write file block count: ");
572 /* Encode the block */
573 check_prefix(rval, avro_codec_encode(w->codec, w->datum_buffer, w->block_size),
574 "Cannot encode file block: ");
575 /* Write the block length */
576 check_prefix(rval, enc->write_long(w->writer, w->codec->used_size),
577 "Cannot write file block size: ");
578 /* Write the block */
579 check_prefix(rval, avro_write(w->writer, w->codec->block_data, w->codec->used_size),
580 "Cannot write file block: ");
581 /* Write the sync marker */
582 check_prefix(rval, write_sync(w),
583 "Cannot write sync marker: ");
584 /* Reset the datum writer */
585 avro_writer_reset(w->datum_writer);
586 w->block_count = 0;
587 w->block_size = 0;
588 }
589 return 0;
590 }
591
avro_file_writer_append(avro_file_writer_t w,avro_datum_t datum)592 int avro_file_writer_append(avro_file_writer_t w, avro_datum_t datum)
593 {
594 int rval;
595 check_param(EINVAL, w, "writer");
596 check_param(EINVAL, datum, "datum");
597
598 rval = avro_write_data(w->datum_writer, w->writers_schema, datum);
599 if (rval) {
600 check(rval, file_write_block(w));
601 rval =
602 avro_write_data(w->datum_writer, w->writers_schema, datum);
603 if (rval) {
604 avro_set_error("Datum too large for file block size");
605 /* TODO: if the datum encoder larger than our buffer,
606 just write a single large datum */
607 return rval;
608 }
609 }
610 w->block_count++;
611 w->block_size = avro_writer_tell(w->datum_writer);
612 return 0;
613 }
614
615 int
avro_file_writer_append_value(avro_file_writer_t w,avro_value_t * value)616 avro_file_writer_append_value(avro_file_writer_t w, avro_value_t *value)
617 {
618 int rval;
619 check_param(EINVAL, w, "writer");
620 check_param(EINVAL, value, "value");
621
622 rval = avro_value_write(w->datum_writer, value);
623 if (rval) {
624 check(rval, file_write_block(w));
625 rval = avro_value_write(w->datum_writer, value);
626 if (rval) {
627 avro_set_error("Value too large for file block size");
628 /* TODO: if the value encoder larger than our buffer,
629 just write a single large datum */
630 return rval;
631 }
632 }
633 w->block_count++;
634 w->block_size = avro_writer_tell(w->datum_writer);
635 return 0;
636 }
637
638 int
avro_file_writer_append_encoded(avro_file_writer_t w,const void * buf,int64_t len)639 avro_file_writer_append_encoded(avro_file_writer_t w,
640 const void *buf, int64_t len)
641 {
642 int rval;
643 check_param(EINVAL, w, "writer");
644
645 rval = avro_write(w->datum_writer, (void *) buf, len);
646 if (rval) {
647 check(rval, file_write_block(w));
648 rval = avro_write(w->datum_writer, (void *) buf, len);
649 if (rval) {
650 avro_set_error("Value too large for file block size");
651 /* TODO: if the value encoder larger than our buffer,
652 just write a single large datum */
653 return rval;
654 }
655 }
656 w->block_count++;
657 w->block_size = avro_writer_tell(w->datum_writer);
658 return 0;
659 }
660
avro_file_writer_sync(avro_file_writer_t w)661 int avro_file_writer_sync(avro_file_writer_t w)
662 {
663 return file_write_block(w);
664 }
665
avro_file_writer_flush(avro_file_writer_t w)666 int avro_file_writer_flush(avro_file_writer_t w)
667 {
668 int rval;
669 check(rval, file_write_block(w));
670 avro_writer_flush(w->writer);
671 return 0;
672 }
673
avro_file_writer_close(avro_file_writer_t w)674 int avro_file_writer_close(avro_file_writer_t w)
675 {
676 int rval;
677 check(rval, avro_file_writer_flush(w));
678 avro_schema_decref(w->writers_schema);
679 avro_writer_free(w->datum_writer);
680 avro_writer_free(w->writer);
681 avro_free(w->datum_buffer, w->datum_buffer_size);
682 avro_codec_reset(w->codec);
683 avro_freet(struct avro_codec_t_, w->codec);
684 avro_freet(struct avro_file_writer_t_, w);
685 return 0;
686 }
687
avro_file_reader_read(avro_file_reader_t r,avro_schema_t readers_schema,avro_datum_t * datum)688 int avro_file_reader_read(avro_file_reader_t r, avro_schema_t readers_schema,
689 avro_datum_t * datum)
690 {
691 int rval;
692 char sync[16];
693
694 check_param(EINVAL, r, "reader");
695 check_param(EINVAL, datum, "datum");
696
697 /* This will be set to zero when an empty file is opened.
698 * Return EOF here when the user attempts to read. */
699 if (r->blocks_total == 0) {
700 return EOF;
701 }
702
703 if (r->blocks_read == r->blocks_total) {
704 check(rval, avro_read(r->reader, sync, sizeof(sync)));
705 if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
706 /* wrong sync bytes */
707 avro_set_error("Incorrect sync bytes");
708 return EILSEQ;
709 }
710 check(rval, file_read_block_count(r));
711 }
712
713 check(rval,
714 avro_read_data(r->block_reader, r->writers_schema, readers_schema,
715 datum));
716 r->blocks_read++;
717
718 return 0;
719 }
720
721 int
avro_file_reader_read_value(avro_file_reader_t r,avro_value_t * value)722 avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
723 {
724 int rval;
725 char sync[16];
726
727 check_param(EINVAL, r, "reader");
728 check_param(EINVAL, value, "value");
729
730 /* This will be set to zero when an empty file is opened.
731 * Return EOF here when the user attempts to read. */
732 if (r->blocks_total == 0) {
733 return EOF;
734 }
735
736 if (r->blocks_read == r->blocks_total) {
737 /* reads sync bytes and buffers further bytes */
738 check(rval, avro_read(r->reader, sync, sizeof(sync)));
739 if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
740 /* wrong sync bytes */
741 avro_set_error("Incorrect sync bytes");
742 return EILSEQ;
743 }
744
745 check(rval, file_read_block_count(r));
746 }
747
748 check(rval, avro_value_read(r->block_reader, value));
749 r->blocks_read++;
750
751 return 0;
752 }
753
avro_file_reader_close(avro_file_reader_t reader)754 int avro_file_reader_close(avro_file_reader_t reader)
755 {
756 avro_schema_decref(reader->writers_schema);
757 avro_reader_free(reader->reader);
758 avro_reader_free(reader->block_reader);
759 avro_codec_reset(reader->codec);
760 avro_freet(struct avro_codec_t_, reader->codec);
761 if (reader->current_blockdata) {
762 avro_free(reader->current_blockdata, reader->current_blocklen);
763 }
764 avro_freet(struct avro_file_reader_t_, reader);
765 return 0;
766 }
767