1 /*  bamshuf.c -- collate subcommand.
2 
3     Copyright (C) 2012 Broad Institute.
4     Copyright (C) 2013, 2015-2019 Genome Research Ltd.
5 
6     Author: Heng Li <lh3@sanger.ac.uk>
7 
8 Permission is hereby granted, free of charge, to any person obtaining a copy
9 of this software and associated documentation files (the "Software"), to deal
10 in the Software without restriction, including without limitation the rights
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 copies of the Software, and to permit persons to whom the Software is
13 furnished to do so, subject to the following conditions:
14 
15 The above copyright notice and this permission notice shall be included in
16 all copies or substantial portions of the Software.
17 
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
21 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 DEALINGS IN THE SOFTWARE.  */
25 
26 #include <config.h>
27 
28 #include <unistd.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <assert.h>
33 #include <errno.h>
34 #ifdef _WIN32
35 #  define WIN32_LEAN_AND_MEAN
36 #  include <windows.h>
37 #endif
38 #include "htslib/sam.h"
39 #include "htslib/hts.h"
40 #include "htslib/ksort.h"
41 #include "samtools.h"
42 #include "htslib/thread_pool.h"
43 #include "sam_opts.h"
44 #include "htslib/khash.h"
45 
46 #define DEF_CLEVEL 1
47 
hash_Wang(unsigned key)48 static inline unsigned hash_Wang(unsigned key)
49 {
50     key += ~(key << 15);
51     key ^=  (key >> 10);
52     key +=  (key << 3);
53     key ^=  (key >> 6);
54     key += ~(key << 11);
55     key ^=  (key >> 16);
56     return key;
57 }
58 
hash_X31_Wang(const char * s)59 static inline unsigned hash_X31_Wang(const char *s)
60 {
61     unsigned h = *s;
62     if (h) {
63         for (++s ; *s; ++s) h = (h << 5) - h + *s;
64         return hash_Wang(h);
65     } else return 0;
66 }
67 
68 typedef struct {
69     unsigned key;
70     bam1_t *b;
71 } elem_t;
72 
elem_lt(elem_t x,elem_t y)73 static inline int elem_lt(elem_t x, elem_t y)
74 {
75     if (x.key < y.key) return 1;
76     if (x.key == y.key) {
77         int t;
78         t = strcmp(bam_get_qname(x.b), bam_get_qname(y.b));
79         if (t < 0) return 1;
80         return (t == 0 && ((x.b->core.flag>>6&3) < (y.b->core.flag>>6&3)));
81     } else return 0;
82 }
83 
84 KSORT_INIT(bamshuf, elem_t, elem_lt)
85 
86 
87 typedef struct {
88     int written;
89     bam1_t *b;
90 } bam_item_t;
91 
92 typedef struct {
93     bam1_t *bam_pool;
94     bam_item_t *items;
95     size_t size;
96     size_t index;
97 } bam_list_t;
98 
99 typedef struct {
100     bam_item_t *bi;
101 } store_item_t;
102 
KHASH_MAP_INIT_STR(bam_store,store_item_t)103 KHASH_MAP_INIT_STR(bam_store, store_item_t)
104 
105 
106 static bam_item_t *store_bam(bam_list_t *list) {
107     size_t old_index = list->index;
108 
109     list->items[list->index++].written = 0;
110 
111     if (list->index >= list->size)
112         list->index = 0;
113 
114     return &list->items[old_index];
115 }
116 
117 
write_bam_needed(bam_list_t * list)118 static int write_bam_needed(bam_list_t *list) {
119     return !list->items[list->index].written;
120 }
121 
122 
mark_bam_as_written(bam_list_t * list)123 static void mark_bam_as_written(bam_list_t *list) {
124     list->items[list->index].written = 1;
125 }
126 
127 
create_bam_list(bam_list_t * list,size_t max_size)128 static int create_bam_list(bam_list_t *list, size_t max_size) {
129     size_t i;
130 
131     list->size = list->index = 0;
132     list->items    = NULL;
133     list->bam_pool = NULL;
134 
135     if ((list->items = malloc(max_size * sizeof(bam_item_t))) == NULL) {
136         return 1;
137     }
138 
139     if ((list->bam_pool = calloc(max_size, sizeof(bam1_t))) == NULL) {
140         return 1;
141     }
142 
143     for (i = 0; i < max_size; i++) {
144         list->items[i].b = &list->bam_pool[i];
145         list->items[i].written = 1;
146     }
147 
148     list->size  = max_size;
149     list->index = 0;
150 
151     return 0;
152 }
153 
154 
destroy_bam_list(bam_list_t * list)155 static void destroy_bam_list(bam_list_t *list) {
156     size_t i;
157 
158     for (i = 0; i < list->size; i++) {
159         free(list->bam_pool[i].data);
160     }
161 
162     free(list->bam_pool);
163     free(list->items);
164 }
165 
166 
write_to_bin_file(bam1_t * bam,int64_t * count,samFile ** bin_files,char ** names,sam_hdr_t * header,int files)167 static inline int write_to_bin_file(bam1_t *bam, int64_t *count, samFile **bin_files, char **names, sam_hdr_t *header, int files) {
168     uint32_t x;
169 
170     x = hash_X31_Wang(bam_get_qname(bam)) % files;
171 
172     if (sam_write1(bin_files[x], header, bam) < 0) {
173         print_error_errno("collate", "Couldn't write to intermediate file \"%s\"", names[x]);
174         return 1;
175     }
176 
177     ++count[x];
178 
179     return 0;
180 }
181 
182 
bamshuf(const char * fn,int n_files,const char * pre,int clevel,int is_stdout,const char * output_file,int fast,int store_max,sam_global_args * ga,char * arg_list,int no_pg)183 static int bamshuf(const char *fn, int n_files, const char *pre, int clevel,
184                    int is_stdout, const char *output_file, int fast, int store_max, sam_global_args *ga, char *arg_list, int no_pg)
185 {
186     samFile *fp, *fpw = NULL, **fpt = NULL;
187     char **fnt = NULL, modew[8];
188     bam1_t *b = NULL;
189     int i, counter, l, r;
190     sam_hdr_t *h = NULL;
191     int64_t j, max_cnt = 0, *cnt = NULL;
192     elem_t *a = NULL;
193     htsThreadPool p = {NULL, 0};
194 
195     if (ga->nthreads > 0) {
196         if (!(p.pool = hts_tpool_init(ga->nthreads))) {
197             print_error_errno("collate", "Error creating thread pool\n");
198             return 1;
199         }
200     }
201 
202     // Read input, distribute reads pseudo-randomly into n_files temporary
203     // files.
204     fp = sam_open_format(fn, "r", &ga->in);
205     if (fp == NULL) {
206         print_error_errno("collate", "Cannot open input file \"%s\"", fn);
207         return 1;
208     }
209     if (p.pool) hts_set_opt(fp, HTS_OPT_THREAD_POOL, &p);
210 
211     h = sam_hdr_read(fp);
212     if (h == NULL) {
213         fprintf(stderr, "Couldn't read header for '%s'\n", fn);
214         goto fail;
215     }
216 
217     if ((-1 == sam_hdr_update_hd(h, "SO", "unsorted", "GO", "query"))
218      && (-1 == sam_hdr_add_line(h, "HD", "VN", SAM_FORMAT_VERSION, "SO", "unsorted", "GO", "query", NULL))
219      ) {
220         print_error("collate", "failed to update HD line\n");
221         goto fail;
222     }
223 
224     // open final output file
225     l = strlen(pre);
226 
227     sprintf(modew, "wb%d", (clevel >= 0 && clevel <= 9)? clevel : DEF_CLEVEL);
228 
229     if (!is_stdout && !output_file) { // output to a file (name based on prefix)
230         char *fnw = (char*)calloc(l + 5, 1);
231         if (!fnw) goto mem_fail;
232         if (ga->out.format == unknown_format)
233             sprintf(fnw, "%s.bam", pre); // "wb" above makes BAM the default
234         else
235             sprintf(fnw, "%s.%s", pre,  hts_format_file_extension(&ga->out));
236         fpw = sam_open_format(fnw, modew, &ga->out);
237         free(fnw);
238     } else if (output_file) { // output to a given file
239         modew[0] = 'w'; modew[1] = '\0';
240         sam_open_mode(modew + 1, output_file, NULL);
241         j = strlen(modew);
242         snprintf(modew + j, sizeof(modew) - j, "%d",
243                  (clevel >= 0 && clevel <= 9)? clevel : DEF_CLEVEL);
244         fpw = sam_open_format(output_file, modew, &ga->out);
245     } else fpw = sam_open_format("-", modew, &ga->out); // output to stdout
246     if (fpw == NULL) {
247         if (is_stdout) print_error_errno("collate", "Cannot open standard output");
248         else print_error_errno("collate", "Cannot open output file \"%s.bam\"", pre);
249         goto fail;
250     }
251     if (p.pool) hts_set_opt(fpw, HTS_OPT_THREAD_POOL, &p);
252 
253     if (!no_pg && sam_hdr_add_pg(h, "samtools",
254                                  "VN", samtools_version(),
255                                  arg_list ? "CL": NULL,
256                                  arg_list ? arg_list : NULL,
257                                  NULL)) {
258         print_error("collate", "failed to add PG line to header of \"%s\"", output_file);
259         goto fail;
260     }
261 
262     if (sam_hdr_write(fpw, h) < 0) {
263         print_error_errno("collate", "Couldn't write header");
264         goto fail;
265     }
266 
267     fnt = (char**)calloc(n_files, sizeof(char*));
268     if (!fnt) goto mem_fail;
269     fpt = (samFile**)calloc(n_files, sizeof(samFile*));
270     if (!fpt) goto mem_fail;
271     cnt = (int64_t*)calloc(n_files, 8);
272     if (!cnt) goto mem_fail;
273 
274     for (i = counter = 0; i < n_files; ++i) {
275         fnt[i] = (char*)calloc(l + 20, 1);
276         if (!fnt[i]) goto mem_fail;
277         do {
278             sprintf(fnt[i], "%s.%04d.bam", pre, counter++);
279             fpt[i] = sam_open(fnt[i], "wxb1");
280         } while (!fpt[i] && errno == EEXIST);
281         if (fpt[i] == NULL) {
282             print_error_errno("collate", "Cannot open intermediate file \"%s\"", fnt[i]);
283             goto fail;
284         }
285         if (p.pool) hts_set_opt(fpt[i], HTS_OPT_THREAD_POOL, &p);
286         if (sam_hdr_write(fpt[i], h) < 0) {
287             print_error_errno("collate", "Couldn't write header to intermediate file \"%s\"", fnt[i]);
288             goto fail;
289         }
290     }
291 
292     if (fast) {
293         khash_t(bam_store) *stored = kh_init(bam_store);
294         khiter_t itr;
295         bam_list_t list;
296         int err = 0;
297         if (!stored) goto mem_fail;
298 
299         if (store_max < 2) store_max = 2;
300 
301         if (create_bam_list(&list, store_max)) {
302             fprintf(stderr, "[collate[ ERROR: unable to create bam list.\n");
303             err = 1;
304             goto fast_fail;
305         }
306 
307         while ((r = sam_read1(fp, h, list.items[list.index].b)) >= 0) {
308             int ret;
309             bam1_t *b = list.items[list.index].b;
310             int readflag = b->core.flag & (BAM_FREAD1 | BAM_FREAD2);
311 
312             // strictly paired reads only
313             if (!(b->core.flag & (BAM_FSECONDARY | BAM_FSUPPLEMENTARY)) && (readflag == BAM_FREAD1 || readflag == BAM_FREAD2)) {
314 
315                 itr = kh_get(bam_store, stored, bam_get_qname(b));
316 
317                 if (itr == kh_end(stored)) {
318                     // new read
319                     itr = kh_put(bam_store, stored, bam_get_qname(b), &ret);
320 
321                     if (ret > 0) { // okay to go ahead store it
322                         kh_value(stored, itr).bi = store_bam(&list);
323 
324                         // see if the next one on the list needs to be written out
325                         if (write_bam_needed(&list)) {
326                             if (write_to_bin_file(list.items[list.index].b, cnt, fpt, fnt, h, n_files) < 0) {
327                                 fprintf(stderr, "[collate] ERROR: could not write line.\n");
328                                 err = 1;
329                                 goto fast_fail;
330                             } else {
331                                 mark_bam_as_written(&list);
332 
333                                 itr = kh_get(bam_store, stored, bam_get_qname(list.items[list.index].b));
334 
335                                 if (itr != kh_end(stored)) {
336                                     kh_del(bam_store, stored, itr);
337                                 } else {
338                                     fprintf(stderr, "[collate] ERROR: stored value not in hash.\n");
339                                     err = 1;
340                                     goto fast_fail;
341                                 }
342                             }
343                         }
344                     } else if (ret == 0) {
345                         fprintf(stderr, "[collate] ERROR: value already in hash.\n");
346                         err = 1;
347                         goto fast_fail;
348                     } else {
349                         fprintf(stderr, "[collate] ERROR: unable to store in hash.\n");
350                         err = 1;
351                         goto fast_fail;
352                     }
353                 } else { // we have a match
354                     // write out the reads in R1 R2 order
355                     bam1_t *r1, *r2;
356 
357                     if (b->core.flag & BAM_FREAD1) {
358                         r1 = b;
359                         r2 = kh_value(stored, itr).bi->b;
360                     } else {
361                         r1 = kh_value(stored, itr).bi->b;
362                         r2 = b;
363                     }
364 
365                     if (sam_write1(fpw, h, r1) < 0) {
366                         fprintf(stderr, "[collate] ERROR: could not write r1 alignment.\n");
367                         err = 1;
368                         goto fast_fail;
369                     }
370 
371                     if (sam_write1(fpw, h, r2) < 0) {
372                         fprintf(stderr, "[collate] ERROR: could not write r2 alignment.\n");
373                         err = 1;
374                         goto fast_fail;
375                     }
376 
377                     mark_bam_as_written(&list);
378 
379                     // remove stored read
380                     kh_value(stored, itr).bi->written = 1;
381                     kh_del(bam_store, stored, itr);
382                 }
383             }
384         }
385 
386         for (list.index = 0; list.index < list.size; list.index++) {
387             if (write_bam_needed(&list)) {
388                 bam1_t *b = list.items[list.index].b;
389 
390                 if (write_to_bin_file(b, cnt, fpt, fnt, h, n_files)) {
391                     err = 1;
392                     goto fast_fail;
393                 } else {
394                     itr = kh_get(bam_store, stored, bam_get_qname(b));
395                     kh_del(bam_store, stored, itr);
396                 }
397             }
398         }
399 
400  fast_fail:
401         if (err) {
402             for (itr = kh_begin(stored); itr != kh_end(stored); ++itr) {
403                 if (kh_exist(stored, itr)) {
404                     kh_del(bam_store, stored, itr);
405                 }
406             }
407 
408             kh_destroy(bam_store, stored);
409             destroy_bam_list(&list);
410             goto fail;
411         } else {
412             kh_destroy(bam_store, stored);
413             destroy_bam_list(&list);
414         }
415 
416     } else {
417         b = bam_init1();
418         if (!b) goto mem_fail;
419 
420         while ((r = sam_read1(fp, h, b)) >= 0) {
421             if (write_to_bin_file(b, cnt, fpt, fnt, h, n_files)) {
422                 bam_destroy1(b);
423                 goto fail;
424             }
425         }
426 
427         bam_destroy1(b);
428     }
429 
430     if (r < -1) {
431         fprintf(stderr, "Error reading input file\n");
432         goto fail;
433     }
434     for (i = 0; i < n_files; ++i) {
435         // Close split output
436         r = sam_close(fpt[i]);
437         fpt[i] = NULL;
438         if (r < 0) {
439             fprintf(stderr, "Error on closing '%s'\n", fnt[i]);
440             return 1;
441         }
442 
443         // Find biggest count
444         if (max_cnt < cnt[i]) max_cnt = cnt[i];
445     }
446     free(fpt);
447     fpt = NULL;
448     sam_close(fp);
449     fp = NULL;
450 
451     // merge
452     a = malloc(max_cnt * sizeof(elem_t));
453     if (!a) goto mem_fail;
454     for (j = 0; j < max_cnt; ++j) {
455         a[j].b = bam_init1();
456         if (!a[j].b) { max_cnt = j; goto mem_fail; }
457     }
458 
459     for (i = 0; i < n_files; ++i) {
460         int64_t c = cnt[i];
461         fp = sam_open_format(fnt[i], "r", &ga->in);
462         if (NULL == fp) {
463             print_error_errno("collate", "Couldn't open \"%s\"", fnt[i]);
464             goto fail;
465         }
466         if (p.pool) hts_set_opt(fp, HTS_OPT_THREAD_POOL, &p);
467         sam_hdr_destroy(sam_hdr_read(fp)); // Skip over header
468 
469         // Slurp in one of the split files
470         for (j = 0; j < c; ++j) {
471             if (sam_read1(fp, h, a[j].b) < 0) {
472                 fprintf(stderr, "Error reading '%s'\n", fnt[i]);
473                 goto fail;
474             }
475             a[j].key = hash_X31_Wang(bam_get_qname(a[j].b));
476         }
477         sam_close(fp);
478         unlink(fnt[i]);
479         free(fnt[i]);
480         fnt[i] = NULL;
481 
482         ks_introsort(bamshuf, c, a); // Shuffle all the reads
483 
484         // Write them out again
485         for (j = 0; j < c; ++j) {
486             if (sam_write1(fpw, h, a[j].b) < 0) {
487                 print_error_errno("collate", "Error writing to output");
488                 goto fail;
489             }
490         }
491     }
492 
493     sam_hdr_destroy(h);
494     for (j = 0; j < max_cnt; ++j) bam_destroy1(a[j].b);
495     free(a); free(fnt); free(cnt);
496     sam_global_args_free(ga);
497     if (sam_close(fpw) < 0) {
498         fprintf(stderr, "Error on closing output\n");
499         return 1;
500     }
501 
502     if (p.pool) hts_tpool_destroy(p.pool);
503     return 0;
504 
505  mem_fail:
506     fprintf(stderr, "Out of memory\n");
507 
508  fail:
509     if (fp) sam_close(fp);
510     if (fpw) sam_close(fpw);
511     if (h) sam_hdr_destroy(h);
512     for (i = 0; i < n_files; ++i) {
513         if (fnt) free(fnt[i]);
514         if (fpt && fpt[i]) sam_close(fpt[i]);
515     }
516     if (a) {
517         for (j = 0; j < max_cnt; ++j) bam_destroy1(a[j].b);
518         free(a);
519     }
520     free(fnt);
521     free(fpt);
522     free(cnt);
523     if (p.pool) hts_tpool_destroy(p.pool);
524     sam_global_args_free(ga);
525     return 1;
526 }
527 
usage(FILE * fp,int n_files,int reads_store)528 static int usage(FILE *fp, int n_files, int reads_store) {
529     fprintf(fp,
530             "Usage: samtools collate [-Ou] [-o <name>] [-n nFiles] [-l cLevel] <in.bam> [<prefix>]\n\n"
531             "Options:\n"
532             "      -O       output to stdout\n"
533             "      -o       output file name (use prefix if not set)\n"
534             "      -u       uncompressed BAM output\n"
535             "      -f       fast (only primary alignments)\n"
536             "      -r       working reads stored (with -f) [%d]\n" // reads_store
537             "      -l INT   compression level [%d]\n" // DEF_CLEVEL
538             "      -n INT   number of temporary files [%d]\n" // n_files
539             "      --no-PG  do not add a PG line\n",
540             reads_store, DEF_CLEVEL, n_files);
541 
542     sam_global_opt_help(fp, "-....@-.");
543     fprintf(fp,
544             "  <prefix> is required unless the -o or -O options are used.\n");
545 
546     return 1;
547 }
548 
generate_prefix()549 char * generate_prefix() {
550     char *prefix;
551     unsigned int pid = getpid();
552 #ifdef _WIN32
553 #  define PREFIX_LEN (MAX_PATH + 16)
554     DWORD ret;
555     prefix = calloc(PREFIX_LEN, sizeof(*prefix));
556     if (!prefix) {
557         perror("collate");
558         return NULL;
559     }
560     ret = GetTempPathA(MAX_PATH, prefix);
561     if (ret > MAX_PATH || ret == 0) {
562         fprintf(stderr,
563                 "[E::collate] Couldn't get path for temporary files.\n");
564         free(prefix);
565         return NULL;
566     }
567     snprintf(prefix + ret, PREFIX_LEN - ret, "\\%x", pid);
568     return prefix;
569 #else
570 #  define PREFIX_LEN 64
571     prefix = malloc(PREFIX_LEN);
572     if (!prefix) {
573         perror("collate");
574         return NULL;
575     }
576     snprintf(prefix, PREFIX_LEN, "/tmp/collate%x", pid);
577     return prefix;
578 #endif
579 }
580 
main_bamshuf(int argc,char * argv[])581 int main_bamshuf(int argc, char *argv[])
582 {
583     int c, n_files = 64, clevel = DEF_CLEVEL, is_stdout = 0, is_un = 0, fast_coll = 0, reads_store = 10000, ret, pre_mem = 0, no_pg = 0;
584     const char *output_file = NULL;
585     char *prefix = NULL, *arg_list = NULL;
586     sam_global_args ga = SAM_GLOBAL_ARGS_INIT;
587     static const struct option lopts[] = {
588         SAM_OPT_GLOBAL_OPTIONS('-', 0, 0, 0, 0, '@'),
589         {"no-PG", no_argument, NULL, 1},
590         { NULL, 0, NULL, 0 }
591     };
592 
593     while ((c = getopt_long(argc, argv, "n:l:uOo:@:fr:", lopts, NULL)) >= 0) {
594         switch (c) {
595         case 'n': n_files = atoi(optarg); break;
596         case 'l': clevel = atoi(optarg); break;
597         case 'u': is_un = 1; break;
598         case 'O': is_stdout = 1; break;
599         case 'o': output_file = optarg; break;
600         case 'f': fast_coll = 1; break;
601         case 'r': reads_store = atoi(optarg); break;
602         case 1: no_pg = 1; break;
603         default:  if (parse_sam_global_opt(c, optarg, lopts, &ga) == 0) break;
604                   /* else fall-through */
605         case '?': return usage(stderr, n_files, reads_store);
606         }
607     }
608     if (is_un) clevel = 0;
609     if (argc >= optind + 2) prefix = argv[optind+1];
610     if (!(prefix || is_stdout || output_file))
611         return usage(stderr, n_files, reads_store);
612     if (is_stdout && output_file) {
613         fprintf(stderr, "collate: -o and -O options cannot be used together.\n");
614         return usage(stderr, n_files, reads_store);
615     }
616     if (!prefix) {
617         prefix = generate_prefix();
618         pre_mem = 1;
619     }
620 
621     if (!prefix) return EXIT_FAILURE;
622 
623     if (!no_pg && !(arg_list = stringify_argv(argc+1, argv-1))) {
624         print_error("collate", "failed to create arg_list");
625         return 1;
626     }
627 
628     ret = bamshuf(argv[optind], n_files, prefix, clevel, is_stdout,
629                    output_file, fast_coll, reads_store, &ga, arg_list, no_pg);
630 
631     if (pre_mem) free(prefix);
632     free(arg_list);
633 
634     return ret;
635 }
636