1 /******************************************************
2 Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3 
4 The xbstream utility: serialize/deserialize files in the XBSTREAM format.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
18 
19 *******************************************************/
20 
21 #include <my_global.h>
22 #include <my_base.h>
23 #include <my_getopt.h>
24 #include <hash.h>
25 #include <my_pthread.h>
26 #include "common.h"
27 #include "xbstream.h"
28 #include "datasink.h"
29 #include "crc_glue.h"
30 
/* Version string printed by print_version() in the --help banner. */
#define XBSTREAM_VERSION "1.0"
/* Size of the read buffer allocated by stream_one_file() in create mode. */
#define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL)

/* Initial size passed to my_hash_init() for the table of files that are
currently open in extract mode. */
#define START_FILE_HASH_SIZE 16

/* Operating mode chosen on the command line: -c (create) or -x (extract).
RUN_MODE_NONE means neither flag was given yet. */
typedef enum {
	RUN_MODE_NONE,
	RUN_MODE_CREATE,
	RUN_MODE_EXTRACT
} run_mode_t;

/* These stub definitions satisfy references to the datasink objects so that
xbstream does not have to link with ds_*.o and their link dependencies. */
datasink_t datasink_archive;
datasink_t datasink_xbstream;
datasink_t datasink_compress;
datasink_t datasink_tmpfile;

/* Command-line state: opt_mode is set through set_run_mode() from -c/-x;
the remaining values are filled in from my_long_options. */
static run_mode_t	opt_mode;
static char *		opt_directory = NULL;
static my_bool		opt_verbose = 0;
static int		opt_parallel = 1;
53 
/* Option table parsed by handle_options() in get_options() and printed by
my_print_help() in usage(). -c, -x and -? have no storage of their own and
are handled in get_one_option(). The final all-zero entry marks the end of
the table. */
static struct my_option my_long_options[] =
{
	{"help", '?', "Display this help and exit.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"create", 'c', "Stream the specified files to the standard output.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"extract", 'x', "Extract to disk files from the stream on the "
	 "standard input.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"directory", 'C', "Change the current directory to the specified one "
	 "before streaming or extracting.", &opt_directory, &opt_directory, 0,
	 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
	{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
	 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
	/* NOTE(review): 1, 1, INT_MAX are presumably default/min/max per
	   struct my_option's field order -- confirm against my_getopt.h. */
	{"parallel", 'p', "Number of worker threads for reading / writing.",
	 &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
	 1, 1, INT_MAX, 0, 0, 0},

	{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
74 
/* State shared by every extract worker thread (built in mode_extract()). */
typedef struct {
	/* Files currently being written, keyed by path (see
	   get_file_entry_key()). */
	HASH			*filehash;
	/* Input stream that all workers read chunks from. */
	xb_rstream_t		*stream;
	/* Datasink used to create the output files. */
	ds_ctxt_t		*ds_ctxt;
	/* Held around reads from stream and every filehash lookup/update. */
	pthread_mutex_t		*mutex;
} extract_ctxt_t;

/* One output file that is open during extraction. */
typedef struct {
	/* Private copy of the path from the stream; also the hash key. */
	char 		*path;
	uint		pathlen;
	/* Offset the next data chunk for this file is expected to start at. */
	my_off_t	offset;
	/* Destination file opened through the datasink. */
	ds_file_t	*file;
	/* Serialises processing of this file's chunks across workers. */
	pthread_mutex_t	mutex;
} file_entry_t;

/* Forward declarations; the definitions follow below. */
static int get_options(int *argc, char ***argv);
static int mode_create(int argc, char **argv);
static int mode_extract(int n_threads, int argc, char **argv);
static my_bool get_one_option(int optid, const struct my_option *opt,
			      char *argument);
95 
96 int
97 main(int argc, char **argv)
98 {
99 	MY_INIT(argv[0]);
100 
101 	crc_init();
102 
103 	if (get_options(&argc, &argv)) {
104 		goto err;
105 	}
106 
107 	if (opt_mode == RUN_MODE_NONE) {
108 		msg("%s: either -c or -x must be specified.", my_progname);
109 		goto err;
110 	}
111 
112 	/* Change the current directory if -C is specified */
113 	if (opt_directory && my_setwd(opt_directory, MYF(MY_WME))) {
114 		goto err;
115 	}
116 
117 	if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) {
118 		goto err;
119 	} else if (opt_mode == RUN_MODE_EXTRACT &&
120 		   mode_extract(opt_parallel, argc, argv)) {
121 		goto err;
122 	}
123 
124 	my_cleanup_options(my_long_options);
125 
126 	my_end(0);
127 
128 	return EXIT_SUCCESS;
129 err:
130 	my_cleanup_options(my_long_options);
131 
132 	my_end(0);
133 
134 	exit(EXIT_FAILURE);
135 }
136 
137 static
138 int
139 get_options(int *argc, char ***argv)
140 {
141 	int ho_error;
142 
143 	if ((ho_error= handle_options(argc, argv, my_long_options,
144 				      get_one_option))) {
145 		exit(EXIT_FAILURE);
146 	}
147 
148 	return 0;
149 }
150 
151 static
152 void
153 print_version(void)
154 {
155 	printf("%s  Ver %s for %s (%s)\n", my_progname, XBSTREAM_VERSION,
156 	       SYSTEM_TYPE, MACHINE_TYPE);
157 }
158 
159 static
160 void
161 usage(void)
162 {
163 	print_version();
164 	puts("Copyright (C) 2011-2013 Percona LLC and/or its affiliates.");
165 	puts("This software comes with ABSOLUTELY NO WARRANTY. "
166 	     "This is free software,\nand you are welcome to modify and "
167 	     "redistribute it under the GPL license.\n");
168 
169 	puts("Serialize/deserialize files in the XBSTREAM format.\n");
170 
171 	puts("Usage: ");
172 	printf("  %s -c [OPTIONS...] FILES...	# stream specified files to "
173 	       "standard output.\n", my_progname);
174 	printf("  %s -x [OPTIONS...]		# extract files from the stream"
175 	       "on the standard input.\n", my_progname);
176 
177 	puts("\nOptions:");
178 	my_print_help(my_long_options);
179 }
180 
181 static
182 int
183 set_run_mode(run_mode_t mode)
184 {
185 	if (opt_mode != RUN_MODE_NONE) {
186 		msg("%s: can't set specify both -c and -x.", my_progname);
187 		return 1;
188 	}
189 
190 	opt_mode = mode;
191 
192 	return 0;
193 }
194 
195 static
196 my_bool
197 get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
198 	       char *argument __attribute__((unused)))
199 {
200 	switch (optid) {
201 	case 'c':
202 		if (set_run_mode(RUN_MODE_CREATE)) {
203 			return TRUE;
204 		}
205 		break;
206 	case 'x':
207 		if (set_run_mode(RUN_MODE_EXTRACT)) {
208 			return TRUE;
209 		}
210 		break;
211 	case '?':
212 		usage();
213 		exit(0);
214 	}
215 
216 	return FALSE;
217 }
218 
219 static
220 int
221 stream_one_file(File file, xb_wstream_file_t *xbfile)
222 {
223 	uchar	*buf;
224 	ssize_t	bytes;
225 	my_off_t	offset;
226 
227 	posix_fadvise(file, 0, 0, POSIX_FADV_SEQUENTIAL);
228 	offset = my_tell(file, MYF(MY_WME));
229 
230 	buf = (uchar*)(my_malloc(XBSTREAM_BUFFER_SIZE, MYF(MY_FAE)));
231 
232 	while ((bytes = (ssize_t)my_read(file, buf, XBSTREAM_BUFFER_SIZE,
233 				MYF(MY_WME))) > 0) {
234 		if (xb_stream_write_data(xbfile, buf, bytes)) {
235 			msg("%s: xb_stream_write_data() failed.",
236 			    my_progname);
237 			my_free(buf);
238 			return 1;
239 		}
240 		posix_fadvise(file, offset, XBSTREAM_BUFFER_SIZE,
241 			      POSIX_FADV_DONTNEED);
242 		offset += XBSTREAM_BUFFER_SIZE;
243 
244 	}
245 
246 	my_free(buf);
247 
248 	if (bytes < 0) {
249 		return 1;
250 	}
251 
252 	return 0;
253 }
254 
255 static
256 int
257 mode_create(int argc, char **argv)
258 {
259 	int		i;
260 	MY_STAT		mystat;
261 	xb_wstream_t	*stream;
262 
263 	if (argc < 1) {
264 		msg("%s: no files are specified.", my_progname);
265 		return 1;
266 	}
267 
268 	stream = xb_stream_write_new();
269 	if (stream == NULL) {
270 		msg("%s: xb_stream_write_new() failed.", my_progname);
271 		return 1;
272 	}
273 
274 	for (i = 0; i < argc; i++) {
275 		char			*filepath = argv[i];
276 		File			src_file;
277 		xb_wstream_file_t	*file;
278 
279 		if (my_stat(filepath, &mystat, MYF(MY_WME)) == NULL) {
280 			goto err;
281 		}
282 		if (!MY_S_ISREG(mystat.st_mode)) {
283 			msg("%s: %s is not a regular file, exiting.",
284 			    my_progname, filepath);
285 			goto err;
286 		}
287 
288 		if ((src_file = my_open(filepath, O_RDONLY, MYF(MY_WME))) < 0) {
289 			msg("%s: failed to open %s.", my_progname, filepath);
290 			goto err;
291 		}
292 
293 		file = xb_stream_write_open(stream, filepath, &mystat, NULL, NULL);
294 		if (file == NULL) {
295 			goto err;
296 		}
297 
298 		if (opt_verbose) {
299 			msg("%s", filepath);
300 		}
301 
302 		if (stream_one_file(src_file, file) ||
303 		    xb_stream_write_close(file) ||
304 		    my_close(src_file, MYF(MY_WME))) {
305 			goto err;
306 		}
307 	}
308 
309 	xb_stream_write_done(stream);
310 
311 	return 0;
312 err:
313 	xb_stream_write_done(stream);
314 
315 	return 1;
316 }
317 
318 static
319 file_entry_t *
320 file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen)
321 {
322 	file_entry_t	*entry;
323 	ds_file_t	*file;
324 
325 	entry = (file_entry_t *) my_malloc(sizeof(file_entry_t),
326 					   MYF(MY_WME | MY_ZEROFILL));
327 	if (entry == NULL) {
328 		return NULL;
329 	}
330 
331 	entry->path = my_strndup(path, pathlen, MYF(MY_WME));
332 	if (entry->path == NULL) {
333 		goto err;
334 	}
335 	entry->pathlen = pathlen;
336 
337 	file = ds_open(ctxt->ds_ctxt, path, NULL);
338 
339 	if (file == NULL) {
340 		msg("%s: failed to create file.", my_progname);
341 		goto err;
342 	}
343 
344 	if (opt_verbose) {
345 		msg("%s", entry->path);
346 	}
347 
348 	entry->file = file;
349 
350 	pthread_mutex_init(&entry->mutex, NULL);
351 
352 	return entry;
353 
354 err:
355 	if (entry->path != NULL) {
356 		my_free(entry->path);
357 	}
358 	my_free(entry);
359 
360 	return NULL;
361 }
362 
363 static
364 uchar *
365 get_file_entry_key(file_entry_t *entry, size_t *length,
366 		   my_bool not_used __attribute__((unused)))
367 {
368 	*length = entry->pathlen;
369 	return (uchar *) entry->path;
370 }
371 
372 static
373 void
374 file_entry_free(file_entry_t *entry)
375 {
376 	pthread_mutex_destroy(&entry->mutex);
377 	ds_close(entry->file);
378 	my_free(entry->path);
379 	my_free(entry);
380 }
381 
382 static
383 void *
384 extract_worker_thread_func(void *arg)
385 {
386 	xb_rstream_chunk_t	chunk;
387 	file_entry_t		*entry;
388 	xb_rstream_result_t	res;
389 
390 	extract_ctxt_t *ctxt = (extract_ctxt_t *) arg;
391 
392 	my_thread_init();
393 
394 	memset(&chunk, 0, sizeof(chunk));
395 
396 	while (1) {
397 
398 		pthread_mutex_lock(ctxt->mutex);
399 		res = xb_stream_read_chunk(ctxt->stream, &chunk);
400 
401 		if (res != XB_STREAM_READ_CHUNK) {
402 			pthread_mutex_unlock(ctxt->mutex);
403 			break;
404 		}
405 
406 		/* If unknown type and ignorable flag is set, skip this chunk */
407 		if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \
408 		    !(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) {
409 			pthread_mutex_unlock(ctxt->mutex);
410 			continue;
411 		}
412 
413 		/* See if we already have this file open */
414 		entry = (file_entry_t *) my_hash_search(ctxt->filehash,
415 							(uchar *) chunk.path,
416 							chunk.pathlen);
417 
418 		if (entry == NULL) {
419 			entry = file_entry_new(ctxt,
420 					       chunk.path,
421 					       chunk.pathlen);
422 			if (entry == NULL) {
423 				pthread_mutex_unlock(ctxt->mutex);
424 				break;
425 			}
426 			if (my_hash_insert(ctxt->filehash, (uchar *) entry)) {
427 				msg("%s: my_hash_insert() failed.",
428 				    my_progname);
429 				pthread_mutex_unlock(ctxt->mutex);
430 				break;
431 			}
432 		}
433 
434 		pthread_mutex_lock(&entry->mutex);
435 
436 		pthread_mutex_unlock(ctxt->mutex);
437 
438 		res = xb_stream_validate_checksum(&chunk);
439 
440 		if (res != XB_STREAM_READ_CHUNK) {
441 			pthread_mutex_unlock(&entry->mutex);
442 			break;
443 		}
444 
445 		if (chunk.type == XB_CHUNK_TYPE_EOF) {
446 			pthread_mutex_unlock(&entry->mutex);
447 			pthread_mutex_lock(ctxt->mutex);
448 			my_hash_delete(ctxt->filehash, (uchar *) entry);
449 			pthread_mutex_unlock(ctxt->mutex);
450 
451 			continue;
452 		}
453 
454 		if (entry->offset != chunk.offset) {
455 			msg("%s: out-of-order chunk: real offset = 0x%llx, "
456 			    "expected offset = 0x%llx", my_progname,
457 			    chunk.offset, entry->offset);
458 			pthread_mutex_unlock(&entry->mutex);
459 			res = XB_STREAM_READ_ERROR;
460 			break;
461 		}
462 
463 		if (ds_write(entry->file, chunk.data, chunk.length)) {
464 			msg("%s: my_write() failed.", my_progname);
465 			pthread_mutex_unlock(&entry->mutex);
466 			res = XB_STREAM_READ_ERROR;
467 			break;
468 		}
469 
470 		entry->offset += chunk.length;
471 
472 		pthread_mutex_unlock(&entry->mutex);
473 	}
474 
475 	if (chunk.data)
476 		my_free(chunk.data);
477 
478 	my_thread_end();
479 
480 	return (void *)(res);
481 }
482 
483 
484 static
485 int
486 mode_extract(int n_threads, int argc __attribute__((unused)),
487 	     char **argv __attribute__((unused)))
488 {
489 	xb_rstream_t		*stream = NULL;
490 	HASH			filehash;
491 	ds_ctxt_t		*ds_ctxt = NULL;
492 	extract_ctxt_t		ctxt;
493 	int			i;
494 	pthread_t		*tids = NULL;
495 	void			**retvals = NULL;
496 	pthread_mutex_t		mutex;
497 	int			ret = 0;
498 
499 	if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
500 			  0, 0, (my_hash_get_key) get_file_entry_key,
501 			  (my_hash_free_key) file_entry_free, MYF(0))) {
502 		msg("%s: failed to initialize file hash.", my_progname);
503 		return 1;
504 	}
505 
506 	if (pthread_mutex_init(&mutex, NULL)) {
507 		msg("%s: failed to initialize mutex.", my_progname);
508 		my_hash_free(&filehash);
509 		return 1;
510 	}
511 
512 	/* If --directory is specified, it is already set as CWD by now. */
513 	ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
514 	if (ds_ctxt == NULL) {
515 		ret = 1;
516 		goto exit;
517 	}
518 
519 
520 	stream = xb_stream_read_new();
521 	if (stream == NULL) {
522 		msg("%s: xb_stream_read_new() failed.", my_progname);
523 		pthread_mutex_destroy(&mutex);
524 		ret = 1;
525 		goto exit;
526 	}
527 
528 	ctxt.stream = stream;
529 	ctxt.filehash = &filehash;
530 	ctxt.ds_ctxt = ds_ctxt;
531 	ctxt.mutex = &mutex;
532 
533 	tids = (pthread_t *)calloc(n_threads, sizeof(pthread_t));
534 	retvals = (void **)calloc(n_threads, sizeof(void*));
535 
536 	for (i = 0; i < n_threads; i++)
537 		pthread_create(tids + i, NULL, extract_worker_thread_func,
538 			       &ctxt);
539 
540 	for (i = 0; i < n_threads; i++)
541 		pthread_join(tids[i], retvals + i);
542 
543 	for (i = 0; i < n_threads; i++) {
544 		if ((size_t)retvals[i] == XB_STREAM_READ_ERROR) {
545 			ret = 1;
546 			goto exit;
547 		}
548 	}
549 
550 exit:
551 	pthread_mutex_destroy(&mutex);
552 
553 	free(tids);
554 	free(retvals);
555 
556 	my_hash_free(&filehash);
557 	if (ds_ctxt != NULL) {
558 		ds_destroy(ds_ctxt);
559 	}
560 	xb_stream_read_done(stream);
561 
562 	return ret;
563 }
564