1 /*  vcfsort.c -- sort subcommand
2 
3    Copyright (C) 2017-2021 Genome Research Ltd.
4 
5    Author: Petr Danecek <pd3@sanger.ac.uk>
6 
7    Permission is hereby granted, free of charge, to any person obtaining a copy
8    of this software and associated documentation files (the "Software"), to deal
9    in the Software without restriction, including without limitation the rights
10    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11    copies of the Software, and to permit persons to whom the Software is
12    furnished to do so, subject to the following conditions:
13 
14    The above copyright notice and this permission notice shall be included in
15    all copies or substantial portions of the Software.
16 
17    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23    THE SOFTWARE.
24 
25  */
26 
27 #include <stdio.h>
28 #include <unistd.h>
29 #include <getopt.h>
30 #include <ctype.h>
31 #include <string.h>
32 #include <strings.h>
33 #include <errno.h>
34 #include <sys/stat.h>
35 #include <sys/types.h>
36 #include <fcntl.h>
37 #include <math.h>
38 #ifdef _WIN32
39 #include <windows.h>
40 #endif
41 #include <htslib/vcf.h>
42 #include <htslib/kstring.h>
43 #include <htslib/hts_os.h>
44 #include "kheap.h"
45 #include "bcftools.h"
46 
47 typedef struct
48 {
49     char *fname;
50     htsFile *fh;
51     bcf1_t *rec;
52 }
53 blk_t;
54 
55 typedef struct _args_t
56 {
57     bcf_hdr_t *hdr;
58     char **argv, *fname, *output_fname, *tmp_dir;
59     int argc, output_type, clevel;
60     size_t max_mem, mem;
61     bcf1_t **buf;
62     uint8_t *mem_block;
63     size_t nbuf, mbuf, nblk;
64     blk_t *blk;
65 }
66 args_t;
67 
clean_files(args_t * args)68 void clean_files(args_t *args)
69 {
70     int i;
71     fprintf(stderr,"Cleaning\n");
72     for (i=0; i<args->nblk; i++)
73     {
74         blk_t *blk = args->blk + i;
75         if ( blk->fname )
76         {
77             unlink(blk->fname);
78             free(blk->fname);
79         }
80         if ( blk->rec )
81             bcf_destroy(blk->rec);
82     }
83     rmdir(args->tmp_dir);
84 }
clean_files_and_throw(args_t * args,const char * format,...)85 void clean_files_and_throw(args_t *args, const char *format, ...)
86 {
87     va_list ap;
88     va_start(ap, format);
89     vfprintf(stderr, format, ap);
90     va_end(ap);
91     clean_files(args);
92     exit(-1);
93 }
94 
cmp_bcf_pos(const void * aptr,const void * bptr)95 int cmp_bcf_pos(const void *aptr, const void *bptr)
96 {
97     bcf1_t *a = *((bcf1_t**)aptr);
98     bcf1_t *b = *((bcf1_t**)bptr);
99     if ( a->rid < b->rid ) return -1;
100     if ( a->rid > b->rid ) return 1;
101     if ( a->pos < b->pos ) return -1;
102     if ( a->pos > b->pos ) return 1;
103 
104     // Sort the same chr:pos records lexicographically by ref,alt.
105     // This will be called rarely so should not slow the sorting down
106     // noticeably.
107 
108     int i;
109     for (i=0; i<a->n_allele; i++)
110     {
111         if ( i >= b->n_allele ) return 1;
112         int ret = strcasecmp(a->d.allele[i],b->d.allele[i]);
113         if ( ret ) return ret;
114     }
115     if ( a->n_allele < b->n_allele ) return -1;
116     return 0;
117 }
118 
buf_flush(args_t * args)119 void buf_flush(args_t *args)
120 {
121     if ( !args->nbuf ) return;
122 
123     qsort(args->buf, args->nbuf, sizeof(*args->buf), cmp_bcf_pos);
124 
125     args->nblk++;
126     args->blk = (blk_t*) realloc(args->blk, sizeof(blk_t)*args->nblk);
127     blk_t *blk = args->blk + args->nblk - 1;
128 
129     kstring_t str = {0,0,0};
130     ksprintf(&str, "%s/%05d.bcf", args->tmp_dir, (int)args->nblk);
131     blk->fname = str.s;
132     blk->rec   = NULL;
133     blk->fh    = NULL;
134 
135     htsFile *fh = hts_open(blk->fname, "wbu");
136     if ( fh == NULL ) clean_files_and_throw(args, "Cannot write %s: %s\n", blk->fname, strerror(errno));
137     if ( bcf_hdr_write(fh, args->hdr)!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,blk->fname);
138 
139     int i;
140     for (i=0; i<args->nbuf; i++)
141     {
142         if ( bcf_write(fh, args->hdr, args->buf[i])!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,blk->fname);
143     }
144     if ( hts_close(fh)!=0 ) clean_files_and_throw(args, "[%s] Error: close failed .. %s\n", __func__,blk->fname);
145 
146     args->nbuf = 0;
147     args->mem  = 0;
148 }
149 
150 
_align_up(uint8_t * ptr)151 static inline uint8_t *_align_up(uint8_t *ptr)
152 {
153     return (uint8_t*)(((size_t)ptr + 8 - 1) & ~((size_t)(8 - 1)));
154 }
155 
buf_push(args_t * args,bcf1_t * rec)156 void buf_push(args_t *args, bcf1_t *rec)
157 {
158     size_t delta = sizeof(bcf1_t) + rec->shared.l + rec->indiv.l + rec->unpack_size[0] + rec->unpack_size[1]
159         + sizeof(*rec->d.allele)*rec->d.m_allele
160         + sizeof(bcf1_t*)       // args->buf
161         + 8;                    // the number of _align_up() calls
162 
163     if ( delta > args->max_mem - args->mem )
164     {
165         args->nbuf++;
166         hts_expand(bcf1_t*, args->nbuf, args->mbuf, args->buf);
167         args->buf[args->nbuf-1] = rec;
168         buf_flush(args);
169         bcf_destroy(rec);
170         return;
171     }
172 
173     // make sure nothing has changed in htslib
174     assert( rec->unpacked==BCF_UN_STR && !rec->d.flt && !rec->d.info && !rec->d.fmt && !rec->d.var );
175 
176     uint8_t *ptr_beg = args->mem_block + args->mem;
177     uint8_t *ptr = _align_up(ptr_beg);
178     bcf1_t *new_rec = (bcf1_t*)ptr;
179     memcpy(new_rec,rec,sizeof(*rec));
180     ptr += sizeof(*rec);
181 
182     // The array of allele pointers does not need alignment as bcf1_t is already padded to the biggest
183     // data type in the structure
184     char **allele = (char**)ptr;
185     ptr += rec->n_allele*sizeof(*allele);
186 
187     // This is just to prevent valgrind from complaining about memcpy, unpack_size is a high-water mark
188     // and the end may be uninitialized
189     delta = rec->d.allele[rec->n_allele-1] - rec->d.allele[0];
190     while ( delta < rec->unpack_size[1] ) if ( !rec->d.als[delta++] ) break;
191     memcpy(ptr,rec->d.als,delta);
192     new_rec->d.als = (char*)ptr;
193     ptr = ptr + delta;
194 
195     int i;
196     for (i=0; i<rec->n_allele; i++) allele[i] = new_rec->d.als + (ptrdiff_t)(rec->d.allele[i] - rec->d.allele[0]);
197     new_rec->d.allele = allele;
198 
199     memcpy(ptr,rec->shared.s,rec->shared.l);
200     new_rec->shared.s = (char*)ptr;
201     new_rec->shared.m = rec->shared.l;
202     ptr += rec->shared.l;
203 
204     memcpy(ptr,rec->indiv.s,rec->indiv.l);
205     new_rec->indiv.s = (char*)ptr;
206     new_rec->indiv.m = rec->indiv.l;
207     ptr += rec->indiv.l;
208 
209     // This is just to prevent valgrind from complaining about memcpy, unpack_size is a high-water mark
210     // and the end may be uninitialized
211     i = 0;
212     while ( i < rec->unpack_size[0] ) if ( !rec->d.id[i++] ) break;
213     memcpy(ptr,rec->d.id,i);
214     new_rec->d.id = (char*)ptr;
215     ptr += i;
216 
217     args->nbuf++;
218     hts_expand(bcf1_t*, args->nbuf, args->mbuf, args->buf);
219     args->buf[args->nbuf-1] = new_rec;
220 
221     delta = ptr - ptr_beg;
222     args->mem += delta;
223 
224     assert( args->mem <= args->max_mem );
225 
226     bcf_destroy(rec);
227 }
228 
sort_blocks(args_t * args)229 void sort_blocks(args_t *args)
230 {
231     htsFile *in = hts_open(args->fname, "r");
232     if ( !in ) clean_files_and_throw(args, "Could not read %s\n", args->fname);
233     args->hdr = bcf_hdr_read(in);
234     if ( !args->hdr) clean_files_and_throw(args, "Could not read VCF/BCF headers from %s\n", args->fname);
235 
236     while ( 1 )
237     {
238         bcf1_t *rec = bcf_init();
239         int ret = bcf_read1(in, args->hdr, rec);
240         if ( ret < -1 ) clean_files_and_throw(args,"Error encountered while parsing the input\n");
241         if ( ret == -1 )
242         {
243             bcf_destroy(rec);
244             break;
245         }
246         if ( rec->errcode ) clean_files_and_throw(args,"Error encountered while parsing the input at %s:%d\n",bcf_seqname(args->hdr,rec),rec->pos+1);
247         bcf_unpack(rec, BCF_UN_STR);
248         buf_push(args, rec);
249     }
250     buf_flush(args);
251     free(args->buf);
252 
253     if ( hts_close(in)!=0 ) clean_files_and_throw(args,"Close failed: %s\n", args->fname);
254 }
255 
blk_is_smaller(blk_t ** aptr,blk_t ** bptr)256 static inline int blk_is_smaller(blk_t **aptr, blk_t **bptr)
257 {
258     blk_t *a = *aptr;
259     blk_t *b = *bptr;
260     int ret = cmp_bcf_pos(&a->rec, &b->rec);
261     if ( ret < 0 ) return 1;
262     return 0;
263 }
KHEAP_INIT(blk,blk_t *,blk_is_smaller)264 KHEAP_INIT(blk, blk_t*, blk_is_smaller)
265 
266 void blk_read(args_t *args, khp_blk_t *bhp, bcf_hdr_t *hdr, blk_t *blk)
267 {
268     if ( !blk->fh ) return;
269     int ret = bcf_read(blk->fh, hdr, blk->rec);
270     if ( ret < -1 ) clean_files_and_throw(args, "Error reading %s\n", blk->fname);
271     if ( ret == -1 )
272     {
273         if ( hts_close(blk->fh)!=0 ) clean_files_and_throw(args, "Close failed: %s\n", blk->fname);
274         blk->fh = 0;
275         return;
276     }
277     bcf_unpack(blk->rec, BCF_UN_STR);
278     khp_insert(blk, bhp, &blk);
279 }
280 
merge_blocks(args_t * args)281 void merge_blocks(args_t *args)
282 {
283     fprintf(stderr,"Merging %d temporary files\n", (int)args->nblk);
284     khp_blk_t *bhp = khp_init(blk);
285 
286     int i;
287     for (i=0; i<args->nblk; i++)
288     {
289         blk_t *blk = args->blk + i;
290         blk->fh = hts_open(blk->fname, "r");
291         if ( !blk->fh ) clean_files_and_throw(args, "Could not read %s: %s\n", blk->fname, strerror(errno));
292         bcf_hdr_t *hdr = bcf_hdr_read(blk->fh);
293         bcf_hdr_destroy(hdr);
294         blk->rec = bcf_init();
295         blk_read(args, bhp, args->hdr, blk);
296     }
297 
298     char wmode[8];
299     set_wmode(wmode,args->output_type,args->output_fname,args->clevel);
300     htsFile *out = hts_open(args->output_fname ? args->output_fname : "-", wmode);
301     if ( bcf_hdr_write(out, args->hdr)!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,args->output_fname);
302     while ( bhp->ndat )
303     {
304         blk_t *blk = bhp->dat[0];
305         if ( bcf_write(out, args->hdr, blk->rec)!=0 ) clean_files_and_throw(args, "[%s] Error: cannot write to %s\n", __func__,args->output_fname);
306         khp_delete(blk, bhp);
307         blk_read(args, bhp, args->hdr, blk);
308     }
309     if ( hts_close(out)!=0 ) clean_files_and_throw(args, "Close failed: %s\n", args->output_fname);
310 
311     clean_files(args);
312 
313     free(args->blk);
314     khp_destroy(blk, bhp);
315     fprintf(stderr,"Done\n");
316 }
317 
usage(args_t * args)318 static void usage(args_t *args)
319 {
320     fprintf(stderr, "\n");
321     fprintf(stderr, "About:   Sort VCF/BCF file.\n");
322     fprintf(stderr, "Usage:   bcftools sort [OPTIONS] <FILE.vcf>\n");
323     fprintf(stderr, "\n");
324     fprintf(stderr, "Options:\n");
325     fprintf(stderr, "    -m, --max-mem FLOAT[kMG]       maximum memory to use [768M]\n");    // using metric units, 1M=1e6
326     fprintf(stderr, "    -o, --output FILE              output file name [stdout]\n");
327     fprintf(stderr, "    -O, --output-type b|u|z|v      b: compressed BCF, u: uncompressed BCF, z: compressed VCF, v: uncompressed VCF [v]\n");
328     fprintf(stderr, "    -O, --output-type u|b|v|z[0-9] u/b: un/compressed BCF, v/z: un/compressed VCF, 0-9: compression level [v]\n");
329 
330 #ifdef _WIN32
331     fprintf(stderr, "    -T, --temp-dir DIR             temporary files [/bcftools.XXXXXX]\n");
332 #else
333     fprintf(stderr, "    -T, --temp-dir DIR             temporary files [/tmp/bcftools.XXXXXX]\n");
334 #endif
335     fprintf(stderr, "\n");
336     exit(1);
337 }
338 
parse_mem_string(const char * str)339 size_t parse_mem_string(const char *str)
340 {
341     char *tmp;
342     double mem = strtod(str, &tmp);
343     if ( tmp==str ) error("Could not parse the memory string: \"%s\"\n", str);
344     if ( !strcasecmp("k",tmp) ) mem *= 1000;
345     else if ( !strcasecmp("m",tmp) ) mem *= 1000*1000;
346     else if ( !strcasecmp("g",tmp) ) mem *= 1000*1000*1000;
347     return mem;
348 }
349 
350 void mkdir_p(const char *fmt, ...);
init(args_t * args)351 static void init(args_t *args)
352 {
353     args->max_mem *= 0.9;
354     args->mem_block = malloc(args->max_mem);
355     args->mem = 0;
356 
357     args->tmp_dir = init_tmp_prefix(args->tmp_dir);
358 
359 #ifdef _WIN32
360         int ret = mkdir(mktemp(args->tmp_dir), 0700);
361         if ( ret ) error("mkdir(%s) failed: %s\n", args->tmp_dir,strerror(errno));
362 #else
363         char *tmp = mkdtemp(args->tmp_dir);
364         if ( !tmp ) error("mkdtemp(%s) failed: %s\n",  args->tmp_dir,strerror(errno));
365         int ret = chmod(tmp, S_IRUSR|S_IWUSR|S_IXUSR);
366         if ( ret ) error("chmod(%s,S_IRUSR|S_IWUSR|S_IXUSR) failed: %s\n", args->tmp_dir,strerror(errno));
367 #endif
368 
369     fprintf(stderr,"Writing to %s\n", args->tmp_dir);
370 }
destroy(args_t * args)371 static void destroy(args_t *args)
372 {
373     bcf_hdr_destroy(args->hdr);
374     free(args->mem_block);
375     free(args->tmp_dir);
376     free(args);
377 }
378 
main_sort(int argc,char * argv[])379 int main_sort(int argc, char *argv[])
380 {
381     int c;
382     args_t *args  = (args_t*) calloc(1,sizeof(args_t));
383     args->argc    = argc; args->argv = argv;
384     args->max_mem = 768*1000*1000;
385     args->output_fname = "-";
386     args->clevel = -1;
387 
388     static struct option loptions[] =
389     {
390         {"max-mem",required_argument,NULL,'m'},
391         {"temp-dir",required_argument,NULL,'T'},
392         {"output-type",required_argument,NULL,'O'},
393         {"output-file",required_argument,NULL,'o'},
394         {"output",required_argument,NULL,'o'},
395         {"help",no_argument,NULL,'h'},
396         {0,0,0,0}
397     };
398     char *tmp;
399     while ((c = getopt_long(argc, argv, "m:T:O:o:h?",loptions,NULL)) >= 0)
400     {
401         switch (c)
402         {
403             case 'm': args->max_mem = parse_mem_string(optarg); break;
404             case 'T': args->tmp_dir = optarg; break;
405             case 'o': args->output_fname = optarg; break;
406             case 'O':
407                       switch (optarg[0]) {
408                           case 'b': args->output_type = FT_BCF_GZ; break;
409                           case 'u': args->output_type = FT_BCF; break;
410                           case 'z': args->output_type = FT_VCF_GZ; break;
411                           case 'v': args->output_type = FT_VCF; break;
412                           default:
413                           {
414                               args->clevel = strtol(optarg,&tmp,10);
415                               if ( *tmp || args->clevel<0 || args->clevel>9 ) error("The output type \"%s\" not recognised\n", optarg);
416                           }
417                       };
418                       if ( optarg[1] )
419                       {
420                           args->clevel = strtol(optarg+1,&tmp,10);
421                           if ( *tmp || args->clevel<0 || args->clevel>9 ) error("Could not parse argument: --compression-level %s\n", optarg+1);
422                       }
423                       break;
424             case 'h':
425             case '?': usage(args); break;
426             default: error("Unknown argument: %s\n", optarg);
427         }
428     }
429 
430     if ( optind>=argc )
431     {
432         if ( !isatty(fileno((FILE *)stdin)) ) args->fname = "-";  // reading from stdin
433         else usage(args);
434     }
435     else args->fname = argv[optind];
436 
437     init(args);
438     sort_blocks(args);
439     merge_blocks(args);
440     destroy(args);
441 
442     return 0;
443 }
444