1 /* The MIT License
2 
3     Copyright (C) 2020-2021 Giulio Genovese
4 
5     Author: Giulio Genovese <giulio.genovese@gmail.com>
6 
7     Permission is hereby granted, free of charge, to any person obtaining a copy
8     of this software and associated documentation files (the "Software"), to deal
9     in the Software without restriction, including without limitation the rights
10     to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11     copies of the Software, and to permit persons to whom the Software is
12     furnished to do so, subject to the following conditions:
13 
14     The above copyright notice and this permission notice shall be included in
15     all copies or substantial portions of the Software.
16 
17     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22     OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23     THE SOFTWARE.
24 
25  */
26 
27 #include <getopt.h>
28 #include <unistd.h>
29 #include <ctype.h>
30 #include <htslib/synced_bcf_reader.h>
31 #include "bcftools.h"
32 #include "htslib/khash_str2int.h"
33 #include "regidx.h"
34 
35 #define FLT_INCLUDE 1
36 #define FLT_EXCLUDE 2
37 
38 typedef struct
39 {
40     htsFile *fh;        // output file handle
41     char *fname;        // output file name
42 }
43 subset_t;
44 
45 typedef struct
46 {
47     char *filter_str;
48     int filter_logic;   // one of FLT_INCLUDE/FLT_EXCLUDE (-i or -e)
49     void *hash;
50     regidx_t *reg_idx;
51     regitr_t *reg_itr;
52     int argc, region_is_file, target_is_file, nsites, chunk_cnt, rec_cnt, scatter_is_file, output_type, n_threads, clevel;
53     char **argv, *region, *target, *scatter, *fname, *prefix, *extra, *output_dir;
54     bcf_srs_t *sr;
55     kstring_t str;
56     subset_t *sets;
57     int nsets, msets;
58     int record_cmd_line;
59     char **hts_opts;
60     int nhts_opts;
61 }
62 args_t;
63 
about(void)64 const char *about(void)
65 {
66     return "Scatter VCF by chunks or regions, creating multiple VCFs.\n";
67 }
68 
usage_text(void)69 static const char *usage_text(void)
70 {
71     return
72         "\n"
73         "About: Split VCF by chunks or regions, creating multiple VCFs.\n"
74         "\n"
75         "Usage: bcftools +scatter [Options]\n"
76         "Plugin options:\n"
77         "   -e, --exclude EXPR              Exclude sites for which the expression is true\n"
78         "   -i, --include EXPR              Select sites for which the expression is true\n"
79         "       --no-version                Do not append version and command line to the header\n"
80         "   -o, --output DIR                Write output to the directory DIR\n"
81         "   -O, --output-type u|b|v|z[0-9]  u/b: un/compressed BCF, v/z: un/compressed VCF, 0-9: compression level [v]\n"
82         "       --threads INT               Use multithreading with INT worker threads [0]\n"
83         "   -r, --regions REGION            Restrict to comma-separated list of regions\n"
84         "   -R, --regions-file FILE         Restrict to regions listed in a file\n"
85         "   -t, --targets [^]REGION         Similar to -r but streams rather than index-jumps. Exclude regions with \"^\" prefix\n"
86         "   -T, --targets-file [^]FILE      Similar to -R but streams rather than index-jumps. Exclude regions with \"^\" prefix\n"
87         "   -n, --nsites-per-chunk INT      Keep N sites for each chunk\n"
88         "   -s, --scatter REGION            Comma-separated list of regions defining variant windows for each output VCF\n"
89         "   -S, --scatter-file FILE         Regions listed in a file with an optional second column used to name each output VCF.\n"
90         "                                        Variants from multiple regions can be assigned to the same output VCF.\n"
91         "   -x, --extra STRING              Output records not overlapping listed regions in separate file\n"
92         "   -p, --prefix STRING             Prepend string to output VCF names\n"
93         "       --hts-opts LIST             Low-level options to pass to HTSlib, e.g. block_size=32768\n"
94         "\n"
95         "Examples:\n"
96         "   # Scatter a VCF file by shards with 10000 variants each\n"
97         "   bcftools +scatter input.bcf -Ob -o dir -n 10000 -p part-\n"
98         "\n"
99         "   # Scatter a VCF file by chromosomes\n"
100         "   bcftools +scatter input.bcf -Ob -o dir -s chr1,chr2,chr3,<...>,chr22,chrX -x other\n"
101         "\n";
102 }
103 
104 void mkdir_p(const char *fmt, ...);
105 
106 // most of this code was inspired by Petr Danecek's code in regidx.c
107 #define MAX_COOR_0 REGIDX_MAX   // CSI and hts_itr_query limit, 0-based
regidx_parse_reg_name(const char * line,char ** chr_beg,char ** chr_end,uint32_t * beg,uint32_t * end,void * payload,void * usr)108 int regidx_parse_reg_name(const char *line, char **chr_beg, char **chr_end, uint32_t *beg, uint32_t *end, void *payload, void *usr)
109 {
110     char *ss = (char*) line;
111     while ( *ss && isspace(*ss) ) ss++;
112     if ( !*ss ) return -1;      // skip blank lines
113     if ( *ss=='#' ) return -1;  // skip comments
114 
115     char *se = ss;
116     while ( *se && *se!=':' && !isspace(*se) ) se++;
117 
118     *chr_beg = ss;
119     *chr_end = se-1;
120 
121     if ( *se != ':' )
122     {
123         *beg = 0;
124         *end = MAX_COOR_0;
125     }
126     else
127     {
128         ss = se+1;
129         *beg = strtod(ss, &se);
130         if ( ss==se ) { fprintf(stderr,"Could not parse reg line: %s\n", line); return -2; }
131         if ( *beg==0 ) { fprintf(stderr,"Could not parse reg line, expected 1-based coordinate: %s\n", line); return -2; }
132         (*beg)--;
133 
134         if ( !se[0] || isspace(se[0])) {
135             *end = *beg;
136         } else if ( se[0] == '-' && (!se[1] || isspace(se[1])) ) {
137            *end = MAX_COOR_0;
138            se++;
139         } else {
140             ss = se+1;
141             *end = strtod(ss, &se);
142             if ( ss==se ) *end = *beg;
143             else if ( *end==0 ) { fprintf(stderr,"Could not parse reg line, expected 1-based coordinate: %s\n", line); return -2; }
144             else (*end)--;
145         }
146     }
147 
148     ss = se;
149     while ( *ss && isspace(*ss) ) ss++;
150     if ( !ss[0] ) ss = (char *)line;
151 
152     int *idx = (int *)payload;
153     args_t *args = (args_t *)usr;
154     int ret = khash_str2int_get(args->hash, ss, idx);
155     if (ret < 0) {
156         hts_expand0(subset_t, args->nsets + 1, args->msets, args->sets);
157         args->sets[args->nsets].fname = strdup(ss);
158         *idx = khash_str2int_inc(args->hash, args->sets[args->nsets].fname);
159         args->nsets++;
160     }
161 
162     return 0;
163 }
164 
open_set(subset_t * set,args_t * args)165 static void open_set(subset_t *set, args_t *args)
166 {
167     int j;
168     args->str.l = 0;
169     kputs(args->output_dir, &args->str);
170     if ( args->str.s[args->str.l-1] != '/' ) kputc('/', &args->str);
171     int k, l = args->str.l;
172     if (args->prefix) kputs(args->prefix, &args->str);
173     kputs(set->fname, &args->str);
174     for (k=l; k<args->str.l; k++) if ( isspace(args->str.s[k]) ) args->str.s[k] = '_';
175     if ( args->output_type & FT_BCF ) kputs(".bcf", &args->str);
176     else if ( args->output_type & FT_GZ ) kputs(".vcf.gz", &args->str);
177     else kputs(".vcf", &args->str);
178 
179     char wmode[8];
180     set_wmode(wmode,args->output_type,args->str.s,args->clevel);
181     set->fh = hts_open(args->str.s, wmode);
182     if ( set->fh == NULL ) error("[%s] Error: cannot write to \"%s\": %s\n", __func__, args->str.s, strerror(errno));
183     if ( args->n_threads > 0)
184         hts_set_opt(set->fh, HTS_OPT_THREAD_POOL, args->sr->p);
185     if ( args->hts_opts )
186     {
187         hts_opt *opts = NULL;
188         for (j=0; j<args->nhts_opts; j++)
189             if ( hts_opt_add(&opts, args->hts_opts[j]) ) error("Could not set the HTS option \"%s\"\n", args->hts_opts[j]);
190         if ( hts_opt_apply(set->fh, opts) ) error("Could not apply the HTS options\n");
191         hts_opt_free(opts);
192     }
193     bcf_hdr_t *hdr = bcf_sr_get_header(args->sr, 0);
194     if ( bcf_hdr_write(set->fh, hdr)!=0 ) error("[%s] Error: cannot write the header to %s\n", __func__, args->str.s);
195     if (args->record_cmd_line) bcf_hdr_append_version(hdr, args->argc, args->argv, "bcftools_plugin");
196 }
197 
init_data(args_t * args)198 static void init_data(args_t *args)
199 {
200     args->sr = bcf_sr_init();
201     if ( args->region )
202     {
203         args->sr->require_index = 1;
204         if ( bcf_sr_set_regions(args->sr, args->region, args->region_is_file)<0 )
205             error("Failed to read the regions: %s\n", args->region);
206     }
207     if ( args->target && bcf_sr_set_targets(args->sr, args->target, args->target_is_file, 0)<0 )
208         error("Failed to read the targets: %s\n", args->target);
209     if ( bcf_sr_set_threads(args->sr, args->n_threads)<0 ) error("Failed to create threads\n");
210     if ( !bcf_sr_add_reader(args->sr, args->fname) ) error("Error: %s\n", bcf_sr_strerror(args->sr->errnum));
211 
212     mkdir_p("%s/", args->output_dir);
213 
214     if (args->nsites) {
215         args->nsets = 1;
216         hts_expand0(subset_t, args->nsets, args->msets, args->sets);
217     } else {
218         args->hash = khash_str2int_init();
219         if (args->scatter_is_file) {
220             args->reg_idx = regidx_init(args->scatter, regidx_parse_reg_name, NULL, sizeof(int), args);
221             if ( !args->reg_idx ) exit(-1);
222         } else {
223             char *ss;
224             for (ss = args->scatter; *ss; ss++) if (*ss == ',') *ss = '\n';
225             args->reg_idx = regidx_init_string(args->scatter, regidx_parse_reg_name, NULL, sizeof(int), args);
226             for (ss = args->scatter; *ss; ss++) if (*ss == '\n') *ss = ',';
227         }
228         args->reg_itr = regitr_init(args->reg_idx);
229         if (args->extra) {
230             hts_expand0(subset_t, args->nsets + 1, args->msets, args->sets);
231             args->sets[args->nsets].fname = strdup(args->extra);
232             args->nsets++;
233         }
234         int i;
235         for (i=0; i<args->nsets; i++) open_set(&args->sets[i], args);
236     }
237 }
238 
destroy_data(args_t * args)239 static void destroy_data(args_t *args)
240 {
241     if (args->scatter) {
242         khash_str2int_destroy(args->hash);
243         regidx_destroy(args->reg_idx);
244         regitr_destroy(args->reg_itr);
245     }
246     free(args->str.s);
247     int i;
248     for (i=0; i<args->nsets; i++)
249     {
250         subset_t *set = &args->sets[i];
251         if (set->fname) {
252             if ( hts_close(set->fh)!=0 ) error("Error: close failed .. %s\n", set->fname);
253             free(set->fname);
254         }
255     }
256     for (i=0; i<args->nhts_opts; i++) free(args->hts_opts[i]);
257     free(args->hts_opts);
258     free(args->sets);
259     bcf_sr_destroy(args->sr);
260     free(args);
261 }
262 
process(args_t * args)263 static void process(args_t *args)
264 {
265     bcf_hdr_t *hdr = bcf_sr_get_header(args->sr, 0);
266     bcf1_t *rec = bcf_sr_get_line(args->sr, 0);
267     subset_t *set;
268 
269     if (args->nsites) {
270         subset_t *set = &args->sets[0];
271         if (!args->rec_cnt) {
272             args->str.l = 0;
273             kputw(args->chunk_cnt, &args->str);
274             set->fname = strdup(args->str.s);
275             open_set(set, args);
276             args->rec_cnt = 0;
277         }
278         if ( bcf_write(set->fh, hdr, rec)!=0 ) error("[%s] Error: failed to write the record\n", __func__);
279         args->rec_cnt++;
280         if (args->rec_cnt == args->nsites) {
281           args->rec_cnt = 0;
282           if ( hts_close(set->fh)!=0 ) error("Error: close failed .. %s\n", set->fname);
283           free(set->fname);
284           set->fname = NULL;
285           args->chunk_cnt++;
286         }
287     } else {
288         if ( regidx_overlap(args->reg_idx, bcf_hdr_id2name(hdr, rec->rid), rec->pos, rec->pos + rec->rlen-1, args->reg_itr) ) {
289             while (regitr_overlap(args->reg_itr)) {
290                 int idx = regitr_payload(args->reg_itr, int);
291                 set = &args->sets[idx];
292                 if ( bcf_write(set->fh, hdr, rec)!=0 ) error("[%s] Error: failed to write the record\n", __func__);
293             }
294         } else if (args->extra) {
295             set = &args->sets[args->nsets-1];
296             if ( bcf_write(set->fh, hdr, rec)!=0 ) error("[%s] Error: failed to write the record\n", __func__);
297         }
298     }
299 }
300 
run(int argc,char ** argv)301 int run(int argc, char **argv)
302 {
303     args_t *args = (args_t*) calloc(1, sizeof(args_t));
304     args->argc   = argc; args->argv = argv;
305     args->output_type  = FT_VCF;
306     args->record_cmd_line = 1;
307     args->clevel = -1;
308     static struct option loptions[] =
309     {
310         {"exclude",required_argument,NULL,'e'},
311         {"include",required_argument,NULL,'i'},
312         {"no-version",no_argument,NULL,1},
313         {"output",required_argument,NULL,'o'},
314         {"output-type",required_argument,NULL,'O'},
315         {"threads",required_argument,NULL,2},
316         {"regions",required_argument,NULL,'r'},
317         {"regions-file",required_argument,NULL,'R'},
318         {"targets", required_argument, NULL, 't'},
319         {"targets-file", required_argument, NULL, 'T'},
320         {"nsites-per-chunk",required_argument,NULL,'n'},
321         {"scatter",required_argument,NULL,'s'},
322         {"scatter-file",required_argument,NULL,'S'},
323         {"extra",required_argument,NULL,'x'},
324         {"prefix",required_argument,NULL,'p'},
325         {"hts-opts",required_argument,NULL,3},
326         {NULL,0,NULL,0}
327     };
328     int c;
329     char *tmp;
330     while ((c = getopt_long(argc, argv, "e:i:o:O:r:R:t:T:n:s:S:x:p:h?", loptions, NULL)) >= 0)
331     {
332         switch (c)
333         {
334             case 'e':
335                 if ( args->filter_str ) error("Error: only one -i or -e expression can be given, and they cannot be combined\n");
336                 args->filter_str = optarg; args->filter_logic |= FLT_EXCLUDE; break;
337             case 'i':
338                 if ( args->filter_str ) error("Error: only one -i or -e expression can be given, and they cannot be combined\n");
339                 args->filter_str = optarg; args->filter_logic |= FLT_INCLUDE; break;
340             case  1 : args->record_cmd_line = 0; break;
341             case 'o': args->output_dir = optarg; break;
342             case 'O':
343                       switch (optarg[0]) {
344                           case 'b': args->output_type = FT_BCF_GZ; break;
345                           case 'u': args->output_type = FT_BCF; break;
346                           case 'z': args->output_type = FT_VCF_GZ; break;
347                           case 'v': args->output_type = FT_VCF; break;
348                           default:
349                           {
350                               args->clevel = strtol(optarg,&tmp,10);
351                               if ( *tmp || args->clevel<0 || args->clevel>9 ) error("The output type \"%s\" not recognised\n", optarg);
352                           }
353                       }
354                       if ( optarg[1] )
355                       {
356                           args->clevel = strtol(optarg+1,&tmp,10);
357                           if ( *tmp || args->clevel<0 || args->clevel>9 ) error("Could not parse argument: --compression-level %s\n", optarg+1);
358                       }
359                       break;
360             case  2 : args->n_threads = strtol(optarg, 0, 0); break;
361             case 'r': args->region = optarg; break;
362             case 'R': args->region = optarg; args->region_is_file = 1;  break;
363             case 't': args->target = optarg; break;
364             case 'T': args->target = optarg; args->target_is_file = 1; break;
365             case 'n':
366                 args->nsites = strtod(optarg, &tmp);
367                 if ( tmp==optarg || *tmp ) error("Could not parse: --nsites-per-chunk %s\n", optarg);
368                 if ( args->nsites <= 0 ) error("Positive integer required: --nsites-per-chunk %s\n", optarg);
369                 break;
370             case 's': args->scatter = optarg; break;
371             case 'S': args->scatter = optarg; args->scatter_is_file = 1;  break;
372             case 'x': args->extra = optarg;  break;
373             case 'p': args->prefix = optarg;  break;
374             case  3 : args->hts_opts = hts_readlist(optarg, 0, &args->nhts_opts); break;
375             case 'h':
376             case '?':
377             default: error("%s", usage_text()); break;
378         }
379     }
380     if ( optind==argc )
381     {
382         if ( !isatty(fileno((FILE *)stdin)) ) args->fname = "-";  // reading from stdin
383         else { error("%s", usage_text()); }
384     }
385     else if ( optind+1!=argc ) error("%s", usage_text());
386     else args->fname = argv[optind];
387 
388     if ( !args->output_dir ) error("Missing the -o option\n");
389     if ( args->filter_logic == (FLT_EXCLUDE|FLT_INCLUDE) ) error("Only one of -i or -e can be given\n");
390     if ( !args->nsites && !args->scatter ) error("Missing either the -n or one of the -s or -S options\n");
391     if ( args->nsites && args->scatter ) error("Only one of -n or either -s or -S can be given\n");
392     if ( args->nsites && args->extra ) error("Cannot use -x together with -n\n");
393 
394     init_data(args);
395 
396     while ( bcf_sr_next_line(args->sr) ) process(args);
397 
398     destroy_data(args);
399     return 0;
400 }
401