1 /* bamshuf.c -- collate subcommand.
2
3 Copyright (C) 2012 Broad Institute.
4 Copyright (C) 2013, 2015-2019 Genome Research Ltd.
5
6 Author: Heng Li <lh3@sanger.ac.uk>
7
8 Permission is hereby granted, free of charge, to any person obtaining a copy
9 of this software and associated documentation files (the "Software"), to deal
10 in the Software without restriction, including without limitation the rights
11 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 copies of the Software, and to permit persons to whom the Software is
13 furnished to do so, subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be included in
16 all copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
21 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 DEALINGS IN THE SOFTWARE. */
25
26 #include <config.h>
27
28 #include <unistd.h>
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <assert.h>
33 #include <errno.h>
34 #ifdef _WIN32
35 # define WIN32_LEAN_AND_MEAN
36 # include <windows.h>
37 #endif
38 #include "htslib/sam.h"
39 #include "htslib/hts.h"
40 #include "htslib/ksort.h"
41 #include "samtools.h"
42 #include "htslib/thread_pool.h"
43 #include "sam_opts.h"
44 #include "htslib/khash.h"
45
46 #define DEF_CLEVEL 1
47
hash_Wang(unsigned key)48 static inline unsigned hash_Wang(unsigned key)
49 {
50 key += ~(key << 15);
51 key ^= (key >> 10);
52 key += (key << 3);
53 key ^= (key >> 6);
54 key += ~(key << 11);
55 key ^= (key >> 16);
56 return key;
57 }
58
hash_X31_Wang(const char * s)59 static inline unsigned hash_X31_Wang(const char *s)
60 {
61 unsigned h = *s;
62 if (h) {
63 for (++s ; *s; ++s) h = (h << 5) - h + *s;
64 return hash_Wang(h);
65 } else return 0;
66 }
67
68 typedef struct {
69 unsigned key;
70 bam1_t *b;
71 } elem_t;
72
elem_lt(elem_t x,elem_t y)73 static inline int elem_lt(elem_t x, elem_t y)
74 {
75 if (x.key < y.key) return 1;
76 if (x.key == y.key) {
77 int t;
78 t = strcmp(bam_get_qname(x.b), bam_get_qname(y.b));
79 if (t < 0) return 1;
80 return (t == 0 && ((x.b->core.flag>>6&3) < (y.b->core.flag>>6&3)));
81 } else return 0;
82 }
83
84 KSORT_INIT(bamshuf, elem_t, elem_lt)
85
86
87 typedef struct {
88 int written;
89 bam1_t *b;
90 } bam_item_t;
91
92 typedef struct {
93 bam1_t *bam_pool;
94 bam_item_t *items;
95 size_t size;
96 size_t index;
97 } bam_list_t;
98
99 typedef struct {
100 bam_item_t *bi;
101 } store_item_t;
102
KHASH_MAP_INIT_STR(bam_store,store_item_t)103 KHASH_MAP_INIT_STR(bam_store, store_item_t)
104
105
106 static bam_item_t *store_bam(bam_list_t *list) {
107 size_t old_index = list->index;
108
109 list->items[list->index++].written = 0;
110
111 if (list->index >= list->size)
112 list->index = 0;
113
114 return &list->items[old_index];
115 }
116
117
write_bam_needed(bam_list_t * list)118 static int write_bam_needed(bam_list_t *list) {
119 return !list->items[list->index].written;
120 }
121
122
mark_bam_as_written(bam_list_t * list)123 static void mark_bam_as_written(bam_list_t *list) {
124 list->items[list->index].written = 1;
125 }
126
127
create_bam_list(bam_list_t * list,size_t max_size)128 static int create_bam_list(bam_list_t *list, size_t max_size) {
129 size_t i;
130
131 list->size = list->index = 0;
132 list->items = NULL;
133 list->bam_pool = NULL;
134
135 if ((list->items = malloc(max_size * sizeof(bam_item_t))) == NULL) {
136 return 1;
137 }
138
139 if ((list->bam_pool = calloc(max_size, sizeof(bam1_t))) == NULL) {
140 return 1;
141 }
142
143 for (i = 0; i < max_size; i++) {
144 list->items[i].b = &list->bam_pool[i];
145 list->items[i].written = 1;
146 }
147
148 list->size = max_size;
149 list->index = 0;
150
151 return 0;
152 }
153
154
destroy_bam_list(bam_list_t * list)155 static void destroy_bam_list(bam_list_t *list) {
156 size_t i;
157
158 for (i = 0; i < list->size; i++) {
159 free(list->bam_pool[i].data);
160 }
161
162 free(list->bam_pool);
163 free(list->items);
164 }
165
166
write_to_bin_file(bam1_t * bam,int64_t * count,samFile ** bin_files,char ** names,sam_hdr_t * header,int files)167 static inline int write_to_bin_file(bam1_t *bam, int64_t *count, samFile **bin_files, char **names, sam_hdr_t *header, int files) {
168 uint32_t x;
169
170 x = hash_X31_Wang(bam_get_qname(bam)) % files;
171
172 if (sam_write1(bin_files[x], header, bam) < 0) {
173 print_error_errno("collate", "Couldn't write to intermediate file \"%s\"", names[x]);
174 return 1;
175 }
176
177 ++count[x];
178
179 return 0;
180 }
181
182
bamshuf(const char * fn,int n_files,const char * pre,int clevel,int is_stdout,const char * output_file,int fast,int store_max,sam_global_args * ga,char * arg_list,int no_pg)183 static int bamshuf(const char *fn, int n_files, const char *pre, int clevel,
184 int is_stdout, const char *output_file, int fast, int store_max, sam_global_args *ga, char *arg_list, int no_pg)
185 {
186 samFile *fp, *fpw = NULL, **fpt = NULL;
187 char **fnt = NULL, modew[8];
188 bam1_t *b = NULL;
189 int i, counter, l, r;
190 sam_hdr_t *h = NULL;
191 int64_t j, max_cnt = 0, *cnt = NULL;
192 elem_t *a = NULL;
193 htsThreadPool p = {NULL, 0};
194
195 if (ga->nthreads > 0) {
196 if (!(p.pool = hts_tpool_init(ga->nthreads))) {
197 print_error_errno("collate", "Error creating thread pool\n");
198 return 1;
199 }
200 }
201
202 // Read input, distribute reads pseudo-randomly into n_files temporary
203 // files.
204 fp = sam_open_format(fn, "r", &ga->in);
205 if (fp == NULL) {
206 print_error_errno("collate", "Cannot open input file \"%s\"", fn);
207 return 1;
208 }
209 if (p.pool) hts_set_opt(fp, HTS_OPT_THREAD_POOL, &p);
210
211 h = sam_hdr_read(fp);
212 if (h == NULL) {
213 fprintf(stderr, "Couldn't read header for '%s'\n", fn);
214 goto fail;
215 }
216
217 if ((-1 == sam_hdr_update_hd(h, "SO", "unsorted", "GO", "query"))
218 && (-1 == sam_hdr_add_line(h, "HD", "VN", SAM_FORMAT_VERSION, "SO", "unsorted", "GO", "query", NULL))
219 ) {
220 print_error("collate", "failed to update HD line\n");
221 goto fail;
222 }
223
224 // open final output file
225 l = strlen(pre);
226
227 sprintf(modew, "wb%d", (clevel >= 0 && clevel <= 9)? clevel : DEF_CLEVEL);
228
229 if (!is_stdout && !output_file) { // output to a file (name based on prefix)
230 char *fnw = (char*)calloc(l + 5, 1);
231 if (!fnw) goto mem_fail;
232 if (ga->out.format == unknown_format)
233 sprintf(fnw, "%s.bam", pre); // "wb" above makes BAM the default
234 else
235 sprintf(fnw, "%s.%s", pre, hts_format_file_extension(&ga->out));
236 fpw = sam_open_format(fnw, modew, &ga->out);
237 free(fnw);
238 } else if (output_file) { // output to a given file
239 modew[0] = 'w'; modew[1] = '\0';
240 sam_open_mode(modew + 1, output_file, NULL);
241 j = strlen(modew);
242 snprintf(modew + j, sizeof(modew) - j, "%d",
243 (clevel >= 0 && clevel <= 9)? clevel : DEF_CLEVEL);
244 fpw = sam_open_format(output_file, modew, &ga->out);
245 } else fpw = sam_open_format("-", modew, &ga->out); // output to stdout
246 if (fpw == NULL) {
247 if (is_stdout) print_error_errno("collate", "Cannot open standard output");
248 else print_error_errno("collate", "Cannot open output file \"%s.bam\"", pre);
249 goto fail;
250 }
251 if (p.pool) hts_set_opt(fpw, HTS_OPT_THREAD_POOL, &p);
252
253 if (!no_pg && sam_hdr_add_pg(h, "samtools",
254 "VN", samtools_version(),
255 arg_list ? "CL": NULL,
256 arg_list ? arg_list : NULL,
257 NULL)) {
258 print_error("collate", "failed to add PG line to header of \"%s\"", output_file);
259 goto fail;
260 }
261
262 if (sam_hdr_write(fpw, h) < 0) {
263 print_error_errno("collate", "Couldn't write header");
264 goto fail;
265 }
266
267 fnt = (char**)calloc(n_files, sizeof(char*));
268 if (!fnt) goto mem_fail;
269 fpt = (samFile**)calloc(n_files, sizeof(samFile*));
270 if (!fpt) goto mem_fail;
271 cnt = (int64_t*)calloc(n_files, 8);
272 if (!cnt) goto mem_fail;
273
274 for (i = counter = 0; i < n_files; ++i) {
275 fnt[i] = (char*)calloc(l + 20, 1);
276 if (!fnt[i]) goto mem_fail;
277 do {
278 sprintf(fnt[i], "%s.%04d.bam", pre, counter++);
279 fpt[i] = sam_open(fnt[i], "wxb1");
280 } while (!fpt[i] && errno == EEXIST);
281 if (fpt[i] == NULL) {
282 print_error_errno("collate", "Cannot open intermediate file \"%s\"", fnt[i]);
283 goto fail;
284 }
285 if (p.pool) hts_set_opt(fpt[i], HTS_OPT_THREAD_POOL, &p);
286 if (sam_hdr_write(fpt[i], h) < 0) {
287 print_error_errno("collate", "Couldn't write header to intermediate file \"%s\"", fnt[i]);
288 goto fail;
289 }
290 }
291
292 if (fast) {
293 khash_t(bam_store) *stored = kh_init(bam_store);
294 khiter_t itr;
295 bam_list_t list;
296 int err = 0;
297 if (!stored) goto mem_fail;
298
299 if (store_max < 2) store_max = 2;
300
301 if (create_bam_list(&list, store_max)) {
302 fprintf(stderr, "[collate[ ERROR: unable to create bam list.\n");
303 err = 1;
304 goto fast_fail;
305 }
306
307 while ((r = sam_read1(fp, h, list.items[list.index].b)) >= 0) {
308 int ret;
309 bam1_t *b = list.items[list.index].b;
310 int readflag = b->core.flag & (BAM_FREAD1 | BAM_FREAD2);
311
312 // strictly paired reads only
313 if (!(b->core.flag & (BAM_FSECONDARY | BAM_FSUPPLEMENTARY)) && (readflag == BAM_FREAD1 || readflag == BAM_FREAD2)) {
314
315 itr = kh_get(bam_store, stored, bam_get_qname(b));
316
317 if (itr == kh_end(stored)) {
318 // new read
319 itr = kh_put(bam_store, stored, bam_get_qname(b), &ret);
320
321 if (ret > 0) { // okay to go ahead store it
322 kh_value(stored, itr).bi = store_bam(&list);
323
324 // see if the next one on the list needs to be written out
325 if (write_bam_needed(&list)) {
326 if (write_to_bin_file(list.items[list.index].b, cnt, fpt, fnt, h, n_files) < 0) {
327 fprintf(stderr, "[collate] ERROR: could not write line.\n");
328 err = 1;
329 goto fast_fail;
330 } else {
331 mark_bam_as_written(&list);
332
333 itr = kh_get(bam_store, stored, bam_get_qname(list.items[list.index].b));
334
335 if (itr != kh_end(stored)) {
336 kh_del(bam_store, stored, itr);
337 } else {
338 fprintf(stderr, "[collate] ERROR: stored value not in hash.\n");
339 err = 1;
340 goto fast_fail;
341 }
342 }
343 }
344 } else if (ret == 0) {
345 fprintf(stderr, "[collate] ERROR: value already in hash.\n");
346 err = 1;
347 goto fast_fail;
348 } else {
349 fprintf(stderr, "[collate] ERROR: unable to store in hash.\n");
350 err = 1;
351 goto fast_fail;
352 }
353 } else { // we have a match
354 // write out the reads in R1 R2 order
355 bam1_t *r1, *r2;
356
357 if (b->core.flag & BAM_FREAD1) {
358 r1 = b;
359 r2 = kh_value(stored, itr).bi->b;
360 } else {
361 r1 = kh_value(stored, itr).bi->b;
362 r2 = b;
363 }
364
365 if (sam_write1(fpw, h, r1) < 0) {
366 fprintf(stderr, "[collate] ERROR: could not write r1 alignment.\n");
367 err = 1;
368 goto fast_fail;
369 }
370
371 if (sam_write1(fpw, h, r2) < 0) {
372 fprintf(stderr, "[collate] ERROR: could not write r2 alignment.\n");
373 err = 1;
374 goto fast_fail;
375 }
376
377 mark_bam_as_written(&list);
378
379 // remove stored read
380 kh_value(stored, itr).bi->written = 1;
381 kh_del(bam_store, stored, itr);
382 }
383 }
384 }
385
386 for (list.index = 0; list.index < list.size; list.index++) {
387 if (write_bam_needed(&list)) {
388 bam1_t *b = list.items[list.index].b;
389
390 if (write_to_bin_file(b, cnt, fpt, fnt, h, n_files)) {
391 err = 1;
392 goto fast_fail;
393 } else {
394 itr = kh_get(bam_store, stored, bam_get_qname(b));
395 kh_del(bam_store, stored, itr);
396 }
397 }
398 }
399
400 fast_fail:
401 if (err) {
402 for (itr = kh_begin(stored); itr != kh_end(stored); ++itr) {
403 if (kh_exist(stored, itr)) {
404 kh_del(bam_store, stored, itr);
405 }
406 }
407
408 kh_destroy(bam_store, stored);
409 destroy_bam_list(&list);
410 goto fail;
411 } else {
412 kh_destroy(bam_store, stored);
413 destroy_bam_list(&list);
414 }
415
416 } else {
417 b = bam_init1();
418 if (!b) goto mem_fail;
419
420 while ((r = sam_read1(fp, h, b)) >= 0) {
421 if (write_to_bin_file(b, cnt, fpt, fnt, h, n_files)) {
422 bam_destroy1(b);
423 goto fail;
424 }
425 }
426
427 bam_destroy1(b);
428 }
429
430 if (r < -1) {
431 fprintf(stderr, "Error reading input file\n");
432 goto fail;
433 }
434 for (i = 0; i < n_files; ++i) {
435 // Close split output
436 r = sam_close(fpt[i]);
437 fpt[i] = NULL;
438 if (r < 0) {
439 fprintf(stderr, "Error on closing '%s'\n", fnt[i]);
440 return 1;
441 }
442
443 // Find biggest count
444 if (max_cnt < cnt[i]) max_cnt = cnt[i];
445 }
446 free(fpt);
447 fpt = NULL;
448 sam_close(fp);
449 fp = NULL;
450
451 // merge
452 a = malloc(max_cnt * sizeof(elem_t));
453 if (!a) goto mem_fail;
454 for (j = 0; j < max_cnt; ++j) {
455 a[j].b = bam_init1();
456 if (!a[j].b) { max_cnt = j; goto mem_fail; }
457 }
458
459 for (i = 0; i < n_files; ++i) {
460 int64_t c = cnt[i];
461 fp = sam_open_format(fnt[i], "r", &ga->in);
462 if (NULL == fp) {
463 print_error_errno("collate", "Couldn't open \"%s\"", fnt[i]);
464 goto fail;
465 }
466 if (p.pool) hts_set_opt(fp, HTS_OPT_THREAD_POOL, &p);
467 sam_hdr_destroy(sam_hdr_read(fp)); // Skip over header
468
469 // Slurp in one of the split files
470 for (j = 0; j < c; ++j) {
471 if (sam_read1(fp, h, a[j].b) < 0) {
472 fprintf(stderr, "Error reading '%s'\n", fnt[i]);
473 goto fail;
474 }
475 a[j].key = hash_X31_Wang(bam_get_qname(a[j].b));
476 }
477 sam_close(fp);
478 unlink(fnt[i]);
479 free(fnt[i]);
480 fnt[i] = NULL;
481
482 ks_introsort(bamshuf, c, a); // Shuffle all the reads
483
484 // Write them out again
485 for (j = 0; j < c; ++j) {
486 if (sam_write1(fpw, h, a[j].b) < 0) {
487 print_error_errno("collate", "Error writing to output");
488 goto fail;
489 }
490 }
491 }
492
493 sam_hdr_destroy(h);
494 for (j = 0; j < max_cnt; ++j) bam_destroy1(a[j].b);
495 free(a); free(fnt); free(cnt);
496 sam_global_args_free(ga);
497 if (sam_close(fpw) < 0) {
498 fprintf(stderr, "Error on closing output\n");
499 return 1;
500 }
501
502 if (p.pool) hts_tpool_destroy(p.pool);
503 return 0;
504
505 mem_fail:
506 fprintf(stderr, "Out of memory\n");
507
508 fail:
509 if (fp) sam_close(fp);
510 if (fpw) sam_close(fpw);
511 if (h) sam_hdr_destroy(h);
512 for (i = 0; i < n_files; ++i) {
513 if (fnt) free(fnt[i]);
514 if (fpt && fpt[i]) sam_close(fpt[i]);
515 }
516 if (a) {
517 for (j = 0; j < max_cnt; ++j) bam_destroy1(a[j].b);
518 free(a);
519 }
520 free(fnt);
521 free(fpt);
522 free(cnt);
523 if (p.pool) hts_tpool_destroy(p.pool);
524 sam_global_args_free(ga);
525 return 1;
526 }
527
usage(FILE * fp,int n_files,int reads_store)528 static int usage(FILE *fp, int n_files, int reads_store) {
529 fprintf(fp,
530 "Usage: samtools collate [-Ou] [-o <name>] [-n nFiles] [-l cLevel] <in.bam> [<prefix>]\n\n"
531 "Options:\n"
532 " -O output to stdout\n"
533 " -o output file name (use prefix if not set)\n"
534 " -u uncompressed BAM output\n"
535 " -f fast (only primary alignments)\n"
536 " -r working reads stored (with -f) [%d]\n" // reads_store
537 " -l INT compression level [%d]\n" // DEF_CLEVEL
538 " -n INT number of temporary files [%d]\n" // n_files
539 " --no-PG do not add a PG line\n",
540 reads_store, DEF_CLEVEL, n_files);
541
542 sam_global_opt_help(fp, "-....@-.");
543 fprintf(fp,
544 " <prefix> is required unless the -o or -O options are used.\n");
545
546 return 1;
547 }
548
generate_prefix()549 char * generate_prefix() {
550 char *prefix;
551 unsigned int pid = getpid();
552 #ifdef _WIN32
553 # define PREFIX_LEN (MAX_PATH + 16)
554 DWORD ret;
555 prefix = calloc(PREFIX_LEN, sizeof(*prefix));
556 if (!prefix) {
557 perror("collate");
558 return NULL;
559 }
560 ret = GetTempPathA(MAX_PATH, prefix);
561 if (ret > MAX_PATH || ret == 0) {
562 fprintf(stderr,
563 "[E::collate] Couldn't get path for temporary files.\n");
564 free(prefix);
565 return NULL;
566 }
567 snprintf(prefix + ret, PREFIX_LEN - ret, "\\%x", pid);
568 return prefix;
569 #else
570 # define PREFIX_LEN 64
571 prefix = malloc(PREFIX_LEN);
572 if (!prefix) {
573 perror("collate");
574 return NULL;
575 }
576 snprintf(prefix, PREFIX_LEN, "/tmp/collate%x", pid);
577 return prefix;
578 #endif
579 }
580
main_bamshuf(int argc,char * argv[])581 int main_bamshuf(int argc, char *argv[])
582 {
583 int c, n_files = 64, clevel = DEF_CLEVEL, is_stdout = 0, is_un = 0, fast_coll = 0, reads_store = 10000, ret, pre_mem = 0, no_pg = 0;
584 const char *output_file = NULL;
585 char *prefix = NULL, *arg_list = NULL;
586 sam_global_args ga = SAM_GLOBAL_ARGS_INIT;
587 static const struct option lopts[] = {
588 SAM_OPT_GLOBAL_OPTIONS('-', 0, 0, 0, 0, '@'),
589 {"no-PG", no_argument, NULL, 1},
590 { NULL, 0, NULL, 0 }
591 };
592
593 while ((c = getopt_long(argc, argv, "n:l:uOo:@:fr:", lopts, NULL)) >= 0) {
594 switch (c) {
595 case 'n': n_files = atoi(optarg); break;
596 case 'l': clevel = atoi(optarg); break;
597 case 'u': is_un = 1; break;
598 case 'O': is_stdout = 1; break;
599 case 'o': output_file = optarg; break;
600 case 'f': fast_coll = 1; break;
601 case 'r': reads_store = atoi(optarg); break;
602 case 1: no_pg = 1; break;
603 default: if (parse_sam_global_opt(c, optarg, lopts, &ga) == 0) break;
604 /* else fall-through */
605 case '?': return usage(stderr, n_files, reads_store);
606 }
607 }
608 if (is_un) clevel = 0;
609 if (argc >= optind + 2) prefix = argv[optind+1];
610 if (!(prefix || is_stdout || output_file))
611 return usage(stderr, n_files, reads_store);
612 if (is_stdout && output_file) {
613 fprintf(stderr, "collate: -o and -O options cannot be used together.\n");
614 return usage(stderr, n_files, reads_store);
615 }
616 if (!prefix) {
617 prefix = generate_prefix();
618 pre_mem = 1;
619 }
620
621 if (!prefix) return EXIT_FAILURE;
622
623 if (!no_pg && !(arg_list = stringify_argv(argc+1, argv-1))) {
624 print_error("collate", "failed to create arg_list");
625 return 1;
626 }
627
628 ret = bamshuf(argv[optind], n_files, prefix, clevel, is_stdout,
629 output_file, fast_coll, reads_store, &ga, arg_list, no_pg);
630
631 if (pre_mem) free(prefix);
632 free(arg_list);
633
634 return ret;
635 }
636