1 /****************************************************** 2 Copyright (c) 2011-2013 Percona LLC and/or its affiliates. 3 4 The xbstream utility: serialize/deserialize files in the XBSTREAM format. 5 6 This program is free software; you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation; version 2 of the License. 9 10 This program is distributed in the hope that it will be useful, 11 but WITHOUT ANY WARRANTY; without even the implied warranty of 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 GNU General Public License for more details. 14 15 You should have received a copy of the GNU General Public License 16 along with this program; if not, write to the Free Software 17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA 18 19 *******************************************************/ 20 21 #include <my_global.h> 22 #include <my_base.h> 23 #include <my_getopt.h> 24 #include <hash.h> 25 #include <my_pthread.h> 26 #include "common.h" 27 #include "xbstream.h" 28 #include "datasink.h" 29 #include "crc_glue.h" 30 31 #define XBSTREAM_VERSION "1.0" 32 #define XBSTREAM_BUFFER_SIZE (10 * 1024 * 1024UL) 33 34 #define START_FILE_HASH_SIZE 16 35 36 typedef enum { 37 RUN_MODE_NONE, 38 RUN_MODE_CREATE, 39 RUN_MODE_EXTRACT 40 } run_mode_t; 41 42 /* Need the following definitions to avoid linking with ds_*.o and their link 43 dependencies */ 44 datasink_t datasink_archive; 45 datasink_t datasink_xbstream; 46 datasink_t datasink_compress; 47 datasink_t datasink_tmpfile; 48 49 static run_mode_t opt_mode; 50 static char * opt_directory = NULL; 51 static my_bool opt_verbose = 0; 52 static int opt_parallel = 1; 53 54 static struct my_option my_long_options[] = 55 { 56 {"help", '?', "Display this help and exit.", 57 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 58 {"create", 'c', "Stream the specified files to the standard output.", 59 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 60 {"extract", 'x', "Extract to disk files from the stream on the " 61 "standard input.", 62 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 63 {"directory", 'C', "Change the current directory to the specified one " 64 "before streaming or extracting.", &opt_directory, &opt_directory, 0, 65 GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, 66 {"verbose", 'v', "Print verbose output.", &opt_verbose, &opt_verbose, 67 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, 68 {"parallel", 'p', "Number of worker threads for reading / writing.", 69 &opt_parallel, &opt_parallel, 0, GET_INT, REQUIRED_ARG, 70 1, 1, INT_MAX, 0, 0, 0}, 71 72 {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} 73 }; 74 75 typedef struct { 76 HASH *filehash; 77 xb_rstream_t *stream; 78 ds_ctxt_t *ds_ctxt; 79 pthread_mutex_t *mutex; 80 } extract_ctxt_t; 81 82 typedef struct { 83 char *path; 84 uint pathlen; 85 my_off_t offset; 86 ds_file_t *file; 87 pthread_mutex_t mutex; 88 } file_entry_t; 89 90 static int get_options(int *argc, char ***argv); 91 static int mode_create(int argc, char **argv); 92 static int mode_extract(int n_threads, int argc, char **argv); 93 static my_bool get_one_option(int optid, const struct my_option *opt, 94 char *argument); 95 96 int 97 main(int argc, char **argv) 98 { 99 MY_INIT(argv[0]); 100 101 crc_init(); 102 103 if (get_options(&argc, &argv)) { 104 goto err; 105 } 106 107 if (opt_mode == RUN_MODE_NONE) { 108 msg("%s: either -c or -x must be specified.", my_progname); 109 goto err; 110 } 111 112 /* Change the current directory if -C is specified */ 113 if (opt_directory && my_setwd(opt_directory, MYF(MY_WME))) { 114 goto err; 115 } 116 117 if (opt_mode == RUN_MODE_CREATE && mode_create(argc, argv)) { 118 goto err; 119 } else if (opt_mode == RUN_MODE_EXTRACT && 120 mode_extract(opt_parallel, argc, argv)) { 121 goto err; 122 } 123 124 my_cleanup_options(my_long_options); 125 126 my_end(0); 127 128 return EXIT_SUCCESS; 129 err: 130 my_cleanup_options(my_long_options); 131 132 my_end(0); 133 134 exit(EXIT_FAILURE); 135 } 136 137 static 138 int 139 get_options(int *argc, char ***argv) 140 { 141 int ho_error; 142 143 if ((ho_error= handle_options(argc, argv, my_long_options, 144 get_one_option))) { 145 exit(EXIT_FAILURE); 146 } 147 148 return 0; 149 } 150 151 static 152 void 153 print_version(void) 154 { 155 printf("%s Ver %s for %s (%s)\n", my_progname, XBSTREAM_VERSION, 156 SYSTEM_TYPE, MACHINE_TYPE); 157 } 158 159 static 160 void 161 usage(void) 162 { 163 print_version(); 164 puts("Copyright (C) 2011-2013 Percona LLC and/or its affiliates."); 165 puts("This software comes with ABSOLUTELY NO WARRANTY. " 166 "This is free software,\nand you are welcome to modify and " 167 "redistribute it under the GPL license.\n"); 168 169 puts("Serialize/deserialize files in the XBSTREAM format.\n"); 170 171 puts("Usage: "); 172 printf(" %s -c [OPTIONS...] FILES... # stream specified files to " 173 "standard output.\n", my_progname); 174 printf(" %s -x [OPTIONS...] # extract files from the stream" 175 "on the standard input.\n", my_progname); 176 177 puts("\nOptions:"); 178 my_print_help(my_long_options); 179 } 180 181 static 182 int 183 set_run_mode(run_mode_t mode) 184 { 185 if (opt_mode != RUN_MODE_NONE) { 186 msg("%s: can't set specify both -c and -x.", my_progname); 187 return 1; 188 } 189 190 opt_mode = mode; 191 192 return 0; 193 } 194 195 static 196 my_bool 197 get_one_option(int optid, const struct my_option *opt __attribute__((unused)), 198 char *argument __attribute__((unused))) 199 { 200 switch (optid) { 201 case 'c': 202 if (set_run_mode(RUN_MODE_CREATE)) { 203 return TRUE; 204 } 205 break; 206 case 'x': 207 if (set_run_mode(RUN_MODE_EXTRACT)) { 208 return TRUE; 209 } 210 break; 211 case '?': 212 usage(); 213 exit(0); 214 } 215 216 return FALSE; 217 } 218 219 static 220 int 221 stream_one_file(File file, xb_wstream_file_t *xbfile) 222 { 223 uchar *buf; 224 ssize_t bytes; 225 my_off_t offset; 226 227 posix_fadvise(file, 0, 0, POSIX_FADV_SEQUENTIAL); 228 offset = my_tell(file, MYF(MY_WME)); 229 230 buf = (uchar*)(my_malloc(XBSTREAM_BUFFER_SIZE, MYF(MY_FAE))); 231 232 while ((bytes = (ssize_t)my_read(file, buf, XBSTREAM_BUFFER_SIZE, 233 MYF(MY_WME))) > 0) { 234 if (xb_stream_write_data(xbfile, buf, bytes)) { 235 msg("%s: xb_stream_write_data() failed.", 236 my_progname); 237 my_free(buf); 238 return 1; 239 } 240 posix_fadvise(file, offset, XBSTREAM_BUFFER_SIZE, 241 POSIX_FADV_DONTNEED); 242 offset += XBSTREAM_BUFFER_SIZE; 243 244 } 245 246 my_free(buf); 247 248 if (bytes < 0) { 249 return 1; 250 } 251 252 return 0; 253 } 254 255 static 256 int 257 mode_create(int argc, char **argv) 258 { 259 int i; 260 MY_STAT mystat; 261 xb_wstream_t *stream; 262 263 if (argc < 1) { 264 msg("%s: no files are specified.", my_progname); 265 return 1; 266 } 267 268 stream = xb_stream_write_new(); 269 if (stream == NULL) { 270 msg("%s: xb_stream_write_new() failed.", my_progname); 271 return 1; 272 } 273 274 for (i = 0; i < argc; i++) { 275 char *filepath = argv[i]; 276 File src_file; 277 xb_wstream_file_t *file; 278 279 if (my_stat(filepath, &mystat, MYF(MY_WME)) == NULL) { 280 goto err; 281 } 282 if (!MY_S_ISREG(mystat.st_mode)) { 283 msg("%s: %s is not a regular file, exiting.", 284 my_progname, filepath); 285 goto err; 286 } 287 288 if ((src_file = my_open(filepath, O_RDONLY, MYF(MY_WME))) < 0) { 289 msg("%s: failed to open %s.", my_progname, filepath); 290 goto err; 291 } 292 293 file = xb_stream_write_open(stream, filepath, &mystat, NULL, NULL); 294 if (file == NULL) { 295 goto err; 296 } 297 298 if (opt_verbose) { 299 msg("%s", filepath); 300 } 301 302 if (stream_one_file(src_file, file) || 303 xb_stream_write_close(file) || 304 my_close(src_file, MYF(MY_WME))) { 305 goto err; 306 } 307 } 308 309 xb_stream_write_done(stream); 310 311 return 0; 312 err: 313 xb_stream_write_done(stream); 314 315 return 1; 316 } 317 318 static 319 file_entry_t * 320 file_entry_new(extract_ctxt_t *ctxt, const char *path, uint pathlen) 321 { 322 file_entry_t *entry; 323 ds_file_t *file; 324 325 entry = (file_entry_t *) my_malloc(sizeof(file_entry_t), 326 MYF(MY_WME | MY_ZEROFILL)); 327 if (entry == NULL) { 328 return NULL; 329 } 330 331 entry->path = my_strndup(path, pathlen, MYF(MY_WME)); 332 if (entry->path == NULL) { 333 goto err; 334 } 335 entry->pathlen = pathlen; 336 337 file = ds_open(ctxt->ds_ctxt, path, NULL); 338 339 if (file == NULL) { 340 msg("%s: failed to create file.", my_progname); 341 goto err; 342 } 343 344 if (opt_verbose) { 345 msg("%s", entry->path); 346 } 347 348 entry->file = file; 349 350 pthread_mutex_init(&entry->mutex, NULL); 351 352 return entry; 353 354 err: 355 if (entry->path != NULL) { 356 my_free(entry->path); 357 } 358 my_free(entry); 359 360 return NULL; 361 } 362 363 static 364 uchar * 365 get_file_entry_key(file_entry_t *entry, size_t *length, 366 my_bool not_used __attribute__((unused))) 367 { 368 *length = entry->pathlen; 369 return (uchar *) entry->path; 370 } 371 372 static 373 void 374 file_entry_free(file_entry_t *entry) 375 { 376 pthread_mutex_destroy(&entry->mutex); 377 ds_close(entry->file); 378 my_free(entry->path); 379 my_free(entry); 380 } 381 382 static 383 void * 384 extract_worker_thread_func(void *arg) 385 { 386 xb_rstream_chunk_t chunk; 387 file_entry_t *entry; 388 xb_rstream_result_t res; 389 390 extract_ctxt_t *ctxt = (extract_ctxt_t *) arg; 391 392 my_thread_init(); 393 394 memset(&chunk, 0, sizeof(chunk)); 395 396 while (1) { 397 398 pthread_mutex_lock(ctxt->mutex); 399 res = xb_stream_read_chunk(ctxt->stream, &chunk); 400 401 if (res != XB_STREAM_READ_CHUNK) { 402 pthread_mutex_unlock(ctxt->mutex); 403 break; 404 } 405 406 /* If unknown type and ignorable flag is set, skip this chunk */ 407 if (chunk.type == XB_CHUNK_TYPE_UNKNOWN && \ 408 !(chunk.flags & XB_STREAM_FLAG_IGNORABLE)) { 409 pthread_mutex_unlock(ctxt->mutex); 410 continue; 411 } 412 413 /* See if we already have this file open */ 414 entry = (file_entry_t *) my_hash_search(ctxt->filehash, 415 (uchar *) chunk.path, 416 chunk.pathlen); 417 418 if (entry == NULL) { 419 entry = file_entry_new(ctxt, 420 chunk.path, 421 chunk.pathlen); 422 if (entry == NULL) { 423 pthread_mutex_unlock(ctxt->mutex); 424 break; 425 } 426 if (my_hash_insert(ctxt->filehash, (uchar *) entry)) { 427 msg("%s: my_hash_insert() failed.", 428 my_progname); 429 pthread_mutex_unlock(ctxt->mutex); 430 break; 431 } 432 } 433 434 pthread_mutex_lock(&entry->mutex); 435 436 pthread_mutex_unlock(ctxt->mutex); 437 438 res = xb_stream_validate_checksum(&chunk); 439 440 if (res != XB_STREAM_READ_CHUNK) { 441 pthread_mutex_unlock(&entry->mutex); 442 break; 443 } 444 445 if (chunk.type == XB_CHUNK_TYPE_EOF) { 446 pthread_mutex_unlock(&entry->mutex); 447 pthread_mutex_lock(ctxt->mutex); 448 my_hash_delete(ctxt->filehash, (uchar *) entry); 449 pthread_mutex_unlock(ctxt->mutex); 450 451 continue; 452 } 453 454 if (entry->offset != chunk.offset) { 455 msg("%s: out-of-order chunk: real offset = 0x%llx, " 456 "expected offset = 0x%llx", my_progname, 457 chunk.offset, entry->offset); 458 pthread_mutex_unlock(&entry->mutex); 459 res = XB_STREAM_READ_ERROR; 460 break; 461 } 462 463 if (ds_write(entry->file, chunk.data, chunk.length)) { 464 msg("%s: my_write() failed.", my_progname); 465 pthread_mutex_unlock(&entry->mutex); 466 res = XB_STREAM_READ_ERROR; 467 break; 468 } 469 470 entry->offset += chunk.length; 471 472 pthread_mutex_unlock(&entry->mutex); 473 } 474 475 if (chunk.data) 476 my_free(chunk.data); 477 478 my_thread_end(); 479 480 return (void *)(res); 481 } 482 483 484 static 485 int 486 mode_extract(int n_threads, int argc __attribute__((unused)), 487 char **argv __attribute__((unused))) 488 { 489 xb_rstream_t *stream = NULL; 490 HASH filehash; 491 ds_ctxt_t *ds_ctxt = NULL; 492 extract_ctxt_t ctxt; 493 int i; 494 pthread_t *tids = NULL; 495 void **retvals = NULL; 496 pthread_mutex_t mutex; 497 int ret = 0; 498 499 if (my_hash_init(&filehash, &my_charset_bin, START_FILE_HASH_SIZE, 500 0, 0, (my_hash_get_key) get_file_entry_key, 501 (my_hash_free_key) file_entry_free, MYF(0))) { 502 msg("%s: failed to initialize file hash.", my_progname); 503 return 1; 504 } 505 506 if (pthread_mutex_init(&mutex, NULL)) { 507 msg("%s: failed to initialize mutex.", my_progname); 508 my_hash_free(&filehash); 509 return 1; 510 } 511 512 /* If --directory is specified, it is already set as CWD by now. */ 513 ds_ctxt = ds_create(".", DS_TYPE_LOCAL); 514 if (ds_ctxt == NULL) { 515 ret = 1; 516 goto exit; 517 } 518 519 520 stream = xb_stream_read_new(); 521 if (stream == NULL) { 522 msg("%s: xb_stream_read_new() failed.", my_progname); 523 pthread_mutex_destroy(&mutex); 524 ret = 1; 525 goto exit; 526 } 527 528 ctxt.stream = stream; 529 ctxt.filehash = &filehash; 530 ctxt.ds_ctxt = ds_ctxt; 531 ctxt.mutex = &mutex; 532 533 tids = (pthread_t *)calloc(n_threads, sizeof(pthread_t)); 534 retvals = (void **)calloc(n_threads, sizeof(void*)); 535 536 for (i = 0; i < n_threads; i++) 537 pthread_create(tids + i, NULL, extract_worker_thread_func, 538 &ctxt); 539 540 for (i = 0; i < n_threads; i++) 541 pthread_join(tids[i], retvals + i); 542 543 for (i = 0; i < n_threads; i++) { 544 if ((size_t)retvals[i] == XB_STREAM_READ_ERROR) { 545 ret = 1; 546 goto exit; 547 } 548 } 549 550 exit: 551 pthread_mutex_destroy(&mutex); 552 553 free(tids); 554 free(retvals); 555 556 my_hash_free(&filehash); 557 if (ds_ctxt != NULL) { 558 ds_destroy(ds_ctxt); 559 } 560 xb_stream_read_done(stream); 561 562 return ret; 563 } 564