1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to you under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  * https://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14  * implied.  See the License for the specific language governing
15  * permissions and limitations under the License.
16  */
17 
18 #include "avro/allocation.h"
19 #include "avro/refcount.h"
20 #include "avro/errors.h"
21 #include "avro/io.h"
22 #include "avro_private.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include <string.h>
27 #include "dump.h"
28 
/* Tag stored in every reader/writer header to select the backend. */
typedef enum avro_io_type_t {
	AVRO_FILE_IO,		/* backed by a stdio FILE stream */
	AVRO_MEMORY_IO		/* backed by a caller-supplied memory buffer */
} avro_io_type_t;
34 
35 struct avro_reader_t_ {
36 	avro_io_type_t type;
37 	volatile int  refcount;
38 };
39 
40 struct avro_writer_t_ {
41 	avro_io_type_t type;
42 	volatile int  refcount;
43 };
44 
45 struct _avro_reader_file_t {
46 	struct avro_reader_t_ reader;
47 	FILE *fp;
48 	int should_close;
49 	char *cur;
50 	char *end;
51 	char buffer[4096];
52 };
53 
54 struct _avro_writer_file_t {
55 	struct avro_writer_t_ writer;
56 	FILE *fp;
57 	int should_close;
58 };
59 
60 struct _avro_reader_memory_t {
61 	struct avro_reader_t_ reader;
62 	const char *buf;
63 	int64_t len;
64 	int64_t read;
65 };
66 
67 struct _avro_writer_memory_t {
68 	struct avro_writer_t_ writer;
69 	const char *buf;
70 	int64_t len;
71 	int64_t written;
72 };
73 
/* Backend tag stored in a reader or writer header. */
#define avro_io_typeof(obj)      ((obj)->type)
/*
 * True when obj is non-NULL and uses the named backend.  The argument is
 * parenthesised so that expression arguments (e.g. a conditional) bind
 * as a unit instead of being split by the && operator.
 */
#define is_memory_io(obj)        ((obj) && avro_io_typeof(obj) == AVRO_MEMORY_IO)
#define is_file_io(obj)          ((obj) && avro_io_typeof(obj) == AVRO_FILE_IO)

/*
 * Convert a pointer to the embedded header back into a pointer to the
 * enclosing concrete struct.  Callers must check the backend type first.
 */
#define avro_reader_to_memory(reader_)  container_of(reader_, struct _avro_reader_memory_t, reader)
#define avro_reader_to_file(reader_)    container_of(reader_, struct _avro_reader_file_t, reader)
#define avro_writer_to_memory(writer_)  container_of(writer_, struct _avro_writer_memory_t, writer)
#define avro_writer_to_file(writer_)    container_of(writer_, struct _avro_writer_file_t, writer)
82 
/* Stamp a freshly allocated reader header: reference count 1, given backend. */
static void reader_init(avro_reader_t reader, avro_io_type_t type)
{
	avro_refcount_set(&reader->refcount, 1);
	reader->type = type;
}
88 
/* Stamp a freshly allocated writer header: reference count 1, given backend. */
static void writer_init(avro_writer_t writer, avro_io_type_t type)
{
	avro_refcount_set(&writer->refcount, 1);
	writer->type = type;
}
94 
avro_reader_file_fp(FILE * fp,int should_close)95 avro_reader_t avro_reader_file_fp(FILE * fp, int should_close)
96 {
97 	struct _avro_reader_file_t *file_reader =
98 	    (struct _avro_reader_file_t *) avro_new(struct _avro_reader_file_t);
99 	if (!file_reader) {
100 		avro_set_error("Cannot allocate new file reader");
101 		return NULL;
102 	}
103 	memset(file_reader, 0, sizeof(struct _avro_reader_file_t));
104 	file_reader->fp = fp;
105 	file_reader->should_close = should_close;
106 	reader_init(&file_reader->reader, AVRO_FILE_IO);
107 	return &file_reader->reader;
108 }
109 
avro_reader_file(FILE * fp)110 avro_reader_t avro_reader_file(FILE * fp)
111 {
112 	return avro_reader_file_fp(fp, 1);
113 }
114 
avro_writer_file_fp(FILE * fp,int should_close)115 avro_writer_t avro_writer_file_fp(FILE * fp, int should_close)
116 {
117 	struct _avro_writer_file_t *file_writer =
118 	    (struct _avro_writer_file_t *) avro_new(struct _avro_writer_file_t);
119 	if (!file_writer) {
120 		avro_set_error("Cannot allocate new file writer");
121 		return NULL;
122 	}
123 	file_writer->fp = fp;
124 	file_writer->should_close = should_close;
125 	writer_init(&file_writer->writer, AVRO_FILE_IO);
126 	return &file_writer->writer;
127 }
128 
avro_writer_file(FILE * fp)129 avro_writer_t avro_writer_file(FILE * fp)
130 {
131 	return avro_writer_file_fp(fp, 1);
132 }
133 
avro_reader_memory(const char * buf,int64_t len)134 avro_reader_t avro_reader_memory(const char *buf, int64_t len)
135 {
136 	struct _avro_reader_memory_t *mem_reader =
137 	    (struct _avro_reader_memory_t *) avro_new(struct _avro_reader_memory_t);
138 	if (!mem_reader) {
139 		avro_set_error("Cannot allocate new memory reader");
140 		return NULL;
141 	}
142 	mem_reader->buf = buf;
143 	mem_reader->len = len;
144 	mem_reader->read = 0;
145 	reader_init(&mem_reader->reader, AVRO_MEMORY_IO);
146 	return &mem_reader->reader;
147 }
148 
149 void
avro_reader_memory_set_source(avro_reader_t reader,const char * buf,int64_t len)150 avro_reader_memory_set_source(avro_reader_t reader, const char *buf, int64_t len)
151 {
152 	if (is_memory_io(reader)) {
153 		struct _avro_reader_memory_t *mem_reader = avro_reader_to_memory(reader);
154 		mem_reader->buf = buf;
155 		mem_reader->len = len;
156 		mem_reader->read = 0;
157 	}
158 }
159 
avro_writer_memory(const char * buf,int64_t len)160 avro_writer_t avro_writer_memory(const char *buf, int64_t len)
161 {
162 	struct _avro_writer_memory_t *mem_writer =
163 	    (struct _avro_writer_memory_t *) avro_new(struct _avro_writer_memory_t);
164 	if (!mem_writer) {
165 		avro_set_error("Cannot allocate new memory writer");
166 		return NULL;
167 	}
168 	mem_writer->buf = buf;
169 	mem_writer->len = len;
170 	mem_writer->written = 0;
171 	writer_init(&mem_writer->writer, AVRO_MEMORY_IO);
172 	return &mem_writer->writer;
173 }
174 
175 void
avro_writer_memory_set_dest(avro_writer_t writer,const char * buf,int64_t len)176 avro_writer_memory_set_dest(avro_writer_t writer, const char *buf, int64_t len)
177 {
178 	if (is_memory_io(writer)) {
179 		struct _avro_writer_memory_t *mem_writer = avro_writer_to_memory(writer);
180 		mem_writer->buf = buf;
181 		mem_writer->len = len;
182 		mem_writer->written = 0;
183 	}
184 }
185 
186 static int
avro_read_memory(struct _avro_reader_memory_t * reader,void * buf,int64_t len)187 avro_read_memory(struct _avro_reader_memory_t *reader, void *buf, int64_t len)
188 {
189 	if (len > 0) {
190 		if ((reader->len - reader->read) < len) {
191 			avro_prefix_error("Cannot read %" PRIsz " bytes from memory buffer",
192 					  (size_t) len);
193 			return ENOSPC;
194 		}
195 		memcpy(buf, reader->buf + reader->read, len);
196 		reader->read += len;
197 	}
198 	return 0;
199 }
200 
/* Number of buffered bytes that have not been handed to the caller yet. */
#define bytes_available(reader) ((reader)->end - (reader)->cur)
/*
 * Mark the read-ahead buffer as empty.  Wrapped in do/while(0) so the
 * macro behaves as a single statement, including before an `else`.
 */
#define buffer_reset(reader) do { (reader)->cur = (reader)->end = (reader)->buffer; } while (0)
203 
204 static int
avro_read_file(struct _avro_reader_file_t * reader,void * buf,int64_t len)205 avro_read_file(struct _avro_reader_file_t *reader, void *buf, int64_t len)
206 {
207 	int64_t needed = len;
208 	char *p = (char *) buf;
209 	int rval;
210 
211 	if (len == 0) {
212 		return 0;
213 	}
214 
215 	if (needed > (int64_t) sizeof(reader->buffer)) {
216 		if (bytes_available(reader) > 0) {
217 			memcpy(p, reader->cur, bytes_available(reader));
218 			p += bytes_available(reader);
219 			needed -= bytes_available(reader);
220 			buffer_reset(reader);
221 		}
222 		rval = fread(p, 1, needed, reader->fp);
223 		if (rval != needed) {
224 			avro_set_error("Cannot read %" PRIsz " bytes from file",
225 				       (size_t) needed);
226 			return EILSEQ;
227 		}
228 		return 0;
229 	} else if (needed <= bytes_available(reader)) {
230 		memcpy(p, reader->cur, needed);
231 		reader->cur += needed;
232 		return 0;
233 	} else {
234 		memcpy(p, reader->cur, bytes_available(reader));
235 		p += bytes_available(reader);
236 		needed -= bytes_available(reader);
237 
238 		rval =
239 		    fread(reader->buffer, 1, sizeof(reader->buffer),
240 			  reader->fp);
241 		if (rval == 0) {
242 			avro_set_error("Cannot read %" PRIsz " bytes from file",
243 				       (size_t) needed);
244 			return EILSEQ;
245 		}
246 		reader->cur = reader->buffer;
247 		reader->end = reader->cur + rval;
248 
249 		if (bytes_available(reader) < needed) {
250 			avro_set_error("Cannot read %" PRIsz " bytes from file",
251 				       (size_t) needed);
252 			return EILSEQ;
253 		}
254 		memcpy(p, reader->cur, needed);
255 		reader->cur += needed;
256 		return 0;
257 	}
258 	avro_set_error("Cannot read %" PRIsz " bytes from file",
259 		       (size_t) needed);
260 	return EILSEQ;
261 }
262 
/*
 * Read len bytes from reader into buf, dispatching on the reader's backend.
 *
 * Returns the backend's result, or EINVAL when buf is NULL, len is
 * negative, or reader is NULL / of an unknown type.
 */
int avro_read(avro_reader_t reader, void *buf, int64_t len)
{
	if (!buf || len < 0) {
		return EINVAL;
	}
	if (is_memory_io(reader)) {
		return avro_read_memory(avro_reader_to_memory(reader), buf, len);
	}
	if (is_file_io(reader)) {
		return avro_read_file(avro_reader_to_file(reader), buf, len);
	}
	return EINVAL;
}
276 
avro_skip_memory(struct _avro_reader_memory_t * reader,int64_t len)277 static int avro_skip_memory(struct _avro_reader_memory_t *reader, int64_t len)
278 {
279 	if (len > 0) {
280 		if ((reader->len - reader->read) < len) {
281 			avro_set_error("Cannot skip %" PRIsz " bytes in memory buffer",
282 				       (size_t) len);
283 			return ENOSPC;
284 		}
285 		reader->read += len;
286 	}
287 	return 0;
288 }
289 
avro_skip_file(struct _avro_reader_file_t * reader,int64_t len)290 static int avro_skip_file(struct _avro_reader_file_t *reader, int64_t len)
291 {
292 	int rval;
293 	int64_t needed = len;
294 
295 	if (len == 0) {
296 		return 0;
297 	}
298 	if (needed <= bytes_available(reader)) {
299 		reader->cur += needed;
300 	} else {
301 		needed -= bytes_available(reader);
302 		buffer_reset(reader);
303 		rval = fseek(reader->fp, needed, SEEK_CUR);
304 		if (rval < 0) {
305 			avro_set_error("Cannot skip %" PRIsz " bytes in file",
306 				       (size_t) len);
307 			return rval;
308 		}
309 	}
310 	return 0;
311 }
312 
/*
 * Skip len bytes of input on reader, dispatching on the reader's backend.
 *
 * Returns the backend's result, or EINVAL when len is negative or reader
 * is NULL / of an unknown type.  Invalid arguments are reported rather
 * than treated as success, matching avro_read() and avro_write(); a
 * negative length would otherwise be accepted without skipping anything.
 */
int avro_skip(avro_reader_t reader, int64_t len)
{
	if (len >= 0) {
		if (is_memory_io(reader)) {
			return avro_skip_memory(avro_reader_to_memory(reader),
						len);
		} else if (is_file_io(reader)) {
			return avro_skip_file(avro_reader_to_file(reader), len);
		}
	}
	return EINVAL;
}
325 
326 static int
avro_write_memory(struct _avro_writer_memory_t * writer,void * buf,int64_t len)327 avro_write_memory(struct _avro_writer_memory_t *writer, void *buf, int64_t len)
328 {
329 	if (len) {
330 		if ((writer->len - writer->written) < len) {
331 			avro_set_error("Cannot write %" PRIsz " bytes in memory buffer",
332 				       (size_t) len);
333 			return ENOSPC;
334 		}
335 		memcpy((void *)(writer->buf + writer->written), buf, len);
336 		writer->written += len;
337 	}
338 	return 0;
339 }
340 
341 static int
avro_write_file(struct _avro_writer_file_t * writer,void * buf,int64_t len)342 avro_write_file(struct _avro_writer_file_t *writer, void *buf, int64_t len)
343 {
344 	int rval;
345 	if (len > 0) {
346 		rval = fwrite(buf, len, 1, writer->fp);
347 		if (rval == 0) {
348 			return EIO;
349 		}
350 	}
351 	return 0;
352 }
353 
/*
 * Write len bytes from buf to writer, dispatching on the writer's backend.
 *
 * Returns the backend's result, or EINVAL when buf is NULL, len is
 * negative, or writer is NULL / of an unknown type.
 */
int avro_write(avro_writer_t writer, void *buf, int64_t len)
{
	if (!buf || len < 0) {
		return EINVAL;
	}
	if (is_memory_io(writer)) {
		return avro_write_memory(avro_writer_to_memory(writer), buf, len);
	}
	if (is_file_io(writer)) {
		return avro_write_file(avro_writer_to_file(writer), buf, len);
	}
	return EINVAL;
}
367 
368 void
avro_reader_reset(avro_reader_t reader)369 avro_reader_reset(avro_reader_t reader)
370 {
371 	if (is_memory_io(reader)) {
372 		avro_reader_to_memory(reader)->read = 0;
373 	}
374 }
375 
/*
 * Rewind a memory writer so the next write starts at the beginning of its
 * buffer.  Has no effect on file writers or on a NULL writer.
 */
void avro_writer_reset(avro_writer_t writer)
{
	struct _avro_writer_memory_t *mem;

	if (!is_memory_io(writer)) {
		return;
	}
	mem = avro_writer_to_memory(writer);
	mem->written = 0;
}
382 
/*
 * Report how many bytes have been stored in a memory writer.
 *
 * For any other writer (including NULL) the positive value EINVAL is
 * returned.  NOTE(review): that value cannot be told apart from a real
 * offset equal to EINVAL, so callers should only use this on writers
 * they know to be memory-backed.
 */
int64_t avro_writer_tell(avro_writer_t writer)
{
	if (!is_memory_io(writer)) {
		return EINVAL;
	}
	return avro_writer_to_memory(writer)->written;
}
390 
/*
 * Flush a file writer's stdio stream.  Memory writers and NULL are
 * ignored.  The result of fflush() is not reported.
 */
void avro_writer_flush(avro_writer_t writer)
{
	struct _avro_writer_file_t *file;

	if (!is_file_io(writer)) {
		return;
	}
	file = avro_writer_to_file(writer);
	fflush(file->fp);
}
397 
/*
 * Pass the bytes written so far by a memory writer to dump(), with fp as
 * the output stream.  File writers and NULL are ignored.
 */
void avro_writer_dump(avro_writer_t writer, FILE * fp)
{
	struct _avro_writer_memory_t *mem;

	if (!is_memory_io(writer)) {
		return;
	}
	mem = avro_writer_to_memory(writer);
	dump(fp, (char *)mem->buf, mem->written);
}
405 
/*
 * Pass the bytes consumed so far by a memory reader to dump(), with fp as
 * the output stream.  File readers and NULL are ignored.
 */
void avro_reader_dump(avro_reader_t reader, FILE * fp)
{
	struct _avro_reader_memory_t *mem;

	if (!is_memory_io(reader)) {
		return;
	}
	mem = avro_reader_to_memory(reader);
	dump(fp, (char *)mem->buf, mem->read);
}
413 
/*
 * Release a reader.  A file reader created with should_close set also
 * closes its stream.  The memory behind a memory reader is not touched.
 * A NULL or unrecognised reader is ignored.
 */
void avro_reader_free(avro_reader_t reader)
{
	if (is_file_io(reader)) {
		struct _avro_reader_file_t *file = avro_reader_to_file(reader);

		if (file->should_close) {
			fclose(file->fp);
		}
		avro_freet(struct _avro_reader_file_t, reader);
		return;
	}
	if (is_memory_io(reader)) {
		avro_freet(struct _avro_reader_memory_t, reader);
	}
}
425 
/*
 * Release a writer.  A file writer created with should_close set also
 * closes its stream; the result of fclose() is not reported.  The memory
 * behind a memory writer is not touched.  A NULL or unrecognised writer
 * is ignored.
 */
void avro_writer_free(avro_writer_t writer)
{
	if (is_file_io(writer)) {
		struct _avro_writer_file_t *file = avro_writer_to_file(writer);

		if (file->should_close) {
			fclose(file->fp);
		}
		avro_freet(struct _avro_writer_file_t, writer);
		return;
	}
	if (is_memory_io(writer)) {
		avro_freet(struct _avro_writer_memory_t, writer);
	}
}
437 
/*
 * Tell whether a file reader has run out of input: the stream's EOF
 * indicator is set and the read-ahead buffer is empty.
 *
 * Returns non-zero in that case and 0 otherwise.  Memory readers and
 * NULL always yield 0.
 */
int avro_reader_is_eof(avro_reader_t reader)
{
	struct _avro_reader_file_t *file;

	if (!is_file_io(reader)) {
		return 0;
	}
	file = avro_reader_to_file(reader);
	if (!feof(file->fp)) {
		return 0;
	}
	return file->cur == file->end;
}
448