1 /******************************************************
2 Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3
4 The xbstream utility: serialize/deserialize files in the XBSTREAM format.
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *******************************************************/
20
21 #include <my_global.h>
22 #include <my_base.h>
23 #include <my_getopt.h>
24 #include <hash.h>
25 #include <my_pthread.h>
26 #include "common.h"
27 #include "xbstream.h"
28 #include "datasink.h"
29 #include "crc_glue.h"
30
/* Version string printed by print_version(). */
#define XBSTREAM_VERSION "1.0"
/* Size of the read buffer used by stream_one_file() in create mode. */
#define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL)

/* Initial size passed to my_hash_init() for the path -> file_entry_t hash
used in extract mode. */
#define START_FILE_HASH_SIZE 16

/* Operating mode selected on the command line. */
typedef enum {
	RUN_MODE_NONE,		/* neither -c nor -x given (yet) */
	RUN_MODE_CREATE,	/* -c: stream files to standard output */
	RUN_MODE_EXTRACT	/* -x: extract files from standard input */
} run_mode_t;

/* Need the following definitions to avoid linking with ds_*.o and their link
dependencies */
datasink_t datasink_archive;
datasink_t datasink_xbstream;
datasink_t datasink_compress;
datasink_t datasink_tmpfile;

/* Option values, set while parsing the command line. */
static run_mode_t opt_mode;		/* set via set_run_mode() */
static char * opt_directory = NULL;	/* -C: directory to change into first */
static my_bool opt_verbose = 0;		/* -v: print each file name processed */
static int opt_parallel = 1;		/* -p: worker thread count (extract mode) */
/* Command-line options in my_getopt's struct my_option format.
-?, -c and -x are acted upon in get_one_option(); -C, -v and -p store
their values into opt_directory, opt_verbose and opt_parallel.
The list ends with an entry whose name is 0. */
static struct my_option my_long_options[] =
{
	{"help", '?', "Display this help and exit.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"create", 'c', "Stream the specified files to the standard output.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"extract", 'x', "Extract to disk files from the stream on the "
	 "standard input.",
	 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
	{"directory", 'C', "Change the current directory to the specified one "
	 "before streaming or extracting.", &opt_directory, &opt_directory, 0,
	 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
	{"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose,
	 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
	/* NOTE(review): "1, 1, INT_MAX" are presumably default, minimum and
	maximum for --parallel -- confirm against my_getopt.h. */
	{"parallel", 'p', "Number of worker threads for reading / writing.",
	 &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG,
	 1, 1, INT_MAX, 0, 0, 0},

	{0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
74
/* State shared by all extract worker threads. */
typedef struct {
	HASH *filehash;		/* open file_entry_t objects, keyed by path */
	xb_rstream_t *stream;	/* input stream all workers read from */
	ds_ctxt_t *ds_ctxt;	/* datasink in which extracted files are opened */
	pthread_mutex_t *mutex;	/* serializes stream reads and filehash access */
} extract_ctxt_t;

/* A file currently being extracted. */
typedef struct {
	char *path;		/* heap copy of the file path */
	uint pathlen;		/* length of path; also the hash key length */
	my_off_t offset;	/* offset expected for the next data chunk */
	ds_file_t *file;	/* destination file */
	pthread_mutex_t mutex;	/* held while a chunk is written to this file */
} file_entry_t;

/* Forward declarations. */
static int get_options(int *argc, char ***argv);
static int mode_create(int argc, char **argv);
static int mode_extract(int n_threads, int argc, char **argv);
static my_bool get_one_option(int optid, const struct my_option *opt,
			      char *argument);
95
96 int
main(int argc,char ** argv)97 main(int argc, char **argv)
98 {
99 MY_INIT(argv[0]);
100
101 crc_init();
102
103 if (get_options(&argc, &argv)) {
104 goto err;
105 }
106
107 if (opt_mode == RUN_MODE_NONE) {
108 msg("%s: either -c or -x must be specified.", my_progname);
109 goto err;
110 }
111
112 /* Change the current directory if -C is specified */
113 if (opt_directory && my_setwd(opt_directory, MYF(MY_WME))) {
114 goto err;
115 }
116
117 if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) {
118 goto err;
119 } else if (opt_mode == RUN_MODE_EXTRACT &&
120 mode_extract(opt_parallel, argc, argv)) {
121 goto err;
122 }
123
124 my_cleanup_options(my_long_options);
125
126 my_end(0);
127
128 return EXIT_SUCCESS;
129 err:
130 my_cleanup_options(my_long_options);
131
132 my_end(0);
133
134 exit(EXIT_FAILURE);
135 }
136
137 static
138 int
get_options(int * argc,char *** argv)139 get_options(int *argc, char ***argv)
140 {
141 int ho_error;
142
143 if ((ho_error= handle_options(argc, argv, my_long_options,
144 get_one_option))) {
145 exit(EXIT_FAILURE);
146 }
147
148 return 0;
149 }
150
151 static
152 void
print_version(void)153 print_version(void)
154 {
155 printf("%s Ver %s for %s (%s)\n", my_progname, XBSTREAM_VERSION,
156 SYSTEM_TYPE, MACHINE_TYPE);
157 }
158
159 static
160 void
usage(void)161 usage(void)
162 {
163 print_version();
164 puts("Copyright (C) 2011-2013 Percona LLC and/or its affiliates.");
165 puts("This software comes with ABSOLUTELY NO WARRANTY. "
166 "This is free software,\nand you are welcome to modify and "
167 "redistribute it under the GPL license.\n");
168
169 puts("Serialize/deserialize files in the XBSTREAM format.\n");
170
171 puts("Usage: ");
172 printf(" %s -c [OPTIONS...] FILES... # stream specified files to "
173 "standard output.\n", my_progname);
174 printf(" %s -x [OPTIONS...] # extract files from the stream"
175 "on the standard input.\n", my_progname);
176
177 puts("\nOptions:");
178 my_print_help(my_long_options);
179 }
180
181 static
182 int
set_run_mode(run_mode_t mode)183 set_run_mode(run_mode_t mode)
184 {
185 if (opt_mode != RUN_MODE_NONE) {
186 msg("%s: can't set specify both -c and -x.", my_progname);
187 return 1;
188 }
189
190 opt_mode = mode;
191
192 return 0;
193 }
194
195 static
196 my_bool
get_one_option(int optid,const struct my_option * opt,char * argument)197 get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
198 char *argument __attribute__((unused)))
199 {
200 switch (optid) {
201 case 'c':
202 if (set_run_mode(RUN_MODE_CREATE)) {
203 return TRUE;
204 }
205 break;
206 case 'x':
207 if (set_run_mode(RUN_MODE_EXTRACT)) {
208 return TRUE;
209 }
210 break;
211 case '?':
212 usage();
213 exit(0);
214 }
215
216 return FALSE;
217 }
218
219 static
220 int
stream_one_file(File file,xb_wstream_file_t * xbfile)221 stream_one_file(File file, xb_wstream_file_t *xbfile)
222 {
223 uchar *buf;
224 ssize_t bytes;
225 my_off_t offset;
226
227 posix_fadvise(file, 0, 0, POSIX_FADV_SEQUENTIAL);
228 offset = my_tell(file, MYF(MY_WME));
229
230 buf = (uchar*)(my_malloc(XBSTREAM_BUFFER_SIZE, MYF(MY_FAE)));
231
232 while ((bytes = (ssize_t)my_read(file, buf, XBSTREAM_BUFFER_SIZE,
233 MYF(MY_WME))) > 0) {
234 if (xb_stream_write_data(xbfile, buf, bytes)) {
235 msg("%s: xb_stream_write_data() failed.",
236 my_progname);
237 my_free(buf);
238 return 1;
239 }
240 posix_fadvise(file, offset, XBSTREAM_BUFFER_SIZE,
241 POSIX_FADV_DONTNEED);
242 offset += XBSTREAM_BUFFER_SIZE;
243
244 }
245
246 my_free(buf);
247
248 if (bytes < 0) {
249 return 1;
250 }
251
252 return 0;
253 }
254
255 static
256 int
mode_create(int argc,char ** argv)257 mode_create(int argc, char **argv)
258 {
259 int i;
260 MY_STAT mystat;
261 xb_wstream_t *stream;
262
263 if (argc < 1) {
264 msg("%s: no files are specified.", my_progname);
265 return 1;
266 }
267
268 stream = xb_stream_write_new();
269 if (stream == NULL) {
270 msg("%s: xb_stream_write_new() failed.", my_progname);
271 return 1;
272 }
273
274 for (i = 0; i < argc; i++) {
275 char *filepath = argv[i];
276 File src_file;
277 xb_wstream_file_t *file;
278
279 if (my_stat(filepath, &mystat, MYF(MY_WME)) == NULL) {
280 goto err;
281 }
282 if (!MY_S_ISREG(mystat.st_mode)) {
283 msg("%s: %s is not a regular file, exiting.",
284 my_progname, filepath);
285 goto err;
286 }
287
288 if ((src_file = my_open(filepath, O_RDONLY, MYF(MY_WME))) < 0) {
289 msg("%s: failed to open %s.", my_progname, filepath);
290 goto err;
291 }
292
293 file = xb_stream_write_open(stream, filepath, &mystat, NULL, NULL);
294 if (file == NULL) {
295 goto err;
296 }
297
298 if (opt_verbose) {
299 msg("%s", filepath);
300 }
301
302 if (stream_one_file(src_file, file) ||
303 xb_stream_write_close(file) ||
304 my_close(src_file, MYF(MY_WME))) {
305 goto err;
306 }
307 }
308
309 xb_stream_write_done(stream);
310
311 return 0;
312 err:
313 xb_stream_write_done(stream);
314
315 return 1;
316 }
317
318 static
319 file_entry_t *
file_entry_new(extract_ctxt_t * ctxt,const char * path,uint pathlen)320 file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen)
321 {
322 file_entry_t *entry;
323 ds_file_t *file;
324
325 entry = (file_entry_t *) my_malloc(sizeof(file_entry_t),
326 MYF(MY_WME | MY_ZEROFILL));
327 if (entry == NULL) {
328 return NULL;
329 }
330
331 entry->path = my_strndup(path, pathlen, MYF(MY_WME));
332 if (entry->path == NULL) {
333 goto err;
334 }
335 entry->pathlen = pathlen;
336
337 file = ds_open(ctxt->ds_ctxt, path, NULL);
338
339 if (file == NULL) {
340 msg("%s: failed to create file.", my_progname);
341 goto err;
342 }
343
344 if (opt_verbose) {
345 msg("%s", entry->path);
346 }
347
348 entry->file = file;
349
350 pthread_mutex_init(&entry->mutex, NULL);
351
352 return entry;
353
354 err:
355 if (entry->path != NULL) {
356 my_free(entry->path);
357 }
358 my_free(entry);
359
360 return NULL;
361 }
362
363 static
364 uchar *
get_file_entry_key(file_entry_t * entry,size_t * length,my_bool not_used)365 get_file_entry_key(file_entry_t *entry, size_t *length,
366 my_bool not_used __attribute__((unused)))
367 {
368 *length = entry->pathlen;
369 return (uchar *) entry->path;
370 }
371
372 static
373 void
file_entry_free(file_entry_t * entry)374 file_entry_free(file_entry_t *entry)
375 {
376 pthread_mutex_destroy(&entry->mutex);
377 ds_close(entry->file);
378 my_free(entry->path);
379 my_free(entry);
380 }
381
382 static
383 void *
extract_worker_thread_func(void * arg)384 extract_worker_thread_func(void *arg)
385 {
386 xb_rstream_chunk_t chunk;
387 file_entry_t *entry;
388 xb_rstream_result_t res;
389
390 extract_ctxt_t *ctxt = (extract_ctxt_t *) arg;
391
392 my_thread_init();
393
394 memset(&chunk, 0, sizeof(chunk));
395
396 while (1) {
397
398 pthread_mutex_lock(ctxt->mutex);
399 res = xb_stream_read_chunk(ctxt->stream, &chunk);
400
401 if (res != XB_STREAM_READ_CHUNK) {
402 pthread_mutex_unlock(ctxt->mutex);
403 break;
404 }
405
406 /* If unknown type and ignorable flag is set, skip this chunk */
407 if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \
408 !(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) {
409 pthread_mutex_unlock(ctxt->mutex);
410 continue;
411 }
412
413 /* See if we already have this file open */
414 entry = (file_entry_t *) my_hash_search(ctxt->filehash,
415 (uchar *) chunk.path,
416 chunk.pathlen);
417
418 if (entry == NULL) {
419 entry = file_entry_new(ctxt,
420 chunk.path,
421 chunk.pathlen);
422 if (entry == NULL) {
423 pthread_mutex_unlock(ctxt->mutex);
424 break;
425 }
426 if (my_hash_insert(ctxt->filehash, (uchar *) entry)) {
427 msg("%s: my_hash_insert() failed.",
428 my_progname);
429 pthread_mutex_unlock(ctxt->mutex);
430 break;
431 }
432 }
433
434 pthread_mutex_lock(&entry->mutex);
435
436 pthread_mutex_unlock(ctxt->mutex);
437
438 res = xb_stream_validate_checksum(&chunk);
439
440 if (res != XB_STREAM_READ_CHUNK) {
441 pthread_mutex_unlock(&entry->mutex);
442 break;
443 }
444
445 if (chunk.type == XB_CHUNK_TYPE_EOF) {
446 pthread_mutex_unlock(&entry->mutex);
447 pthread_mutex_lock(ctxt->mutex);
448 my_hash_delete(ctxt->filehash, (uchar *) entry);
449 pthread_mutex_unlock(ctxt->mutex);
450
451 continue;
452 }
453
454 if (entry->offset != chunk.offset) {
455 msg("%s: out-of-order chunk: real offset = 0x%llx, "
456 "expected offset = 0x%llx", my_progname,
457 chunk.offset, entry->offset);
458 pthread_mutex_unlock(&entry->mutex);
459 res = XB_STREAM_READ_ERROR;
460 break;
461 }
462
463 if (ds_write(entry->file, chunk.data, chunk.length)) {
464 msg("%s: my_write() failed.", my_progname);
465 pthread_mutex_unlock(&entry->mutex);
466 res = XB_STREAM_READ_ERROR;
467 break;
468 }
469
470 entry->offset += chunk.length;
471
472 pthread_mutex_unlock(&entry->mutex);
473 }
474
475 if (chunk.data)
476 my_free(chunk.data);
477
478 my_thread_end();
479
480 return (void *)(res);
481 }
482
483
484 static
485 int
mode_extract(int n_threads,int argc,char ** argv)486 mode_extract(int n_threads, int argc __attribute__((unused)),
487 char **argv __attribute__((unused)))
488 {
489 xb_rstream_t *stream = NULL;
490 HASH filehash;
491 ds_ctxt_t *ds_ctxt = NULL;
492 extract_ctxt_t ctxt;
493 int i;
494 pthread_t *tids = NULL;
495 void **retvals = NULL;
496 pthread_mutex_t mutex;
497 int ret = 0;
498
499 if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE,
500 0, 0, (my_hash_get_key) get_file_entry_key,
501 (my_hash_free_key) file_entry_free, MYF(0))) {
502 msg("%s: failed to initialize file hash.", my_progname);
503 return 1;
504 }
505
506 if (pthread_mutex_init(&mutex, NULL)) {
507 msg("%s: failed to initialize mutex.", my_progname);
508 my_hash_free(&filehash);
509 return 1;
510 }
511
512 /* If --directory is specified, it is already set as CWD by now. */
513 ds_ctxt = ds_create(".", DS_TYPE_LOCAL);
514 if (ds_ctxt == NULL) {
515 ret = 1;
516 goto exit;
517 }
518
519
520 stream = xb_stream_read_new();
521 if (stream == NULL) {
522 msg("%s: xb_stream_read_new() failed.", my_progname);
523 pthread_mutex_destroy(&mutex);
524 ret = 1;
525 goto exit;
526 }
527
528 ctxt.stream = stream;
529 ctxt.filehash = &filehash;
530 ctxt.ds_ctxt = ds_ctxt;
531 ctxt.mutex = &mutex;
532
533 tids = (pthread_t *)calloc(n_threads, sizeof(pthread_t));
534 retvals = (void **)calloc(n_threads, sizeof(void*));
535
536 for (i = 0; i < n_threads; i++)
537 pthread_create(tids + i, NULL, extract_worker_thread_func,
538 &ctxt);
539
540 for (i = 0; i < n_threads; i++)
541 pthread_join(tids[i], retvals + i);
542
543 for (i = 0; i < n_threads; i++) {
544 if ((size_t)retvals[i] == XB_STREAM_READ_ERROR) {
545 ret = 1;
546 goto exit;
547 }
548 }
549
550 exit:
551 pthread_mutex_destroy(&mutex);
552
553 free(tids);
554 free(retvals);
555
556 my_hash_free(&filehash);
557 if (ds_ctxt != NULL) {
558 ds_destroy(ds_ctxt);
559 }
560 xb_stream_read_done(stream);
561
562 return ret;
563 }
564