1 /* vcfsort.c -- sort subcommand
2
3 Copyright (C) 2017-2021 Genome Research Ltd.
4
5 Author: Petr Danecek <pd3@sanger.ac.uk>
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24
25 */
26
27 #include <stdio.h>
28 #include <unistd.h>
29 #include <getopt.h>
30 #include <ctype.h>
31 #include <string.h>
32 #include <strings.h>
33 #include <errno.h>
34 #include <sys/stat.h>
35 #include <sys/types.h>
36 #include <fcntl.h>
37 #include <math.h>
38 #ifdef _WIN32
39 #include <windows.h>
40 #endif
41 #include <htslib/vcf.h>
42 #include <htslib/kstring.h>
43 #include <htslib/hts_os.h>
44 #include "kheap.h"
45 #include "bcftools.h"
46
47 typedef struct
48 {
49 char *fname;
50 htsFile *fh;
51 bcf1_t *rec;
52 }
53 blk_t;
54
55 typedef struct _args_t
56 {
57 bcf_hdr_t *hdr;
58 char **argv, *fname, *output_fname, *tmp_dir;
59 int argc, output_type, clevel;
60 size_t max_mem, mem;
61 bcf1_t **buf;
62 uint8_t *mem_block;
63 size_t nbuf, mbuf, nblk;
64 blk_t *blk;
65 }
66 args_t;
67
clean_files(args_t * args)68 void clean_files(args_t *args)
69 {
70 int i;
71 fprintf(stderr,"Cleaning\n");
72 for (i=0; i<args->nblk; i++)
73 {
74 blk_t *blk = args->blk + i;
75 if ( blk->fname )
76 {
77 unlink(blk->fname);
78 free(blk->fname);
79 }
80 if ( blk->rec )
81 bcf_destroy(blk->rec);
82 }
83 rmdir(args->tmp_dir);
84 }
clean_files_and_throw(args_t * args,const char * format,...)85 void clean_files_and_throw(args_t *args, const char *format, ...)
86 {
87 va_list ap;
88 va_start(ap, format);
89 vfprintf(stderr, format, ap);
90 va_end(ap);
91 clean_files(args);
92 exit(-1);
93 }
94
cmp_bcf_pos(const void * aptr,const void * bptr)95 int cmp_bcf_pos(const void *aptr, const void *bptr)
96 {
97 bcf1_t *a = *((bcf1_t**)aptr);
98 bcf1_t *b = *((bcf1_t**)bptr);
99 if ( a->rid < b->rid ) return -1;
100 if ( a->rid > b->rid ) return 1;
101 if ( a->pos < b->pos ) return -1;
102 if ( a->pos > b->pos ) return 1;
103
104 // Sort the same chr:pos records lexicographically by ref,alt.
105 // This will be called rarely so should not slow the sorting down
106 // noticeably.
107
108 int i;
109 for (i=0; i<a->n_allele; i++)
110 {
111 if ( i >= b->n_allele ) return 1;
112 int ret = strcasecmp(a->d.allele[i],b->d.allele[i]);
113 if ( ret ) return ret;
114 }
115 if ( a->n_allele < b->n_allele ) return -1;
116 return 0;
117 }
118
buf_flush(args_t * args)119 void buf_flush(args_t *args)
120 {
121 if ( !args->nbuf ) return;
122
123 qsort(args->buf, args->nbuf, sizeof(*args->buf), cmp_bcf_pos);
124
125 args->nblk++;
126 args->blk = (blk_t*) realloc(args->blk, sizeof(blk_t)*args->nblk);
127 blk_t *blk = args->blk + args->nblk - 1;
128
129 kstring_t str = {0,0,0};
130 ksprintf(&str, "%s/%05d.bcf", args->tmp_dir, (int)args->nblk);
131 blk->fname = str.s;
132 blk->rec = NULL;
133 blk->fh = NULL;
134
135 htsFile *fh = hts_open(blk->fname, "wbu");
136 if ( fh == NULL ) clean_files_and_throw(args, "Cannot write %s: %s\n", blk->fname, strerror(errno));
137 if ( bcf_hdr_write(fh, args->hdr)!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,blk->fname);
138
139 int i;
140 for (i=0; i<args->nbuf; i++)
141 {
142 if ( bcf_write(fh, args->hdr, args->buf[i])!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,blk->fname);
143 }
144 if ( hts_close(fh)!=0 ) clean_files_and_throw(args, "[%s] Error: close failed .. %s\n", __func__,blk->fname);
145
146 args->nbuf = 0;
147 args->mem = 0;
148 }
149
150
_align_up(uint8_t * ptr)151 static inline uint8_t *_align_up(uint8_t *ptr)
152 {
153 return (uint8_t*)(((size_t)ptr + 8 - 1) & ~((size_t)(8 - 1)));
154 }
155
buf_push(args_t * args,bcf1_t * rec)156 void buf_push(args_t *args, bcf1_t *rec)
157 {
158 size_t delta = sizeof(bcf1_t) + rec->shared.l + rec->indiv.l + rec->unpack_size[0] + rec->unpack_size[1]
159 + sizeof(*rec->d.allele)*rec->d.m_allele
160 + sizeof(bcf1_t*) // args->buf
161 + 8; // the number of _align_up() calls
162
163 if ( delta > args->max_mem - args->mem )
164 {
165 args->nbuf++;
166 hts_expand(bcf1_t*, args->nbuf, args->mbuf, args->buf);
167 args->buf[args->nbuf-1] = rec;
168 buf_flush(args);
169 bcf_destroy(rec);
170 return;
171 }
172
173 // make sure nothing has changed in htslib
174 assert( rec->unpacked==BCF_UN_STR && !rec->d.flt && !rec->d.info && !rec->d.fmt && !rec->d.var );
175
176 uint8_t *ptr_beg = args->mem_block + args->mem;
177 uint8_t *ptr = _align_up(ptr_beg);
178 bcf1_t *new_rec = (bcf1_t*)ptr;
179 memcpy(new_rec,rec,sizeof(*rec));
180 ptr += sizeof(*rec);
181
182 // The array of allele pointers does not need alignment as bcf1_t is already padded to the biggest
183 // data type in the structure
184 char **allele = (char**)ptr;
185 ptr += rec->n_allele*sizeof(*allele);
186
187 // This is just to prevent valgrind from complaining about memcpy, unpack_size is a high-water mark
188 // and the end may be uninitialized
189 delta = rec->d.allele[rec->n_allele-1] - rec->d.allele[0];
190 while ( delta < rec->unpack_size[1] ) if ( !rec->d.als[delta++] ) break;
191 memcpy(ptr,rec->d.als,delta);
192 new_rec->d.als = (char*)ptr;
193 ptr = ptr + delta;
194
195 int i;
196 for (i=0; i<rec->n_allele; i++) allele[i] = new_rec->d.als + (ptrdiff_t)(rec->d.allele[i] - rec->d.allele[0]);
197 new_rec->d.allele = allele;
198
199 memcpy(ptr,rec->shared.s,rec->shared.l);
200 new_rec->shared.s = (char*)ptr;
201 new_rec->shared.m = rec->shared.l;
202 ptr += rec->shared.l;
203
204 memcpy(ptr,rec->indiv.s,rec->indiv.l);
205 new_rec->indiv.s = (char*)ptr;
206 new_rec->indiv.m = rec->indiv.l;
207 ptr += rec->indiv.l;
208
209 // This is just to prevent valgrind from complaining about memcpy, unpack_size is a high-water mark
210 // and the end may be uninitialized
211 i = 0;
212 while ( i < rec->unpack_size[0] ) if ( !rec->d.id[i++] ) break;
213 memcpy(ptr,rec->d.id,i);
214 new_rec->d.id = (char*)ptr;
215 ptr += i;
216
217 args->nbuf++;
218 hts_expand(bcf1_t*, args->nbuf, args->mbuf, args->buf);
219 args->buf[args->nbuf-1] = new_rec;
220
221 delta = ptr - ptr_beg;
222 args->mem += delta;
223
224 assert( args->mem <= args->max_mem );
225
226 bcf_destroy(rec);
227 }
228
sort_blocks(args_t * args)229 void sort_blocks(args_t *args)
230 {
231 htsFile *in = hts_open(args->fname, "r");
232 if ( !in ) clean_files_and_throw(args, "Could not read %s\n", args->fname);
233 args->hdr = bcf_hdr_read(in);
234 if ( !args->hdr) clean_files_and_throw(args, "Could not read VCF/BCF headers from %s\n", args->fname);
235
236 while ( 1 )
237 {
238 bcf1_t *rec = bcf_init();
239 int ret = bcf_read1(in, args->hdr, rec);
240 if ( ret < -1 ) clean_files_and_throw(args,"Error encountered while parsing the input\n");
241 if ( ret == -1 )
242 {
243 bcf_destroy(rec);
244 break;
245 }
246 if ( rec->errcode ) clean_files_and_throw(args,"Error encountered while parsing the input at %s:%d\n",bcf_seqname(args->hdr,rec),rec->pos+1);
247 bcf_unpack(rec, BCF_UN_STR);
248 buf_push(args, rec);
249 }
250 buf_flush(args);
251 free(args->buf);
252
253 if ( hts_close(in)!=0 ) clean_files_and_throw(args,"Close failed: %s\n", args->fname);
254 }
255
blk_is_smaller(blk_t ** aptr,blk_t ** bptr)256 static inline int blk_is_smaller(blk_t **aptr, blk_t **bptr)
257 {
258 blk_t *a = *aptr;
259 blk_t *b = *bptr;
260 int ret = cmp_bcf_pos(&a->rec, &b->rec);
261 if ( ret < 0 ) return 1;
262 return 0;
263 }
KHEAP_INIT(blk,blk_t *,blk_is_smaller)264 KHEAP_INIT(blk, blk_t*, blk_is_smaller)
265
266 void blk_read(args_t *args, khp_blk_t *bhp, bcf_hdr_t *hdr, blk_t *blk)
267 {
268 if ( !blk->fh ) return;
269 int ret = bcf_read(blk->fh, hdr, blk->rec);
270 if ( ret < -1 ) clean_files_and_throw(args, "Error reading %s\n", blk->fname);
271 if ( ret == -1 )
272 {
273 if ( hts_close(blk->fh)!=0 ) clean_files_and_throw(args, "Close failed: %s\n", blk->fname);
274 blk->fh = 0;
275 return;
276 }
277 bcf_unpack(blk->rec, BCF_UN_STR);
278 khp_insert(blk, bhp, &blk);
279 }
280
merge_blocks(args_t * args)281 void merge_blocks(args_t *args)
282 {
283 fprintf(stderr,"Merging %d temporary files\n", (int)args->nblk);
284 khp_blk_t *bhp = khp_init(blk);
285
286 int i;
287 for (i=0; i<args->nblk; i++)
288 {
289 blk_t *blk = args->blk + i;
290 blk->fh = hts_open(blk->fname, "r");
291 if ( !blk->fh ) clean_files_and_throw(args, "Could not read %s: %s\n", blk->fname, strerror(errno));
292 bcf_hdr_t *hdr = bcf_hdr_read(blk->fh);
293 bcf_hdr_destroy(hdr);
294 blk->rec = bcf_init();
295 blk_read(args, bhp, args->hdr, blk);
296 }
297
298 char wmode[8];
299 set_wmode(wmode,args->output_type,args->output_fname,args->clevel);
300 htsFile *out = hts_open(args->output_fname ? args->output_fname : "-", wmode);
301 if ( bcf_hdr_write(out, args->hdr)!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,args->output_fname);
302 while ( bhp->ndat )
303 {
304 blk_t *blk = bhp->dat[0];
305 if ( bcf_write(out, args->hdr, blk->rec)!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,args->output_fname);
306 khp_delete(blk, bhp);
307 blk_read(args, bhp, args->hdr, blk);
308 }
309 if ( hts_close(out)!=0 ) clean_files_and_throw(args, "Close failed: %s\n", args->output_fname);
310
311 clean_files(args);
312
313 free(args->blk);
314 khp_destroy(blk, bhp);
315 fprintf(stderr,"Done\n");
316 }
317
usage(args_t * args)318 static void usage(args_t *args)
319 {
320 fprintf(stderr, "\n");
321 fprintf(stderr, "About: Sort VCF/BCF file.\n");
322 fprintf(stderr, "Usage: bcftools sort [OPTIONS] <FILE.vcf>\n");
323 fprintf(stderr, "\n");
324 fprintf(stderr, "Options:\n");
325 fprintf(stderr, " -m, --max-mem FLOAT[kMG] maximum memory to use [768M]\n"); // using metric units, 1M=1e6
326 fprintf(stderr, " -o, --output FILE output file name [stdout]\n");
327 fprintf(stderr, " -O, --output-type b|u|z|v b: compressed BCF, u: uncompressed BCF, z: compressed VCF, v: uncompressed VCF [v]\n");
328 fprintf(stderr, " -O, --output-type u|b|v|z[0-9] u/b: un/compressed BCF, v/z: un/compressed VCF, 0-9: compression level [v]\n");
329
330 #ifdef _WIN32
331 fprintf(stderr, " -T, --temp-dir DIR temporary files [/bcftools.XXXXXX]\n");
332 #else
333 fprintf(stderr, " -T, --temp-dir DIR temporary files [/tmp/bcftools.XXXXXX]\n");
334 #endif
335 fprintf(stderr, "\n");
336 exit(1);
337 }
338
parse_mem_string(const char * str)339 size_t parse_mem_string(const char *str)
340 {
341 char *tmp;
342 double mem = strtod(str, &tmp);
343 if ( tmp==str ) error("Could not parse the memory string: \"%s\"\n", str);
344 if ( !strcasecmp("k",tmp) ) mem *= 1000;
345 else if ( !strcasecmp("m",tmp) ) mem *= 1000*1000;
346 else if ( !strcasecmp("g",tmp) ) mem *= 1000*1000*1000;
347 return mem;
348 }
349
350 void mkdir_p(const char *fmt, ...);
init(args_t * args)351 static void init(args_t *args)
352 {
353 args->max_mem *= 0.9;
354 args->mem_block = malloc(args->max_mem);
355 args->mem = 0;
356
357 args->tmp_dir = init_tmp_prefix(args->tmp_dir);
358
359 #ifdef _WIN32
360 int ret = mkdir(mktemp(args->tmp_dir), 0700);
361 if ( ret ) error("mkdir(%s) failed: %s\n", args->tmp_dir,strerror(errno));
362 #else
363 char *tmp = mkdtemp(args->tmp_dir);
364 if ( !tmp ) error("mkdtemp(%s) failed: %s\n", args->tmp_dir,strerror(errno));
365 int ret = chmod(tmp, S_IRUSR|S_IWUSR|S_IXUSR);
366 if ( ret ) error("chmod(%s,S_IRUSR|S_IWUSR|S_IXUSR) failed: %s\n", args->tmp_dir,strerror(errno));
367 #endif
368
369 fprintf(stderr,"Writing to %s\n", args->tmp_dir);
370 }
destroy(args_t * args)371 static void destroy(args_t *args)
372 {
373 bcf_hdr_destroy(args->hdr);
374 free(args->mem_block);
375 free(args->tmp_dir);
376 free(args);
377 }
378
main_sort(int argc,char * argv[])379 int main_sort(int argc, char *argv[])
380 {
381 int c;
382 args_t *args = (args_t*) calloc(1,sizeof(args_t));
383 args->argc = argc; args->argv = argv;
384 args->max_mem = 768*1000*1000;
385 args->output_fname = "-";
386 args->clevel = -1;
387
388 static struct option loptions[] =
389 {
390 {"max-mem",required_argument,NULL,'m'},
391 {"temp-dir",required_argument,NULL,'T'},
392 {"output-type",required_argument,NULL,'O'},
393 {"output-file",required_argument,NULL,'o'},
394 {"output",required_argument,NULL,'o'},
395 {"help",no_argument,NULL,'h'},
396 {0,0,0,0}
397 };
398 char *tmp;
399 while ((c = getopt_long(argc, argv, "m:T:O:o:h?",loptions,NULL)) >= 0)
400 {
401 switch (c)
402 {
403 case 'm': args->max_mem = parse_mem_string(optarg); break;
404 case 'T': args->tmp_dir = optarg; break;
405 case 'o': args->output_fname = optarg; break;
406 case 'O':
407 switch (optarg[0]) {
408 case 'b': args->output_type = FT_BCF_GZ; break;
409 case 'u': args->output_type = FT_BCF; break;
410 case 'z': args->output_type = FT_VCF_GZ; break;
411 case 'v': args->output_type = FT_VCF; break;
412 default:
413 {
414 args->clevel = strtol(optarg,&tmp,10);
415 if ( *tmp || args->clevel<0 || args->clevel>9 ) error("The output type \"%s\" not recognised\n", optarg);
416 }
417 };
418 if ( optarg[1] )
419 {
420 args->clevel = strtol(optarg+1,&tmp,10);
421 if ( *tmp || args->clevel<0 || args->clevel>9 ) error("Could not parse argument: --compression-level %s\n", optarg+1);
422 }
423 break;
424 case 'h':
425 case '?': usage(args); break;
426 default: error("Unknown argument: %s\n", optarg);
427 }
428 }
429
430 if ( optind>=argc )
431 {
432 if ( !isatty(fileno((FILE *)stdin)) ) args->fname = "-"; // reading from stdin
433 else usage(args);
434 }
435 else args->fname = argv[optind];
436
437 init(args);
438 sort_blocks(args);
439 merge_blocks(args);
440 destroy(args);
441
442 return 0;
443 }
444