1 /*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007,2008,2009,2010 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Authors: Daniel Lawson
8 * Perry Lorier
9 * Shane Alcock
10 *
11 * All rights reserved.
12 *
13 * This code has been developed by the University of Waikato WAND
14 * research group. For further information please see http://www.wand.net.nz/
15 *
16 * libtrace is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License as published by
18 * the Free Software Foundation; either version 2 of the License, or
19 * (at your option) any later version.
20 *
21 * libtrace is distributed in the hope that it will be useful,
22 * but WITHOUT ANY WARRANTY; without even the implied warranty of
23 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
24 * GNU General Public License for more details.
25 *
26 * You should have received a copy of the GNU General Public License
27 * along with libtrace; if not, write to the Free Software
28 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
29 *
30 * $Id: wandio.c 1831 2013-05-14 22:57:59Z salcock $
31 *
32 */
33
34
35 #include "config.h"
36 #include "wandio.h"
37 #include <stdlib.h>
38 #include <assert.h>
39 #include <errno.h>
40 #include <inttypes.h>
41 #include <string.h>
42
43 /* This file contains the implementation of the libtrace IO API, which format
44 * modules should use to open, read from, write to, seek and close trace files.
45 */
46
47 struct compression_type compression_type[] = {
48 { "GZ", "gz", WANDIO_COMPRESS_ZLIB },
49 { "BZ2", "bz2", WANDIO_COMPRESS_BZ2 },
50 { "LZO", "lzo", WANDIO_COMPRESS_LZO },
51 { "NONE", "", WANDIO_COMPRESS_NONE }
52 };
53
54 int keep_stats = 0;
55 int force_directio_write = 0;
56 int force_directio_read = 0;
57 int use_autodetect = 1;
58 unsigned int use_threads = -1;
59 unsigned int max_buffers = 50;
60
61 uint64_t read_waits = 0;
62 uint64_t write_waits = 0;
63
64 /** Parse an option.
65 * stats -- Show summary stats
66 * directwrite -- bypass the diskcache on write
67 * directread -- bypass the diskcache on read
68 * noautodetect -- disable autodetection of file compression, assume all files
69 * are uncompressed
70 * nothreads -- Don't use threads
71 * threads=n -- Use a maximum of 'n' threads for thread farms
72 */
do_option(const char * option)73 static void do_option(const char *option)
74 {
75 if (*option == '\0')
76 ;
77 else if (strcmp(option,"stats") == 0)
78 keep_stats = 1;
79 /*
80 else if (strcmp(option,"directwrite") == 0)
81 force_directio_write = 1;
82 else if (strcmp(option,"directread") == 0)
83 force_directio_read = 1;
84 */
85 else if (strcmp(option,"nothreads") == 0)
86 use_threads = 0;
87 else if (strcmp(option,"noautodetect") == 0)
88 use_autodetect = 0;
89 else if (strncmp(option,"threads=",8) == 0)
90 use_threads = atoi(option+8);
91 else if (strncmp(option,"buffers=",8) == 0)
92 max_buffers = atoi(option+8);
93 else {
94 fprintf(stderr,"Unknown libtraceio debug option '%s'\n", option);
95 }
96 }
97
parse_env(void)98 static void parse_env(void)
99 {
100 const char *str = getenv("LIBTRACEIO");
101 char option[1024];
102 const char *ip;
103 char *op;
104
105 if (!str)
106 return;
107
108 for(ip=str, op=option; *ip!='\0' && op < option+sizeof(option); ++ip) {
109 if (*ip == ',') {
110 *op='\0';
111 do_option(option);
112 op=option;
113 }
114 else
115 *(op++) = *ip;
116 }
117 *op='\0';
118 do_option(option);
119 }
120
121
122 #define READ_TRACE 0
123 #define WRITE_TRACE 0
124 #define PIPELINE_TRACE 0
125
126 #if PIPELINE_TRACE
127 #define DEBUG_PIPELINE(x) fprintf(stderr,"PIPELINE: %s\n",x)
128 #else
129 #define DEBUG_PIPELINE(x)
130 #endif
131
create_io_reader(const char * filename,int autodetect)132 static io_t *create_io_reader(const char *filename, int autodetect)
133 {
134
135 /* Use a peeking reader to look at the start of the trace file and
136 * determine what type of compression may have been used to write
137 * the file */
138
139 DEBUG_PIPELINE("stdio");
140 DEBUG_PIPELINE("peek");
141 io_t *io = peek_open(stdio_open(filename));
142 char buffer[1024];
143 int len;
144 if (!io)
145 return NULL;
146 len = wandio_peek(io, buffer, sizeof(buffer));
147 /* Auto detect gzip compressed data -- if autodetect is false,
148 * instead we just assume uncompressed.
149 */
150
151 if (autodetect) {
152 if (len>=3 && buffer[0] == '\037' && buffer[1] == '\213' &&
153 buffer[2] == 0x08) {
154 #if HAVE_LIBZ
155 DEBUG_PIPELINE("zlib");
156 io = zlib_open(io);
157 #else
158 fprintf(stderr, "File %s is gzip compressed but libtrace has not been built with zlib support!\n", filename);
159 return NULL;
160 #endif
161 }
162 /* Auto detect compress(1) compressed data (gzip can read this) */
163 if (len>=2 && buffer[0] == '\037' && buffer[1] == '\235') {
164 #if HAVE_LIBZ
165 DEBUG_PIPELINE("zlib");
166 io = zlib_open(io);
167 #else
168 fprintf(stderr, "File %s is compress(1) compressed but libtrace has not been built with zlib support!\n", filename);
169 return NULL;
170 #endif
171 }
172
173 /* Auto detect bzip compressed data */
174 if (len>=3 && buffer[0] == 'B' && buffer[1] == 'Z' && buffer[2] == 'h') {
175 #if HAVE_LIBBZ2
176 DEBUG_PIPELINE("bzip");
177 io = bz_open(io);
178 #else
179 fprintf(stderr, "File %s is bzip compressed but libtrace has not been built with bzip2 support!\n", filename);
180 return NULL;
181 #endif
182 }
183 }
184 /* Now open a threaded, peekable reader using the appropriate module
185 * to read the data */
186
187 if (use_threads) {
188 DEBUG_PIPELINE("thread");
189 io = thread_open(io);
190 }
191
192 DEBUG_PIPELINE("peek");
193 return peek_open(io);
194 }
195
wandio_create(const char * filename)196 DLLEXPORT io_t *wandio_create(const char *filename) {
197 parse_env();
198 return create_io_reader(filename, use_autodetect);
199 }
200
wandio_create_uncompressed(const char * filename)201 DLLEXPORT io_t *wandio_create_uncompressed(const char *filename) {
202 parse_env();
203 return create_io_reader(filename, 0);
204 }
205
206
wandio_tell(io_t * io)207 DLLEXPORT off_t wandio_tell(io_t *io)
208 {
209 if (!io->source->tell) {
210 errno = -ENOSYS;
211 return -1;
212 }
213 return io->source->tell(io);
214 }
215
wandio_seek(io_t * io,off_t offset,int whence)216 DLLEXPORT off_t wandio_seek(io_t *io, off_t offset, int whence)
217 {
218 if (!io->source->seek) {
219 errno = -ENOSYS;
220 return -1;
221 }
222 return io->source->seek(io,offset,whence);
223 }
224
wandio_read(io_t * io,void * buffer,off_t len)225 DLLEXPORT off_t wandio_read(io_t *io, void *buffer, off_t len)
226 {
227 off_t ret;
228 ret=io->source->read(io,buffer,len);
229 #if READ_TRACE
230 fprintf(stderr,"%p: read(%s): %d bytes = %d\n",io,io->source->name, (int)len,(int)ret);
231 #endif
232 return ret;
233 }
234
wandio_peek(io_t * io,void * buffer,off_t len)235 DLLEXPORT off_t wandio_peek(io_t *io, void *buffer, off_t len)
236 {
237 off_t ret;
238 assert(io->source->peek); /* If this fails, it means you're calling
239 * peek on something that doesn't support
240 * peeking. Push a peek_open() on the io
241 * first.
242 */
243 ret=io->source->peek(io, buffer, len);
244 #if READ_TRACE
245 fprintf(stderr,"%p: peek(%s): %d bytes = %d\n",io,io->source->name, (int)len, (int)ret);
246 #endif
247 return ret;
248 }
249
wandio_destroy(io_t * io)250 DLLEXPORT void wandio_destroy(io_t *io)
251 {
252 if (!io)
253 return;
254
255 if (keep_stats)
256 fprintf(stderr,"LIBTRACEIO STATS: %"PRIu64" blocks on read\n", read_waits);
257 io->source->close(io);
258 }
259
wandio_wcreate(const char * filename,int compress_type,int compression_level,int flags)260 DLLEXPORT iow_t *wandio_wcreate(const char *filename, int compress_type, int compression_level, int flags)
261 {
262 iow_t *iow;
263 parse_env();
264
265 assert ( compression_level >= 0 && compression_level <= 9 );
266 assert (compress_type != WANDIO_COMPRESS_MASK);
267
268 iow=stdio_wopen(filename, flags);
269 if (!iow)
270 return NULL;
271
272 /* We prefer zlib if available, otherwise we'll use bzip. If neither
273 * are present, guess we'll just have to write uncompressed */
274 #if HAVE_LIBZ
275 if (compression_level != 0 &&
276 compress_type == WANDIO_COMPRESS_ZLIB) {
277 iow = zlib_wopen(iow,compression_level);
278 }
279 #endif
280 #if HAVE_LIBLZO2
281 else if (compression_level != 0 &&
282 compress_type == WANDIO_COMPRESS_LZO) {
283 iow = lzo_wopen(iow,compression_level);
284 }
285 #endif
286 #if HAVE_LIBBZ2
287 else if (compression_level != 0 &&
288 compress_type == WANDIO_COMPRESS_BZ2) {
289 iow = bz_wopen(iow,compression_level);
290 }
291 #endif
292 /* Open a threaded writer */
293 if (use_threads)
294 return thread_wopen(iow);
295 else
296 return iow;
297 }
298
wandio_wwrite(iow_t * iow,const void * buffer,off_t len)299 DLLEXPORT off_t wandio_wwrite(iow_t *iow, const void *buffer, off_t len)
300 {
301 #if WRITE_TRACE
302 fprintf(stderr,"wwrite(%s): %d bytes\n",iow->source->name, (int)len);
303 #endif
304 return iow->source->write(iow,buffer,len);
305 }
306
wandio_wdestroy(iow_t * iow)307 DLLEXPORT void wandio_wdestroy(iow_t *iow)
308 {
309 iow->source->close(iow);
310 if (keep_stats)
311 fprintf(stderr,"LIBTRACEIO STATS: %"PRIu64" blocks on write\n", write_waits);
312 }
313
314