1 /*- 2 * Copyright (C) 2009 Gabor Kovesdan <gabor@FreeBSD.org> 3 * Copyright (C) 2012 Oleg Moskalenko <mom040267@gmail.com> 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 25 * SUCH DAMAGE. 26 * 27 * $FreeBSD: head/usr.bin/sort/file.c 281182 2015-04-07 01:17:49Z pfg $ 28 */ 29 30 31 #include <sys/mman.h> 32 #include <sys/stat.h> 33 #include <sys/types.h> 34 #include <sys/queue.h> 35 36 #include <err.h> 37 #include <fcntl.h> 38 #if defined(SORT_THREADS) 39 #include <pthread.h> 40 #endif 41 #include <semaphore.h> 42 #include <stdio.h> 43 #include <stdlib.h> 44 #include <string.h> 45 #include <unistd.h> 46 #include <wchar.h> 47 #include <wctype.h> 48 49 #include "coll.h" 50 #include "file.h" 51 #include "radixsort.h" 52 53 unsigned long long free_memory = 1000000; 54 unsigned long long available_free_memory = 1000000; 55 56 bool use_mmap; 57 58 const char *tmpdir = "/var/tmp"; 59 const char *compress_program; 60 61 size_t max_open_files = 16; 62 63 /* 64 * How much space we read from file at once 65 */ 66 #define READ_CHUNK (4096) 67 68 /* 69 * File reader structure 70 */ 71 struct file_reader 72 { 73 struct reader_buffer rb; 74 FILE *file; 75 char *fname; 76 unsigned char *buffer; 77 unsigned char *mmapaddr; 78 unsigned char *mmapptr; 79 size_t bsz; 80 size_t cbsz; 81 size_t mmapsize; 82 size_t strbeg; 83 int fd; 84 char elsymb; 85 }; 86 87 /* 88 * Structure to be used in file merge process. 89 */ 90 struct file_header 91 { 92 struct file_reader *fr; 93 struct sort_list_item *si; /* current top line */ 94 size_t file_pos; 95 }; 96 97 /* 98 * List elements of "cleanable" files list. 99 */ 100 struct CLEANABLE_FILE 101 { 102 char *fn; 103 LIST_ENTRY(CLEANABLE_FILE) files; 104 }; 105 106 /* 107 * List header of "cleanable" files list. 108 */ 109 static LIST_HEAD(CLEANABLE_FILES,CLEANABLE_FILE) tmp_files; 110 111 /* 112 * Semaphore to protect the tmp file list. 113 * We use semaphore here because it is signal-safe, according to POSIX. 114 * And semaphore does not require pthread library. 115 */ 116 static sem_t tmp_files_sem; 117 118 static void mt_sort(struct sort_list *list, 119 int (*sort_func)(void *, size_t, size_t, 120 int (*)(const void *, const void *)), const char* fn); 121 122 /* 123 * Init tmp files list 124 */ 125 void 126 init_tmp_files(void) 127 { 128 129 LIST_INIT(&tmp_files); 130 sem_init(&tmp_files_sem, 0, 1); 131 } 132 133 /* 134 * Save name of a tmp file for signal cleanup 135 */ 136 void 137 tmp_file_atexit(const char *tmp_file) 138 { 139 140 if (tmp_file) { 141 sem_wait(&tmp_files_sem); 142 struct CLEANABLE_FILE *item = 143 sort_malloc(sizeof(struct CLEANABLE_FILE)); 144 item->fn = sort_strdup(tmp_file); 145 LIST_INSERT_HEAD(&tmp_files, item, files); 146 sem_post(&tmp_files_sem); 147 } 148 } 149 150 /* 151 * Clear tmp files 152 */ 153 void 154 clear_tmp_files(void) 155 { 156 struct CLEANABLE_FILE *item; 157 158 sem_wait(&tmp_files_sem); 159 LIST_FOREACH(item,&tmp_files,files) { 160 if ((item) && (item->fn)) 161 unlink(item->fn); 162 } 163 sem_post(&tmp_files_sem); 164 } 165 166 /* 167 * Check whether a file is a temporary file 168 */ 169 static bool 170 file_is_tmp(const char* fn) 171 { 172 struct CLEANABLE_FILE *item; 173 bool ret = false; 174 175 if (fn) { 176 sem_wait(&tmp_files_sem); 177 LIST_FOREACH(item,&tmp_files,files) { 178 if ((item) && (item->fn)) 179 if (strcmp(item->fn, fn) == 0) { 180 ret = true; 181 break; 182 } 183 } 184 sem_post(&tmp_files_sem); 185 } 186 187 return (ret); 188 } 189 190 /* 191 * Generate new temporary file name 192 */ 193 char * 194 new_tmp_file_name(void) 195 { 196 static size_t tfcounter = 0; 197 static const char *fn = ".bsdsort."; 198 char *ret; 199 size_t sz; 200 201 sz = strlen(tmpdir) + 1 + strlen(fn) + 32 + 1; 202 ret = sort_malloc(sz); 203 204 sprintf(ret, "%s/%s%d.%lu", tmpdir, fn, (int) getpid(), (unsigned long)(tfcounter++)); 205 tmp_file_atexit(ret); 206 return (ret); 207 } 208 209 /* 210 * Initialize file list 211 */ 212 void 213 file_list_init(struct file_list *fl, bool tmp) 214 { 215 216 if (fl) { 217 fl->count = 0; 218 fl->sz = 0; 219 fl->fns = NULL; 220 fl->tmp = tmp; 221 } 222 } 223 224 /* 225 * Add a file name to the list 226 */ 227 void 228 file_list_add(struct file_list *fl, char *fn, bool allocate) 229 { 230 231 if (fl && fn) { 232 if (fl->count >= fl->sz || (fl->fns == NULL)) { 233 fl->sz = (fl->sz) * 2 + 1; 234 fl->fns = sort_realloc(fl->fns, fl->sz * 235 sizeof(char *)); 236 } 237 fl->fns[fl->count] = allocate ? sort_strdup(fn) : fn; 238 fl->count += 1; 239 } 240 } 241 242 /* 243 * Populate file list from array of file names 244 */ 245 void 246 file_list_populate(struct file_list *fl, int argc, char **argv, bool allocate) 247 { 248 249 if (fl && argv) { 250 int i; 251 252 for (i = 0; i < argc; i++) 253 file_list_add(fl, argv[i], allocate); 254 } 255 } 256 257 /* 258 * Clean file list data and delete the files, 259 * if this is a list of temporary files 260 */ 261 void 262 file_list_clean(struct file_list *fl) 263 { 264 265 if (fl) { 266 if (fl->fns) { 267 size_t i; 268 269 for (i = 0; i < fl->count; i++) { 270 if (fl->fns[i]) { 271 if (fl->tmp) 272 unlink(fl->fns[i]); 273 sort_free(fl->fns[i]); 274 fl->fns[i] = 0; 275 } 276 } 277 sort_free(fl->fns); 278 fl->fns = NULL; 279 } 280 fl->sz = 0; 281 fl->count = 0; 282 fl->tmp = false; 283 } 284 } 285 286 /* 287 * Init sort list 288 */ 289 void 290 sort_list_init(struct sort_list *l) 291 { 292 293 if (l) { 294 l->count = 0; 295 l->size = 0; 296 l->memsize = sizeof(struct sort_list); 297 l->list = NULL; 298 } 299 } 300 301 /* 302 * Add string to sort list 303 */ 304 void 305 sort_list_add(struct sort_list *l, struct bwstring *str) 306 { 307 308 if (l && str) { 309 size_t indx = l->count; 310 311 if ((l->list == NULL) || (indx >= l->size)) { 312 size_t newsize = (l->size + 1) + 1024; 313 314 l->list = sort_realloc(l->list, 315 sizeof(struct sort_list_item*) * newsize); 316 l->memsize += (newsize - l->size) * 317 sizeof(struct sort_list_item*); 318 l->size = newsize; 319 } 320 l->list[indx] = sort_list_item_alloc(); 321 sort_list_item_set(l->list[indx], str); 322 l->memsize += sort_list_item_size(l->list[indx]); 323 l->count += 1; 324 } 325 } 326 327 /* 328 * Clean sort list data 329 */ 330 void 331 sort_list_clean(struct sort_list *l) 332 { 333 334 if (l) { 335 if (l->list) { 336 size_t i; 337 338 for (i = 0; i < l->count; i++) { 339 struct sort_list_item *item; 340 341 item = l->list[i]; 342 343 if (item) { 344 sort_list_item_clean(item); 345 sort_free(item); 346 l->list[i] = NULL; 347 } 348 } 349 sort_free(l->list); 350 l->list = NULL; 351 } 352 l->count = 0; 353 l->size = 0; 354 l->memsize = sizeof(struct sort_list); 355 } 356 } 357 358 /* 359 * Write sort list to file 360 */ 361 void 362 sort_list_dump(struct sort_list *l, const char *fn) 363 { 364 365 if (l && fn) { 366 FILE *f; 367 368 f = openfile(fn, "w"); 369 if (f == NULL) 370 err(2, NULL); 371 372 if (l->list) { 373 size_t i; 374 if (!(sort_opts_vals.uflag)) { 375 for (i = 0; i < l->count; ++i) 376 bwsfwrite(l->list[i]->str, f, 377 sort_opts_vals.zflag); 378 } else { 379 struct sort_list_item *last_printed_item = NULL; 380 struct sort_list_item *item; 381 for (i = 0; i < l->count; ++i) { 382 item = l->list[i]; 383 if ((last_printed_item == NULL) || 384 list_coll(&last_printed_item, &item)) { 385 bwsfwrite(item->str, f, sort_opts_vals.zflag); 386 last_printed_item = item; 387 } 388 } 389 } 390 } 391 392 closefile(f, fn); 393 } 394 } 395 396 /* 397 * Checks if the given file is sorted. Stops at the first disorder, 398 * prints the disordered line and returns 1. 399 */ 400 int 401 check(const char *fn) 402 { 403 struct bwstring *s1, *s2, *s1disorder, *s2disorder; 404 struct file_reader *fr; 405 struct keys_array *ka1, *ka2; 406 int res; 407 size_t pos, posdisorder; 408 409 s1 = s2 = s1disorder = s2disorder = NULL; 410 ka1 = ka2 = NULL; 411 412 fr = file_reader_init(fn); 413 414 res = 0; 415 pos = 1; 416 posdisorder = 1; 417 418 if (fr == NULL) { 419 err(2, NULL); 420 goto end; 421 } 422 423 s1 = file_reader_readline(fr); 424 if (s1 == NULL) 425 goto end; 426 427 ka1 = keys_array_alloc(); 428 preproc(s1, ka1); 429 430 s2 = file_reader_readline(fr); 431 if (s2 == NULL) 432 goto end; 433 434 ka2 = keys_array_alloc(); 435 preproc(s2, ka2); 436 437 for (;;) { 438 439 if (debug_sort) { 440 bwsprintf(stdout, s2, "s1=<", ">"); 441 bwsprintf(stdout, s1, "s2=<", ">"); 442 } 443 int cmp = key_coll(ka2, ka1, 0); 444 if (debug_sort) 445 printf("; cmp1=%d", cmp); 446 447 if (!cmp && sort_opts_vals.complex_sort && 448 !(sort_opts_vals.uflag) && !(sort_opts_vals.sflag)) { 449 cmp = top_level_str_coll(s2, s1); 450 if (debug_sort) 451 printf("; cmp2=%d", cmp); 452 } 453 if (debug_sort) 454 printf("\n"); 455 456 if ((sort_opts_vals.uflag && (cmp <= 0)) || (cmp < 0)) { 457 if (!(sort_opts_vals.csilentflag)) { 458 s2disorder = bwsdup(s2); 459 posdisorder = pos; 460 if (debug_sort) 461 s1disorder = bwsdup(s1); 462 } 463 res = 1; 464 goto end; 465 } 466 467 pos++; 468 469 clean_keys_array(s1, ka1); 470 sort_free(ka1); 471 ka1 = ka2; 472 ka2 = NULL; 473 474 bwsfree(s1); 475 s1 = s2; 476 477 s2 = file_reader_readline(fr); 478 if (s2 == NULL) 479 goto end; 480 481 ka2 = keys_array_alloc(); 482 preproc(s2, ka2); 483 } 484 485 end: 486 if (ka1) { 487 clean_keys_array(s1, ka1); 488 sort_free(ka1); 489 } 490 491 if (s1) 492 bwsfree(s1); 493 494 if (ka2) { 495 clean_keys_array(s2, ka2); 496 sort_free(ka2); 497 } 498 499 if (s2) 500 bwsfree(s2); 501 502 if ((fn == NULL) || (*fn == 0) || (strcmp(fn, "-") == 0)) { 503 for (;;) { 504 s2 = file_reader_readline(fr); 505 if (s2 == NULL) 506 break; 507 bwsfree(s2); 508 } 509 } 510 511 file_reader_free(fr); 512 513 if (s2disorder) { 514 bws_disorder_warnx(s2disorder, fn, posdisorder); 515 if (s1disorder) { 516 bws_disorder_warnx(s1disorder, fn, posdisorder); 517 if (s1disorder != s2disorder) 518 bwsfree(s1disorder); 519 } 520 bwsfree(s2disorder); 521 s1disorder = NULL; 522 s2disorder = NULL; 523 } 524 525 if (res) 526 exit(res); 527 528 return (0); 529 } 530 531 /* 532 * Opens a file. If the given filename is "-", stdout will be 533 * opened. 534 */ 535 FILE * 536 openfile(const char *fn, const char *mode) 537 { 538 FILE *file; 539 540 if (strcmp(fn, "-") == 0) { 541 return ((mode && mode[0] == 'r') ? stdin : stdout); 542 } else { 543 mode_t orig_file_mask = 0; 544 int is_tmp = file_is_tmp(fn); 545 546 if (is_tmp && (mode[0] == 'w')) 547 orig_file_mask = umask(S_IWGRP | S_IWOTH | 548 S_IRGRP | S_IROTH); 549 550 if (is_tmp && (compress_program != NULL)) { 551 char *cmd; 552 size_t cmdsz; 553 554 cmdsz = strlen(fn) + 128; 555 cmd = sort_malloc(cmdsz); 556 557 fflush(stdout); 558 559 if (mode[0] == 'r') 560 snprintf(cmd, cmdsz - 1, "cat %s | %s -d", 561 fn, compress_program); 562 else if (mode[0] == 'w') 563 snprintf(cmd, cmdsz - 1, "%s > %s", 564 compress_program, fn); 565 else 566 err(2, "%s", getstr(7)); 567 568 if ((file = popen(cmd, mode)) == NULL) 569 err(2, NULL); 570 571 sort_free(cmd); 572 573 } else 574 if ((file = fopen(fn, mode)) == NULL) 575 err(2, NULL); 576 577 if (is_tmp && (mode[0] == 'w')) 578 umask(orig_file_mask); 579 } 580 581 return (file); 582 } 583 584 /* 585 * Close file 586 */ 587 void 588 closefile(FILE *f, const char *fn) 589 { 590 if (f == NULL) { 591 ; 592 } else if (f == stdin) { 593 ; 594 } else if (f == stdout) { 595 fflush(f); 596 } else { 597 if (file_is_tmp(fn) && compress_program != NULL) { 598 if(pclose(f)<0) 599 err(2,NULL); 600 } else 601 fclose(f); 602 } 603 } 604 605 /* 606 * Reads a file into the internal buffer. 607 */ 608 struct file_reader * 609 file_reader_init(const char *fsrc) 610 { 611 struct file_reader *ret; 612 613 if (fsrc == NULL) 614 fsrc = "-"; 615 616 ret = sort_malloc(sizeof(struct file_reader)); 617 memset(ret, 0, sizeof(struct file_reader)); 618 619 ret->elsymb = '\n'; 620 if (sort_opts_vals.zflag) 621 ret->elsymb = 0; 622 623 ret->fname = sort_strdup(fsrc); 624 625 if (strcmp(fsrc, "-") && (compress_program == NULL) && use_mmap) { 626 627 do { 628 struct stat stat_buf; 629 void *addr; 630 size_t sz = 0; 631 int fd, flags; 632 633 flags = MAP_NOCORE | MAP_NOSYNC; 634 addr = MAP_FAILED; 635 636 fd = open(fsrc, O_RDONLY); 637 if (fd < 0) 638 err(2, NULL); 639 640 if (fstat(fd, &stat_buf) < 0) { 641 close(fd); 642 break; 643 } 644 645 sz = stat_buf.st_size; 646 647 #if defined(MAP_PREFAULT_READ) 648 flags |= MAP_PREFAULT_READ; 649 #endif 650 651 addr = mmap(NULL, sz, PROT_READ, flags, fd, 0); 652 if (addr == MAP_FAILED) { 653 close(fd); 654 break; 655 } 656 657 ret->fd = fd; 658 ret->mmapaddr = addr; 659 ret->mmapsize = sz; 660 ret->mmapptr = ret->mmapaddr; 661 662 } while (0); 663 } 664 665 if (ret->mmapaddr == NULL) { 666 ret->file = openfile(fsrc, "r"); 667 if (ret->file == NULL) 668 err(2, NULL); 669 670 if (strcmp(fsrc, "-")) { 671 ret->cbsz = READ_CHUNK; 672 ret->buffer = sort_malloc(ret->cbsz); 673 ret->bsz = 0; 674 ret->strbeg = 0; 675 676 ret->bsz = fread(ret->buffer, 1, ret->cbsz, ret->file); 677 if (ret->bsz == 0) { 678 if (ferror(ret->file)) 679 err(2, NULL); 680 } 681 } 682 } 683 684 return (ret); 685 } 686 687 struct bwstring * 688 file_reader_readline(struct file_reader *fr) 689 { 690 struct bwstring *ret = NULL; 691 692 if (fr->mmapaddr) { 693 unsigned char *mmapend; 694 695 mmapend = fr->mmapaddr + fr->mmapsize; 696 if (fr->mmapptr >= mmapend) 697 return (NULL); 698 else { 699 unsigned char *strend; 700 size_t sz; 701 702 sz = mmapend - fr->mmapptr; 703 strend = memchr(fr->mmapptr, fr->elsymb, sz); 704 705 if (strend == NULL) { 706 ret = bwscsbdup(fr->mmapptr, sz); 707 fr->mmapptr = mmapend; 708 } else { 709 ret = bwscsbdup(fr->mmapptr, strend - 710 fr->mmapptr); 711 fr->mmapptr = strend + 1; 712 } 713 } 714 715 } else if (fr->file != stdin) { 716 unsigned char *strend; 717 size_t bsz1, remsz, search_start; 718 719 search_start = 0; 720 remsz = 0; 721 strend = NULL; 722 723 if (fr->bsz > fr->strbeg) 724 remsz = fr->bsz - fr->strbeg; 725 726 /* line read cycle */ 727 for (;;) { 728 if (remsz > search_start) 729 strend = memchr(fr->buffer + fr->strbeg + 730 search_start, fr->elsymb, remsz - 731 search_start); 732 else 733 strend = NULL; 734 735 if (strend) 736 break; 737 if (feof(fr->file)) 738 break; 739 740 if (fr->bsz != fr->cbsz) 741 /* NOTREACHED */ 742 err(2, "File read software error 1"); 743 744 if (remsz > (READ_CHUNK >> 1)) { 745 search_start = fr->cbsz - fr->strbeg; 746 fr->cbsz += READ_CHUNK; 747 fr->buffer = sort_realloc(fr->buffer, 748 fr->cbsz); 749 bsz1 = fread(fr->buffer + fr->bsz, 1, 750 READ_CHUNK, fr->file); 751 if (bsz1 == 0) { 752 if (ferror(fr->file)) 753 err(2, NULL); 754 break; 755 } 756 fr->bsz += bsz1; 757 remsz += bsz1; 758 } else { 759 if (remsz > 0 && fr->strbeg>0) 760 bcopy(fr->buffer + fr->strbeg, 761 fr->buffer, remsz); 762 763 fr->strbeg = 0; 764 search_start = remsz; 765 bsz1 = fread(fr->buffer + remsz, 1, 766 fr->cbsz - remsz, fr->file); 767 if (bsz1 == 0) { 768 if (ferror(fr->file)) 769 err(2, NULL); 770 break; 771 } 772 fr->bsz = remsz + bsz1; 773 remsz = fr->bsz; 774 } 775 } 776 777 if (strend == NULL) 778 strend = fr->buffer + fr->bsz; 779 780 if ((fr->buffer + fr->strbeg <= strend) && 781 (fr->strbeg < fr->bsz) && (remsz>0)) 782 ret = bwscsbdup(fr->buffer + fr->strbeg, strend - 783 fr->buffer - fr->strbeg); 784 785 fr->strbeg = (strend - fr->buffer) + 1; 786 787 } else { 788 size_t len = 0; 789 790 ret = bwsfgetln(fr->file, &len, sort_opts_vals.zflag, 791 &(fr->rb)); 792 } 793 794 return (ret); 795 } 796 797 static void 798 file_reader_clean(struct file_reader *fr) 799 { 800 801 if (fr) { 802 if (fr->mmapaddr) 803 munmap(fr->mmapaddr, fr->mmapsize); 804 805 if (fr->fd) 806 close(fr->fd); 807 808 if (fr->buffer) 809 sort_free(fr->buffer); 810 811 if (fr->file) 812 if (fr->file != stdin) 813 closefile(fr->file, fr->fname); 814 815 if(fr->fname) 816 sort_free(fr->fname); 817 818 memset(fr, 0, sizeof(struct file_reader)); 819 } 820 } 821 822 void 823 file_reader_free(struct file_reader *fr) 824 { 825 826 if (fr) { 827 file_reader_clean(fr); 828 sort_free(fr); 829 } 830 } 831 832 int 833 procfile(const char *fsrc, struct sort_list *list, struct file_list *fl) 834 { 835 struct file_reader *fr; 836 837 fr = file_reader_init(fsrc); 838 if (fr == NULL) 839 err(2, NULL); 840 841 /* file browse cycle */ 842 for (;;) { 843 struct bwstring *bws; 844 845 bws = file_reader_readline(fr); 846 847 if (bws == NULL) 848 break; 849 850 sort_list_add(list, bws); 851 852 if (list->memsize >= available_free_memory) { 853 char *fn; 854 855 fn = new_tmp_file_name(); 856 sort_list_to_file(list, fn); 857 file_list_add(fl, fn, false); 858 sort_list_clean(list); 859 } 860 } 861 862 file_reader_free(fr); 863 864 return (0); 865 } 866 867 /* 868 * Compare file headers. Files with EOF always go to the end of the list. 869 */ 870 static int 871 file_header_cmp(struct file_header *f1, struct file_header *f2) 872 { 873 874 if (f1 == f2) 875 return (0); 876 else { 877 if (f1->fr == NULL) { 878 return ((f2->fr == NULL) ? 0 : +1); 879 } else if (f2->fr == NULL) 880 return (-1); 881 else { 882 int ret; 883 884 ret = list_coll(&(f1->si), &(f2->si)); 885 if (!ret) 886 return ((f1->file_pos < f2->file_pos) ? -1 : +1); 887 return (ret); 888 } 889 } 890 } 891 892 /* 893 * Allocate and init file header structure 894 */ 895 static void 896 file_header_init(struct file_header **fh, const char *fn, size_t file_pos) 897 { 898 899 if (fh && fn) { 900 struct bwstring *line; 901 902 *fh = sort_malloc(sizeof(struct file_header)); 903 (*fh)->file_pos = file_pos; 904 (*fh)->fr = file_reader_init(fn); 905 if ((*fh)->fr == NULL) { 906 perror(fn); 907 err(2, "%s", getstr(8)); 908 } 909 line = file_reader_readline((*fh)->fr); 910 if (line == NULL) { 911 file_reader_free((*fh)->fr); 912 (*fh)->fr = NULL; 913 (*fh)->si = NULL; 914 } else { 915 (*fh)->si = sort_list_item_alloc(); 916 sort_list_item_set((*fh)->si, line); 917 } 918 } 919 } 920 921 /* 922 * Close file 923 */ 924 static void 925 file_header_close(struct file_header **fh) 926 { 927 928 if (fh && *fh) { 929 if ((*fh)->fr) { 930 file_reader_free((*fh)->fr); 931 (*fh)->fr = NULL; 932 } 933 if ((*fh)->si) { 934 sort_list_item_clean((*fh)->si); 935 sort_free((*fh)->si); 936 (*fh)->si = NULL; 937 } 938 sort_free(*fh); 939 *fh = NULL; 940 } 941 } 942 943 /* 944 * Swap two array elements 945 */ 946 static void 947 file_header_swap(struct file_header **fh, size_t i1, size_t i2) 948 { 949 struct file_header *tmp; 950 951 tmp = fh[i1]; 952 fh[i1] = fh[i2]; 953 fh[i2] = tmp; 954 } 955 956 /* heap algorithm ==>> */ 957 958 /* 959 * See heap sort algorithm 960 * "Raises" last element to its right place 961 */ 962 static void 963 file_header_heap_swim(struct file_header **fh, size_t indx) 964 { 965 966 if (indx > 0) { 967 size_t parent_index; 968 969 parent_index = (indx - 1) >> 1; 970 971 if (file_header_cmp(fh[indx], fh[parent_index]) < 0) { 972 /* swap child and parent and continue */ 973 file_header_swap(fh, indx, parent_index); 974 file_header_heap_swim(fh, parent_index); 975 } 976 } 977 } 978 979 /* 980 * Sink the top element to its correct position 981 */ 982 static void 983 file_header_heap_sink(struct file_header **fh, size_t indx, size_t size) 984 { 985 size_t left_child_index; 986 size_t right_child_index; 987 988 left_child_index = indx + indx + 1; 989 right_child_index = left_child_index + 1; 990 991 if (left_child_index < size) { 992 size_t min_child_index; 993 994 min_child_index = left_child_index; 995 996 if ((right_child_index < size) && 997 (file_header_cmp(fh[left_child_index], 998 fh[right_child_index]) > 0)) 999 min_child_index = right_child_index; 1000 if (file_header_cmp(fh[indx], fh[min_child_index]) > 0) { 1001 file_header_swap(fh, indx, min_child_index); 1002 file_header_heap_sink(fh, min_child_index, size); 1003 } 1004 } 1005 } 1006 1007 /* <<== heap algorithm */ 1008 1009 /* 1010 * Adds element to the "left" end 1011 */ 1012 static void 1013 file_header_list_rearrange_from_header(struct file_header **fh, size_t size) 1014 { 1015 1016 file_header_heap_sink(fh, 0, size); 1017 } 1018 1019 /* 1020 * Adds element to the "right" end 1021 */ 1022 static void 1023 file_header_list_push(struct file_header *f, struct file_header **fh, size_t size) 1024 { 1025 1026 fh[size++] = f; 1027 file_header_heap_swim(fh, size - 1); 1028 } 1029 1030 struct last_printed 1031 { 1032 struct bwstring *str; 1033 }; 1034 1035 /* 1036 * Prints the current line of the file 1037 */ 1038 static void 1039 file_header_print(struct file_header *fh, FILE *f_out, struct last_printed *lp) 1040 { 1041 1042 if (fh && fh->fr && f_out && fh->si && fh->si->str) { 1043 if (sort_opts_vals.uflag) { 1044 if ((lp->str == NULL) || (str_list_coll(lp->str, &(fh->si)))) { 1045 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1046 if (lp->str) 1047 bwsfree(lp->str); 1048 lp->str = bwsdup(fh->si->str); 1049 } 1050 } else 1051 bwsfwrite(fh->si->str, f_out, sort_opts_vals.zflag); 1052 } 1053 } 1054 1055 /* 1056 * Read next line 1057 */ 1058 static void 1059 file_header_read_next(struct file_header *fh) 1060 { 1061 1062 if (fh && fh->fr) { 1063 struct bwstring *tmp; 1064 1065 tmp = file_reader_readline(fh->fr); 1066 if (tmp == NULL) { 1067 file_reader_free(fh->fr); 1068 fh->fr = NULL; 1069 if (fh->si) { 1070 sort_list_item_clean(fh->si); 1071 sort_free(fh->si); 1072 fh->si = NULL; 1073 } 1074 } else { 1075 if (fh->si == NULL) 1076 fh->si = sort_list_item_alloc(); 1077 sort_list_item_set(fh->si, tmp); 1078 } 1079 } 1080 } 1081 1082 /* 1083 * Merge array of "files headers" 1084 */ 1085 static void 1086 file_headers_merge(size_t fnum, struct file_header **fh, FILE *f_out) 1087 { 1088 struct last_printed lp; 1089 size_t i; 1090 1091 memset(&lp, 0, sizeof(lp)); 1092 1093 /* 1094 * construct the initial sort structure 1095 */ 1096 for (i = 0; i < fnum; i++) 1097 file_header_list_push(fh[i], fh, i); 1098 1099 while (fh[0]->fr) { /* unfinished files are always in front */ 1100 /* output the smallest line: */ 1101 file_header_print(fh[0], f_out, &lp); 1102 /* read a new line, if possible: */ 1103 file_header_read_next(fh[0]); 1104 /* re-arrange the list: */ 1105 file_header_list_rearrange_from_header(fh, fnum); 1106 } 1107 1108 if (lp.str) 1109 bwsfree(lp.str); 1110 } 1111 1112 /* 1113 * Merges the given files into the output file, which can be 1114 * stdout. 1115 */ 1116 static void 1117 merge_files_array(size_t argc, char **argv, const char *fn_out) 1118 { 1119 1120 if (argv && fn_out) { 1121 struct file_header **fh; 1122 FILE *f_out; 1123 size_t i; 1124 1125 f_out = openfile(fn_out, "w"); 1126 1127 if (f_out == NULL) 1128 err(2, NULL); 1129 1130 fh = sort_malloc((argc + 1) * sizeof(struct file_header *)); 1131 1132 for (i = 0; i < argc; i++) 1133 file_header_init(fh + i, argv[i], (size_t) i); 1134 1135 file_headers_merge(argc, fh, f_out); 1136 1137 for (i = 0; i < argc; i++) 1138 file_header_close(fh + i); 1139 1140 sort_free(fh); 1141 1142 closefile(f_out, fn_out); 1143 } 1144 } 1145 1146 /* 1147 * Shrinks the file list until its size smaller than max number of opened files 1148 */ 1149 static int 1150 shrink_file_list(struct file_list *fl) 1151 { 1152 1153 if ((fl == NULL) || (size_t) (fl->count) < max_open_files) 1154 return (0); 1155 else { 1156 struct file_list new_fl; 1157 size_t indx = 0; 1158 1159 file_list_init(&new_fl, true); 1160 while (indx < fl->count) { 1161 char *fnew; 1162 size_t num; 1163 1164 num = fl->count - indx; 1165 fnew = new_tmp_file_name(); 1166 1167 if ((size_t) num >= max_open_files) 1168 num = max_open_files - 1; 1169 merge_files_array(num, fl->fns + indx, fnew); 1170 if (fl->tmp) { 1171 size_t i; 1172 1173 for (i = 0; i < num; i++) 1174 unlink(fl->fns[indx + i]); 1175 } 1176 file_list_add(&new_fl, fnew, false); 1177 indx += num; 1178 } 1179 fl->tmp = false; /* already taken care of */ 1180 file_list_clean(fl); 1181 1182 fl->count = new_fl.count; 1183 fl->fns = new_fl.fns; 1184 fl->sz = new_fl.sz; 1185 fl->tmp = new_fl.tmp; 1186 1187 return (1); 1188 } 1189 } 1190 1191 /* 1192 * Merge list of files 1193 */ 1194 void 1195 merge_files(struct file_list *fl, const char *fn_out) 1196 { 1197 1198 if (fl && fn_out) { 1199 while (shrink_file_list(fl)); 1200 1201 merge_files_array(fl->count, fl->fns, fn_out); 1202 } 1203 } 1204 1205 static const char * 1206 get_sort_method_name(int sm) 1207 { 1208 1209 if (sm == SORT_MERGESORT) 1210 return "mergesort"; 1211 else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1212 return "radixsort"; 1213 else if (sort_opts_vals.sort_method == SORT_HEAPSORT) 1214 return "heapsort"; 1215 else 1216 return "quicksort"; 1217 } 1218 1219 /* 1220 * Wrapper for qsort 1221 */ 1222 static int sort_qsort(void *list, size_t count, size_t elem_size, 1223 int (*cmp_func)(const void *, const void *)) 1224 { 1225 1226 qsort(list, count, elem_size, cmp_func); 1227 return (0); 1228 } 1229 1230 /* 1231 * Sort list of lines and writes it to the file 1232 */ 1233 void 1234 sort_list_to_file(struct sort_list *list, const char *outfile) 1235 { 1236 struct sort_mods *sm = &(keys[0].sm); 1237 1238 if (!(sm->Mflag) && !(sm->Vflag) && 1239 #if defined(SORT_RANDOM) 1240 !(sm->Rflag) && 1241 #endif 1242 !(sm->gflag) && !(sm->hflag) && !(sm->nflag)) { 1243 if ((sort_opts_vals.sort_method == SORT_DEFAULT) && byte_sort) 1244 sort_opts_vals.sort_method = SORT_RADIXSORT; 1245 1246 } else if (sort_opts_vals.sort_method == SORT_RADIXSORT) 1247 err(2, "%s", getstr(9)); 1248 1249 /* 1250 * to handle stable sort and the unique cases in the 1251 * right order, we need stable basic algorithm 1252 */ 1253 if (sort_opts_vals.sflag) { 1254 switch (sort_opts_vals.sort_method){ 1255 case SORT_MERGESORT: 1256 break; 1257 case SORT_RADIXSORT: 1258 break; 1259 case SORT_DEFAULT: 1260 sort_opts_vals.sort_method = SORT_MERGESORT; 1261 break; 1262 default: 1263 errx(2, "%s", getstr(10)); 1264 }; 1265 } 1266 1267 if (sort_opts_vals.sort_method == SORT_DEFAULT) 1268 sort_opts_vals.sort_method = DEFAULT_SORT_ALGORITHM; 1269 1270 if (debug_sort) 1271 printf("sort_method=%s\n", 1272 get_sort_method_name(sort_opts_vals.sort_method)); 1273 1274 switch (sort_opts_vals.sort_method){ 1275 case SORT_RADIXSORT: 1276 rxsort(list->list, list->count); 1277 sort_list_dump(list, outfile); 1278 break; 1279 case SORT_MERGESORT: 1280 mt_sort(list, mergesort, outfile); 1281 break; 1282 case SORT_HEAPSORT: 1283 mt_sort(list, heapsort, outfile); 1284 break; 1285 case SORT_QSORT: 1286 mt_sort(list, sort_qsort, outfile); 1287 break; 1288 default: 1289 mt_sort(list, DEFAULT_SORT_FUNC, outfile); 1290 break; 1291 } 1292 } 1293 1294 /******************* MT SORT ************************/ 1295 1296 #if defined(SORT_THREADS) 1297 /* semaphore to count threads */ 1298 static sem_t mtsem; 1299 1300 /* current system sort function */ 1301 static int (*g_sort_func)(void *, size_t, size_t, 1302 int(*)(const void *, const void *)); 1303 1304 /* 1305 * Sort cycle thread (in multi-threaded mode) 1306 */ 1307 static void* 1308 mt_sort_thread(void* arg) 1309 { 1310 struct sort_list *list = arg; 1311 1312 g_sort_func(list->list, list->count, sizeof(struct sort_list_item *), 1313 (int(*)(const void *, const void *)) list_coll); 1314 1315 sem_post(&mtsem); 1316 1317 return (arg); 1318 } 1319 1320 /* 1321 * Compare sub-lists. Empty sub-lists always go to the end of the list. 1322 */ 1323 static int 1324 sub_list_cmp(struct sort_list *l1, struct sort_list *l2) 1325 { 1326 1327 if (l1 == l2) 1328 return (0); 1329 else { 1330 if (l1->count == 0) { 1331 return ((l2->count == 0) ? 0 : +1); 1332 } else if (l2->count == 0) { 1333 return (-1); 1334 } else { 1335 int ret; 1336 1337 ret = list_coll(&(l1->list[0]), &(l2->list[0])); 1338 if (!ret) 1339 return ((l1->sub_list_pos < l2->sub_list_pos) ? 1340 -1 : +1); 1341 return (ret); 1342 } 1343 } 1344 } 1345 1346 /* 1347 * Swap two array elements 1348 */ 1349 static void 1350 sub_list_swap(struct sort_list **sl, size_t i1, size_t i2) 1351 { 1352 struct sort_list *tmp; 1353 1354 tmp = sl[i1]; 1355 sl[i1] = sl[i2]; 1356 sl[i2] = tmp; 1357 } 1358 1359 /* heap algorithm ==>> */ 1360 1361 /* 1362 * See heap sort algorithm 1363 * "Raises" last element to its right place 1364 */ 1365 static void 1366 sub_list_swim(struct sort_list **sl, size_t indx) 1367 { 1368 1369 if (indx > 0) { 1370 size_t parent_index; 1371 1372 parent_index = (indx - 1) >> 1; 1373 1374 if (sub_list_cmp(sl[indx], sl[parent_index]) < 0) { 1375 /* swap child and parent and continue */ 1376 sub_list_swap(sl, indx, parent_index); 1377 sub_list_swim(sl, parent_index); 1378 } 1379 } 1380 } 1381 1382 /* 1383 * Sink the top element to its correct position 1384 */ 1385 static void 1386 sub_list_sink(struct sort_list **sl, size_t indx, size_t size) 1387 { 1388 size_t left_child_index; 1389 size_t right_child_index; 1390 1391 left_child_index = indx + indx + 1; 1392 right_child_index = left_child_index + 1; 1393 1394 if (left_child_index < size) { 1395 size_t min_child_index; 1396 1397 min_child_index = left_child_index; 1398 1399 if ((right_child_index < size) && 1400 (sub_list_cmp(sl[left_child_index], 1401 sl[right_child_index]) > 0)) 1402 min_child_index = right_child_index; 1403 if (sub_list_cmp(sl[indx], sl[min_child_index]) > 0) { 1404 sub_list_swap(sl, indx, min_child_index); 1405 sub_list_sink(sl, min_child_index, size); 1406 } 1407 } 1408 } 1409 1410 /* <<== heap algorithm */ 1411 1412 /* 1413 * Adds element to the "right" end 1414 */ 1415 static void 1416 sub_list_push(struct sort_list *s, struct sort_list **sl, size_t size) 1417 { 1418 1419 sl[size++] = s; 1420 sub_list_swim(sl, size - 1); 1421 } 1422 1423 struct last_printed_item 1424 { 1425 struct sort_list_item *item; 1426 }; 1427 1428 /* 1429 * Prints the current line of the file 1430 */ 1431 static void 1432 sub_list_header_print(struct sort_list *sl, FILE *f_out, 1433 struct last_printed_item *lp) 1434 { 1435 1436 if (sl && sl->count && f_out && sl->list[0]->str) { 1437 if (sort_opts_vals.uflag) { 1438 if ((lp->item == NULL) || (list_coll(&(lp->item), 1439 &(sl->list[0])))) { 1440 bwsfwrite(sl->list[0]->str, f_out, 1441 sort_opts_vals.zflag); 1442 lp->item = sl->list[0]; 1443 } 1444 } else 1445 bwsfwrite(sl->list[0]->str, f_out, 1446 sort_opts_vals.zflag); 1447 } 1448 } 1449 1450 /* 1451 * Read next line 1452 */ 1453 static void 1454 sub_list_next(struct sort_list *sl) 1455 { 1456 1457 if (sl && sl->count) { 1458 sl->list += 1; 1459 sl->count -= 1; 1460 } 1461 } 1462 1463 /* 1464 * Merge sub-lists to a file 1465 */ 1466 static void 1467 merge_sub_lists(struct sort_list **sl, size_t n, FILE* f_out) 1468 { 1469 struct last_printed_item lp; 1470 size_t i; 1471 1472 memset(&lp,0,sizeof(lp)); 1473 1474 /* construct the initial list: */ 1475 for (i = 0; i < n; i++) 1476 sub_list_push(sl[i], sl, i); 1477 1478 while (sl[0]->count) { /* unfinished lists are always in front */ 1479 /* output the smallest line: */ 1480 sub_list_header_print(sl[0], f_out, &lp); 1481 /* move to a new line, if possible: */ 1482 sub_list_next(sl[0]); 1483 /* re-arrange the list: */ 1484 sub_list_sink(sl, 0, n); 1485 } 1486 } 1487 1488 /* 1489 * Merge sub-lists to a file 1490 */ 1491 static void 1492 merge_list_parts(struct sort_list **parts, size_t n, const char *fn) 1493 { 1494 FILE* f_out; 1495 1496 f_out = openfile(fn,"w"); 1497 1498 merge_sub_lists(parts, n, f_out); 1499 1500 closefile(f_out, fn); 1501 } 1502 1503 #endif /* defined(SORT_THREADS) */ 1504 /* 1505 * Multi-threaded sort algorithm "driver" 1506 */ 1507 static void 1508 mt_sort(struct sort_list *list, 1509 int(*sort_func)(void *, size_t, size_t, int(*)(const void *, const void *)), 1510 const char* fn) 1511 { 1512 #if defined(SORT_THREADS) 1513 if (nthreads < 2 || list->count < MT_SORT_THRESHOLD) { 1514 size_t nthreads_save = nthreads; 1515 nthreads = 1; 1516 #endif 1517 /* if single thread or small data, do simple sort */ 1518 sort_func(list->list, list->count, 1519 sizeof(struct sort_list_item *), 1520 (int(*)(const void *, const void *)) list_coll); 1521 sort_list_dump(list, fn); 1522 #if defined(SORT_THREADS) 1523 nthreads = nthreads_save; 1524 } else { 1525 /* multi-threaded sort */ 1526 struct sort_list **parts; 1527 size_t avgsize, cstart, i; 1528 1529 /* array of sub-lists */ 1530 parts = sort_malloc(sizeof(struct sort_list*) * nthreads); 1531 cstart = 0; 1532 avgsize = list->count / nthreads; 1533 1534 /* set global system sort function */ 1535 g_sort_func = sort_func; 1536 1537 /* set sublists */ 1538 for (i = 0; i < nthreads; ++i) { 1539 size_t sz = 0; 1540 1541 parts[i] = sort_malloc(sizeof(struct sort_list)); 1542 parts[i]->list = list->list + cstart; 1543 parts[i]->memsize = 0; 1544 parts[i]->sub_list_pos = i; 1545 1546 sz = (i == nthreads - 1) ? list->count - cstart : 1547 avgsize; 1548 1549 parts[i]->count = sz; 1550 1551 parts[i]->size = parts[i]->count; 1552 1553 cstart += sz; 1554 } 1555 1556 /* init threads counting semaphore */ 1557 sem_init(&mtsem, 0, 0); 1558 1559 /* start threads */ 1560 for (i = 0; i < nthreads; ++i) { 1561 pthread_t pth; 1562 pthread_attr_t attr; 1563 1564 pthread_attr_init(&attr); 1565 pthread_attr_setdetachstate(&attr, PTHREAD_DETACHED); 1566 1567 for (;;) { 1568 int res = pthread_create(&pth, &attr, 1569 mt_sort_thread, parts[i]); 1570 1571 if (res >= 0) 1572 break; 1573 if (errno == EAGAIN) { 1574 pthread_yield(); 1575 continue; 1576 } 1577 err(2, NULL); 1578 } 1579 1580 pthread_attr_destroy(&attr); 1581 } 1582 1583 /* wait for threads completion */ 1584 for (i = 0; i < nthreads; ++i) { 1585 sem_wait(&mtsem); 1586 } 1587 /* destroy the semaphore - we do not need it anymore */ 1588 sem_destroy(&mtsem); 1589 1590 /* merge sorted sub-lists to the file */ 1591 merge_list_parts(parts, nthreads, fn); 1592 1593 /* free sub-lists data */ 1594 for (i = 0; i < nthreads; ++i) { 1595 sort_free(parts[i]); 1596 } 1597 sort_free(parts); 1598 } 1599 #endif /* defined(SORT_THREADS) */ 1600 } 1601