1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to you under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 * implied. See the License for the specific language governing
15 * permissions and limitations under the License.
16 */
17
18 #include "avro/allocation.h"
19 #include "avro/refcount.h"
20 #include "avro/errors.h"
21 #include "avro/io.h"
22 #include "avro_private.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <string.h>
27 #include "dump.h"
28
/*
 * Tag stored in every reader/writer header.  It tells the generic entry
 * points in this file which concrete implementation a handle belongs to.
 */
typedef enum avro_io_type_t {
	AVRO_FILE_IO,
	AVRO_MEMORY_IO
} avro_io_type_t;
34
35 struct avro_reader_t_ {
36 avro_io_type_t type;
37 volatile int refcount;
38 };
39
40 struct avro_writer_t_ {
41 avro_io_type_t type;
42 volatile int refcount;
43 };
44
45 struct _avro_reader_file_t {
46 struct avro_reader_t_ reader;
47 FILE *fp;
48 int should_close;
49 char *cur;
50 char *end;
51 char buffer[4096];
52 };
53
54 struct _avro_writer_file_t {
55 struct avro_writer_t_ writer;
56 FILE *fp;
57 int should_close;
58 };
59
60 struct _avro_reader_memory_t {
61 struct avro_reader_t_ reader;
62 const char *buf;
63 int64_t len;
64 int64_t read;
65 };
66
67 struct _avro_writer_memory_t {
68 struct avro_writer_t_ writer;
69 const char *buf;
70 int64_t len;
71 int64_t written;
72 };
73
/*
 * Type inspection helpers.  `obj` may be a reader or a writer handle.
 * is_memory_io()/is_file_io() accept NULL and then evaluate to 0.  The
 * argument is fully parenthesized so that any pointer expression can be
 * passed; note that it is evaluated twice.
 */
#define avro_io_typeof(obj)      ((obj)->type)
#define is_memory_io(obj)        ((obj) && avro_io_typeof(obj) == AVRO_MEMORY_IO)
#define is_file_io(obj)          ((obj) && avro_io_typeof(obj) == AVRO_FILE_IO)

/*
 * Recover the concrete struct from a pointer to its embedded
 * `reader` / `writer` header.  Only valid after the matching
 * is_memory_io()/is_file_io() test has succeeded.
 */
#define avro_reader_to_memory(reader_)  container_of(reader_, struct _avro_reader_memory_t, reader)
#define avro_reader_to_file(reader_)    container_of(reader_, struct _avro_reader_file_t, reader)
#define avro_writer_to_memory(writer_)  container_of(writer_, struct _avro_writer_memory_t, writer)
#define avro_writer_to_file(writer_)    container_of(writer_, struct _avro_writer_file_t, writer)
82
/*
 * Fill in the common reader header: start the reference count at one
 * and record which implementation the handle belongs to.
 */
static void reader_init(avro_reader_t reader, avro_io_type_t type)
{
	avro_refcount_set(&reader->refcount, 1);
	reader->type = type;
}
88
/*
 * Fill in the common writer header: start the reference count at one
 * and record which implementation the handle belongs to.
 */
static void writer_init(avro_writer_t writer, avro_io_type_t type)
{
	avro_refcount_set(&writer->refcount, 1);
	writer->type = type;
}
94
avro_reader_file_fp(FILE * fp,int should_close)95 avro_reader_t avro_reader_file_fp(FILE * fp, int should_close)
96 {
97 struct _avro_reader_file_t *file_reader =
98 (struct _avro_reader_file_t *) avro_new(struct _avro_reader_file_t);
99 if (!file_reader) {
100 avro_set_error("Cannot allocate new file reader");
101 return NULL;
102 }
103 memset(file_reader, 0, sizeof(struct _avro_reader_file_t));
104 file_reader->fp = fp;
105 file_reader->should_close = should_close;
106 reader_init(&file_reader->reader, AVRO_FILE_IO);
107 return &file_reader->reader;
108 }
109
avro_reader_file(FILE * fp)110 avro_reader_t avro_reader_file(FILE * fp)
111 {
112 return avro_reader_file_fp(fp, 1);
113 }
114
avro_writer_file_fp(FILE * fp,int should_close)115 avro_writer_t avro_writer_file_fp(FILE * fp, int should_close)
116 {
117 struct _avro_writer_file_t *file_writer =
118 (struct _avro_writer_file_t *) avro_new(struct _avro_writer_file_t);
119 if (!file_writer) {
120 avro_set_error("Cannot allocate new file writer");
121 return NULL;
122 }
123 file_writer->fp = fp;
124 file_writer->should_close = should_close;
125 writer_init(&file_writer->writer, AVRO_FILE_IO);
126 return &file_writer->writer;
127 }
128
avro_writer_file(FILE * fp)129 avro_writer_t avro_writer_file(FILE * fp)
130 {
131 return avro_writer_file_fp(fp, 1);
132 }
133
avro_reader_memory(const char * buf,int64_t len)134 avro_reader_t avro_reader_memory(const char *buf, int64_t len)
135 {
136 struct _avro_reader_memory_t *mem_reader =
137 (struct _avro_reader_memory_t *) avro_new(struct _avro_reader_memory_t);
138 if (!mem_reader) {
139 avro_set_error("Cannot allocate new memory reader");
140 return NULL;
141 }
142 mem_reader->buf = buf;
143 mem_reader->len = len;
144 mem_reader->read = 0;
145 reader_init(&mem_reader->reader, AVRO_MEMORY_IO);
146 return &mem_reader->reader;
147 }
148
149 void
avro_reader_memory_set_source(avro_reader_t reader,const char * buf,int64_t len)150 avro_reader_memory_set_source(avro_reader_t reader, const char *buf, int64_t len)
151 {
152 if (is_memory_io(reader)) {
153 struct _avro_reader_memory_t *mem_reader = avro_reader_to_memory(reader);
154 mem_reader->buf = buf;
155 mem_reader->len = len;
156 mem_reader->read = 0;
157 }
158 }
159
avro_writer_memory(const char * buf,int64_t len)160 avro_writer_t avro_writer_memory(const char *buf, int64_t len)
161 {
162 struct _avro_writer_memory_t *mem_writer =
163 (struct _avro_writer_memory_t *) avro_new(struct _avro_writer_memory_t);
164 if (!mem_writer) {
165 avro_set_error("Cannot allocate new memory writer");
166 return NULL;
167 }
168 mem_writer->buf = buf;
169 mem_writer->len = len;
170 mem_writer->written = 0;
171 writer_init(&mem_writer->writer, AVRO_MEMORY_IO);
172 return &mem_writer->writer;
173 }
174
175 void
avro_writer_memory_set_dest(avro_writer_t writer,const char * buf,int64_t len)176 avro_writer_memory_set_dest(avro_writer_t writer, const char *buf, int64_t len)
177 {
178 if (is_memory_io(writer)) {
179 struct _avro_writer_memory_t *mem_writer = avro_writer_to_memory(writer);
180 mem_writer->buf = buf;
181 mem_writer->len = len;
182 mem_writer->written = 0;
183 }
184 }
185
186 static int
avro_read_memory(struct _avro_reader_memory_t * reader,void * buf,int64_t len)187 avro_read_memory(struct _avro_reader_memory_t *reader, void *buf, int64_t len)
188 {
189 if (len > 0) {
190 if ((reader->len - reader->read) < len) {
191 avro_prefix_error("Cannot read %" PRIsz " bytes from memory buffer",
192 (size_t) len);
193 return ENOSPC;
194 }
195 memcpy(buf, reader->buf + reader->read, len);
196 reader->read += len;
197 }
198 return 0;
199 }
200
/* Number of buffered bytes of a file reader that are not yet consumed. */
#define bytes_available(reader) ((reader)->end - (reader)->cur)
/*
 * Drop whatever is buffered: both cursors go back to the start of the
 * buffer.  Wrapped in do/while(0) so it behaves as a single statement.
 */
#define buffer_reset(reader) \
	do { (reader)->cur = (reader)->end = (reader)->buffer; } while (0)
203
204 static int
avro_read_file(struct _avro_reader_file_t * reader,void * buf,int64_t len)205 avro_read_file(struct _avro_reader_file_t *reader, void *buf, int64_t len)
206 {
207 int64_t needed = len;
208 char *p = (char *) buf;
209 int rval;
210
211 if (len == 0) {
212 return 0;
213 }
214
215 if (needed > (int64_t) sizeof(reader->buffer)) {
216 if (bytes_available(reader) > 0) {
217 memcpy(p, reader->cur, bytes_available(reader));
218 p += bytes_available(reader);
219 needed -= bytes_available(reader);
220 buffer_reset(reader);
221 }
222 rval = fread(p, 1, needed, reader->fp);
223 if (rval != needed) {
224 avro_set_error("Cannot read %" PRIsz " bytes from file",
225 (size_t) needed);
226 return EILSEQ;
227 }
228 return 0;
229 } else if (needed <= bytes_available(reader)) {
230 memcpy(p, reader->cur, needed);
231 reader->cur += needed;
232 return 0;
233 } else {
234 memcpy(p, reader->cur, bytes_available(reader));
235 p += bytes_available(reader);
236 needed -= bytes_available(reader);
237
238 rval =
239 fread(reader->buffer, 1, sizeof(reader->buffer),
240 reader->fp);
241 if (rval == 0) {
242 avro_set_error("Cannot read %" PRIsz " bytes from file",
243 (size_t) needed);
244 return EILSEQ;
245 }
246 reader->cur = reader->buffer;
247 reader->end = reader->cur + rval;
248
249 if (bytes_available(reader) < needed) {
250 avro_set_error("Cannot read %" PRIsz " bytes from file",
251 (size_t) needed);
252 return EILSEQ;
253 }
254 memcpy(p, reader->cur, needed);
255 reader->cur += needed;
256 return 0;
257 }
258 avro_set_error("Cannot read %" PRIsz " bytes from file",
259 (size_t) needed);
260 return EILSEQ;
261 }
262
/*
 * Read `len` bytes from `reader` into `buf`, dispatching on the reader's
 * type.
 *
 * Returns the result of the type-specific read, or EINVAL when `buf` is
 * NULL, `len` is negative, or `reader` is NULL or of an unknown type.
 */
int avro_read(avro_reader_t reader, void *buf, int64_t len)
{
	if (!buf || len < 0) {
		return EINVAL;
	}
	if (is_memory_io(reader)) {
		return avro_read_memory(avro_reader_to_memory(reader), buf, len);
	}
	if (is_file_io(reader)) {
		return avro_read_file(avro_reader_to_file(reader), buf, len);
	}
	return EINVAL;
}
276
avro_skip_memory(struct _avro_reader_memory_t * reader,int64_t len)277 static int avro_skip_memory(struct _avro_reader_memory_t *reader, int64_t len)
278 {
279 if (len > 0) {
280 if ((reader->len - reader->read) < len) {
281 avro_set_error("Cannot skip %" PRIsz " bytes in memory buffer",
282 (size_t) len);
283 return ENOSPC;
284 }
285 reader->read += len;
286 }
287 return 0;
288 }
289
avro_skip_file(struct _avro_reader_file_t * reader,int64_t len)290 static int avro_skip_file(struct _avro_reader_file_t *reader, int64_t len)
291 {
292 int rval;
293 int64_t needed = len;
294
295 if (len == 0) {
296 return 0;
297 }
298 if (needed <= bytes_available(reader)) {
299 reader->cur += needed;
300 } else {
301 needed -= bytes_available(reader);
302 buffer_reset(reader);
303 rval = fseek(reader->fp, needed, SEEK_CUR);
304 if (rval < 0) {
305 avro_set_error("Cannot skip %" PRIsz " bytes in file",
306 (size_t) len);
307 return rval;
308 }
309 }
310 return 0;
311 }
312
/*
 * Skip `len` bytes of `reader`, dispatching on the reader's type.
 *
 * Returns the result of the type-specific skip, or EINVAL when `len` is
 * negative or `reader` is NULL or of an unknown type.  This matches
 * avro_read() and avro_write(), so invalid arguments are no longer
 * reported as success.
 */
int avro_skip(avro_reader_t reader, int64_t len)
{
	if (len >= 0) {
		if (is_memory_io(reader)) {
			return avro_skip_memory(avro_reader_to_memory(reader),
						len);
		} else if (is_file_io(reader)) {
			return avro_skip_file(avro_reader_to_file(reader), len);
		}
	}
	return EINVAL;
}
325
326 static int
avro_write_memory(struct _avro_writer_memory_t * writer,void * buf,int64_t len)327 avro_write_memory(struct _avro_writer_memory_t *writer, void *buf, int64_t len)
328 {
329 if (len) {
330 if ((writer->len - writer->written) < len) {
331 avro_set_error("Cannot write %" PRIsz " bytes in memory buffer",
332 (size_t) len);
333 return ENOSPC;
334 }
335 memcpy((void *)(writer->buf + writer->written), buf, len);
336 writer->written += len;
337 }
338 return 0;
339 }
340
341 static int
avro_write_file(struct _avro_writer_file_t * writer,void * buf,int64_t len)342 avro_write_file(struct _avro_writer_file_t *writer, void *buf, int64_t len)
343 {
344 int rval;
345 if (len > 0) {
346 rval = fwrite(buf, len, 1, writer->fp);
347 if (rval == 0) {
348 return EIO;
349 }
350 }
351 return 0;
352 }
353
/*
 * Write `len` bytes from `buf` to `writer`, dispatching on the writer's
 * type.
 *
 * Returns the result of the type-specific write, or EINVAL when `buf` is
 * NULL, `len` is negative, or `writer` is NULL or of an unknown type.
 */
int avro_write(avro_writer_t writer, void *buf, int64_t len)
{
	if (!buf || len < 0) {
		return EINVAL;
	}
	if (is_memory_io(writer)) {
		return avro_write_memory(avro_writer_to_memory(writer), buf, len);
	}
	if (is_file_io(writer)) {
		return avro_write_file(avro_writer_to_file(writer), buf, len);
	}
	return EINVAL;
}
367
368 void
avro_reader_reset(avro_reader_t reader)369 avro_reader_reset(avro_reader_t reader)
370 {
371 if (is_memory_io(reader)) {
372 avro_reader_to_memory(reader)->read = 0;
373 }
374 }
375
/*
 * Reset a memory writer's position to the start of its destination.
 * Has no effect on NULL or on writers of any other type.
 */
void avro_writer_reset(avro_writer_t writer)
{
	struct _avro_writer_memory_t *mw;

	if (!is_memory_io(writer)) {
		return;
	}
	mw = avro_writer_to_memory(writer);
	mw->written = 0;
}
382
/*
 * Report how many bytes a memory writer has stored so far.
 *
 * Returns the byte count for a memory writer, and EINVAL for NULL or any
 * other writer type.
 *
 * NOTE(review): EINVAL is a positive number, so the failure value cannot
 * be told apart from a genuine offset of the same magnitude.  Callers
 * should only use this on memory writers.
 */
int64_t avro_writer_tell(avro_writer_t writer)
{
	struct _avro_writer_memory_t *mw;

	if (!is_memory_io(writer)) {
		return EINVAL;
	}
	mw = avro_writer_to_memory(writer);
	return mw->written;
}
390
/*
 * Flush a file writer's stdio stream.  Has no effect on NULL or on
 * writers of any other type.  The fflush() result is not reported.
 */
void avro_writer_flush(avro_writer_t writer)
{
	struct _avro_writer_file_t *fw;

	if (!is_file_io(writer)) {
		return;
	}
	fw = avro_writer_to_file(writer);
	fflush(fw->fp);
}
397
/*
 * Pass the bytes a memory writer has stored so far to dump(), which
 * receives `fp` as its output stream.  Has no effect on NULL or on
 * writers of any other type.
 */
void avro_writer_dump(avro_writer_t writer, FILE * fp)
{
	struct _avro_writer_memory_t *mw;

	if (!is_memory_io(writer)) {
		return;
	}
	mw = avro_writer_to_memory(writer);
	dump(fp, (char *) mw->buf, mw->written);
}
405
/*
 * Pass the bytes a memory reader has consumed so far to dump(), which
 * receives `fp` as its output stream.  Has no effect on NULL or on
 * readers of any other type.
 */
void avro_reader_dump(avro_reader_t reader, FILE * fp)
{
	struct _avro_reader_memory_t *mr;

	if (!is_memory_io(reader)) {
		return;
	}
	mr = avro_reader_to_memory(reader);
	dump(fp, (char *) mr->buf, mr->read);
}
413
/*
 * Release a reader.  A file reader created with should_close set also
 * closes its stream first.  NULL and unknown reader types are ignored.
 */
void avro_reader_free(avro_reader_t reader)
{
	if (is_memory_io(reader)) {
		avro_freet(struct _avro_reader_memory_t, reader);
		return;
	}

	if (is_file_io(reader)) {
		struct _avro_reader_file_t *fr = avro_reader_to_file(reader);

		if (fr->should_close) {
			fclose(fr->fp);
		}
		avro_freet(struct _avro_reader_file_t, reader);
	}
}
425
/*
 * Release a writer.  A file writer created with should_close set also
 * closes its stream first; the fclose() result is not reported.  NULL
 * and unknown writer types are ignored.
 */
void avro_writer_free(avro_writer_t writer)
{
	if (is_memory_io(writer)) {
		avro_freet(struct _avro_writer_memory_t, writer);
		return;
	}

	if (is_file_io(writer)) {
		struct _avro_writer_file_t *fw = avro_writer_to_file(writer);

		if (fw->should_close) {
			fclose(fw->fp);
		}
		avro_freet(struct _avro_writer_file_t, writer);
	}
}
437
/*
 * Tell whether a file reader is exhausted: its stream has the
 * end-of-file indicator set and nothing is left in the internal buffer.
 *
 * Returns 1 in that case and 0 otherwise, including for NULL and for
 * readers that are not file readers.
 */
int avro_reader_is_eof(avro_reader_t reader)
{
	struct _avro_reader_file_t *file;

	if (!is_file_io(reader)) {
		return 0;
	}

	file = avro_reader_to_file(reader);
	if (!feof(file->fp)) {
		return 0;
	}
	return file->cur == file->end;
}
448