1 /*
2  * This file is part of libtrace
3  *
4  * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5  * New Zealand.
6  *
7  * Authors: Daniel Lawson
8  *          Perry Lorier
9  *          Shane Alcock
10  *
11  * All rights reserved.
12  *
13  * This code has been developed by the University of Waikato WAND
14  * research group. For further information please see http://www.wand.net.nz/
15  *
16  * libtrace is free software; you can redistribute it and/or modify
17  * it under the terms of the GNU General Public License as published by
18  * the Free Software Foundation; either version 2 of the License, or
19  * (at your option) any later version.
20  *
21  * libtrace is distributed in the hope that it will be useful,
22  * but WITHOUT ANY WARRANTY; without even the implied warranty of
23  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
24  * GNU General Public License for more details.
25  *
26  * You should have received a copy of the GNU General Public License
27  * along with libtrace; if not, write to the Free Software
28  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
29  *
30  * $Id: wandio.c 1831 2013-05-14 22:57:59Z salcock $
31  *
32  */
33 
34 
35 #include "config.h"
36 #include "wandio.h"
37 #include <stdlib.h>
38 #include <assert.h>
39 #include <errno.h>
40 #include <inttypes.h>
41 #include <string.h>
42 
43 /* This file contains the implementation of the libtrace IO API, which format
44  * modules should use to open, read from, write to, seek and close trace files.
45  */
46 
47 struct compression_type compression_type[]  = {
48 	{ "GZ",		"gz", 	WANDIO_COMPRESS_ZLIB 	},
49 	{ "BZ2",	"bz2", 	WANDIO_COMPRESS_BZ2	},
50 	{ "LZO",	"lzo",  WANDIO_COMPRESS_LZO	},
51 	{ "NONE",	"",	WANDIO_COMPRESS_NONE	}
52 };
53 
54 int keep_stats = 0;
55 int force_directio_write = 0;
56 int force_directio_read = 0;
57 int use_autodetect = 1;
58 unsigned int use_threads = -1;
59 unsigned int max_buffers = 50;
60 
61 uint64_t read_waits = 0;
62 uint64_t write_waits = 0;
63 
64 /** Parse an option.
65  * stats -- Show summary stats
66  * directwrite -- bypass the diskcache on write
67  * directread -- bypass the diskcache on read
68  * noautodetect -- disable autodetection of file compression, assume all files
69  *		   are uncompressed
70  * nothreads -- Don't use threads
71  * threads=n -- Use a maximum of 'n' threads for thread farms
72  */
do_option(const char * option)73 static void do_option(const char *option)
74 {
75 	if (*option == '\0')
76 		;
77 	else if (strcmp(option,"stats") == 0)
78 		keep_stats = 1;
79 	/*
80 	else if (strcmp(option,"directwrite") == 0)
81 		force_directio_write = 1;
82 	else if (strcmp(option,"directread") == 0)
83 		force_directio_read  = 1;
84 	*/
85 	else if (strcmp(option,"nothreads") == 0)
86 		use_threads = 0;
87 	else if (strcmp(option,"noautodetect") == 0)
88 		use_autodetect = 0;
89 	else if (strncmp(option,"threads=",8) == 0)
90 		use_threads = atoi(option+8);
91 	else if (strncmp(option,"buffers=",8) == 0)
92 		max_buffers = atoi(option+8);
93 	else {
94 		fprintf(stderr,"Unknown libtraceio debug option '%s'\n", option);
95 	}
96 }
97 
parse_env(void)98 static void parse_env(void)
99 {
100 	const char *str = getenv("LIBTRACEIO");
101 	char option[1024];
102 	const char *ip;
103 	char *op;
104 
105 	if (!str)
106 		return;
107 
108 	for(ip=str, op=option; *ip!='\0' && op < option+sizeof(option); ++ip) {
109 		if (*ip == ',') {
110 			*op='\0';
111 			do_option(option);
112 			op=option;
113 		}
114 		else
115 			*(op++) = *ip;
116 	}
117 	*op='\0';
118 	do_option(option);
119 }
120 
121 
122 #define READ_TRACE 0
123 #define WRITE_TRACE 0
124 #define PIPELINE_TRACE 0
125 
126 #if PIPELINE_TRACE
127 #define DEBUG_PIPELINE(x) fprintf(stderr,"PIPELINE: %s\n",x)
128 #else
129 #define DEBUG_PIPELINE(x)
130 #endif
131 
create_io_reader(const char * filename,int autodetect)132 static io_t *create_io_reader(const char *filename, int autodetect)
133 {
134 
135 	/* Use a peeking reader to look at the start of the trace file and
136 	 * determine what type of compression may have been used to write
137 	 * the file */
138 
139 	DEBUG_PIPELINE("stdio");
140 	DEBUG_PIPELINE("peek");
141 	io_t *io = peek_open(stdio_open(filename));
142 	char buffer[1024];
143 	int len;
144 	if (!io)
145 		return NULL;
146 	len = wandio_peek(io, buffer, sizeof(buffer));
147 	/* Auto detect gzip compressed data -- if autodetect is false,
148 	 * instead we just assume uncompressed.
149 	 */
150 
151 	if (autodetect) {
152 		if (len>=3 && buffer[0] == '\037' && buffer[1] == '\213' &&
153 				buffer[2] == 0x08) {
154 #if HAVE_LIBZ
155 			DEBUG_PIPELINE("zlib");
156 			io = zlib_open(io);
157 #else
158 			fprintf(stderr, "File %s is gzip compressed but libtrace has not been built with zlib support!\n", filename);
159 			return NULL;
160 #endif
161 		}
162 		/* Auto detect compress(1) compressed data (gzip can read this) */
163 		if (len>=2 && buffer[0] == '\037' && buffer[1] == '\235') {
164 #if HAVE_LIBZ
165 			DEBUG_PIPELINE("zlib");
166 			io = zlib_open(io);
167 #else
168 			fprintf(stderr, "File %s is compress(1) compressed but libtrace has not been built with zlib support!\n", filename);
169 			return NULL;
170 #endif
171 		}
172 
173 		/* Auto detect bzip compressed data */
174 		if (len>=3 && buffer[0] == 'B' && buffer[1] == 'Z' && buffer[2] == 'h') {
175 #if HAVE_LIBBZ2
176 			DEBUG_PIPELINE("bzip");
177 			io = bz_open(io);
178 #else
179 			fprintf(stderr, "File %s is bzip compressed but libtrace has not been built with bzip2 support!\n", filename);
180 			return NULL;
181 #endif
182 		}
183 	}
184 	/* Now open a threaded, peekable reader using the appropriate module
185 	 * to read the data */
186 
187 	if (use_threads) {
188 		DEBUG_PIPELINE("thread");
189 		io = thread_open(io);
190 	}
191 
192 	DEBUG_PIPELINE("peek");
193 	return peek_open(io);
194 }
195 
wandio_create(const char * filename)196 DLLEXPORT io_t *wandio_create(const char *filename) {
197 	parse_env();
198 	return create_io_reader(filename, use_autodetect);
199 }
200 
wandio_create_uncompressed(const char * filename)201 DLLEXPORT io_t *wandio_create_uncompressed(const char *filename) {
202 	parse_env();
203 	return create_io_reader(filename, 0);
204 }
205 
206 
wandio_tell(io_t * io)207 DLLEXPORT off_t wandio_tell(io_t *io)
208 {
209 	if (!io->source->tell) {
210 		errno = -ENOSYS;
211 		return -1;
212 	}
213 	return io->source->tell(io);
214 }
215 
wandio_seek(io_t * io,off_t offset,int whence)216 DLLEXPORT off_t wandio_seek(io_t *io, off_t offset, int whence)
217 {
218 	if (!io->source->seek) {
219 		errno = -ENOSYS;
220 		return -1;
221 	}
222 	return io->source->seek(io,offset,whence);
223 }
224 
wandio_read(io_t * io,void * buffer,off_t len)225 DLLEXPORT off_t wandio_read(io_t *io, void *buffer, off_t len)
226 {
227 	off_t ret;
228 	ret=io->source->read(io,buffer,len);
229 #if READ_TRACE
230 	fprintf(stderr,"%p: read(%s): %d bytes = %d\n",io,io->source->name, (int)len,(int)ret);
231 #endif
232 	return ret;
233 }
234 
wandio_peek(io_t * io,void * buffer,off_t len)235 DLLEXPORT off_t wandio_peek(io_t *io, void *buffer, off_t len)
236 {
237 	off_t ret;
238 	assert(io->source->peek); /* If this fails, it means you're calling
239 				   * peek on something that doesn't support
240 				   * peeking.   Push a peek_open() on the io
241 				   * first.
242 				   */
243 	ret=io->source->peek(io, buffer, len);
244 #if READ_TRACE
245 	fprintf(stderr,"%p: peek(%s): %d bytes = %d\n",io,io->source->name, (int)len, (int)ret);
246 #endif
247 	return ret;
248 }
249 
wandio_destroy(io_t * io)250 DLLEXPORT void wandio_destroy(io_t *io)
251 {
252 	if (!io)
253 		return;
254 
255 	if (keep_stats)
256 		fprintf(stderr,"LIBTRACEIO STATS: %"PRIu64" blocks on read\n", read_waits);
257 	io->source->close(io);
258 }
259 
wandio_wcreate(const char * filename,int compress_type,int compression_level,int flags)260 DLLEXPORT iow_t *wandio_wcreate(const char *filename, int compress_type, int compression_level, int flags)
261 {
262 	iow_t *iow;
263 	parse_env();
264 
265 	assert ( compression_level >= 0 && compression_level <= 9 );
266 	assert (compress_type != WANDIO_COMPRESS_MASK);
267 
268 	iow=stdio_wopen(filename, flags);
269 	if (!iow)
270 		return NULL;
271 
272 	/* We prefer zlib if available, otherwise we'll use bzip. If neither
273 	 * are present, guess we'll just have to write uncompressed */
274 #if HAVE_LIBZ
275 	if (compression_level != 0 &&
276 	    compress_type == WANDIO_COMPRESS_ZLIB) {
277 		iow = zlib_wopen(iow,compression_level);
278 	}
279 #endif
280 #if HAVE_LIBLZO2
281 	else if (compression_level != 0 &&
282 	    compress_type == WANDIO_COMPRESS_LZO) {
283 		iow = lzo_wopen(iow,compression_level);
284 	}
285 #endif
286 #if HAVE_LIBBZ2
287 	else if (compression_level != 0 &&
288 	    compress_type == WANDIO_COMPRESS_BZ2) {
289 		iow = bz_wopen(iow,compression_level);
290 	}
291 #endif
292 	/* Open a threaded writer */
293 	if (use_threads)
294 		return thread_wopen(iow);
295 	else
296 		return iow;
297 }
298 
wandio_wwrite(iow_t * iow,const void * buffer,off_t len)299 DLLEXPORT off_t wandio_wwrite(iow_t *iow, const void *buffer, off_t len)
300 {
301 #if WRITE_TRACE
302 	fprintf(stderr,"wwrite(%s): %d bytes\n",iow->source->name, (int)len);
303 #endif
304 	return iow->source->write(iow,buffer,len);
305 }
306 
wandio_wdestroy(iow_t * iow)307 DLLEXPORT void wandio_wdestroy(iow_t *iow)
308 {
309 	iow->source->close(iow);
310 	if (keep_stats)
311 		fprintf(stderr,"LIBTRACEIO STATS: %"PRIu64" blocks on write\n", write_waits);
312 }
313 
314