1 /* The MIT License
2
3 Copyright (C) 2020-2021 Giulio Genovese
4
5 Author: Giulio Genovese <giulio.genovese@gmail.com>
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24
25 */
26
27 #include <getopt.h>
28 #include <unistd.h>
29 #include <ctype.h>
30 #include <htslib/synced_bcf_reader.h>
31 #include "bcftools.h"
32 #include "htslib/khash_str2int.h"
33 #include "regidx.h"
34
35 #define FLT_INCLUDE 1
36 #define FLT_EXCLUDE 2
37
38 typedef struct
39 {
40 htsFile *fh; // output file handle
41 char *fname; // output file name
42 }
43 subset_t;
44
45 typedef struct
46 {
47 char *filter_str;
48 int filter_logic; // one of FLT_INCLUDE/FLT_EXCLUDE (-i or -e)
49 void *hash;
50 regidx_t *reg_idx;
51 regitr_t *reg_itr;
52 int argc, region_is_file, target_is_file, nsites, chunk_cnt, rec_cnt, scatter_is_file, output_type, n_threads, clevel;
53 char **argv, *region, *target, *scatter, *fname, *prefix, *extra, *output_dir;
54 bcf_srs_t *sr;
55 kstring_t str;
56 subset_t *sets;
57 int nsets, msets;
58 int record_cmd_line;
59 char **hts_opts;
60 int nhts_opts;
61 }
62 args_t;
63
about(void)64 const char *about(void)
65 {
66 return "Scatter VCF by chunks or regions, creating multiple VCFs.\n";
67 }
68
usage_text(void)69 static const char *usage_text(void)
70 {
71 return
72 "\n"
73 "About: Split VCF by chunks or regions, creating multiple VCFs.\n"
74 "\n"
75 "Usage: bcftools +scatter [Options]\n"
76 "Plugin options:\n"
77 " -e, --exclude EXPR Exclude sites for which the expression is true\n"
78 " -i, --include EXPR Select sites for which the expression is true\n"
79 " --no-version Do not append version and command line to the header\n"
80 " -o, --output DIR Write output to the directory DIR\n"
81 " -O, --output-type u|b|v|z[0-9] u/b: un/compressed BCF, v/z: un/compressed VCF, 0-9: compression level [v]\n"
82 " --threads INT Use multithreading with INT worker threads [0]\n"
83 " -r, --regions REGION Restrict to comma-separated list of regions\n"
84 " -R, --regions-file FILE Restrict to regions listed in a file\n"
85 " -t, --targets [^]REGION Similar to -r but streams rather than index-jumps. Exclude regions with \"^\" prefix\n"
86 " -T, --targets-file [^]FILE Similar to -R but streams rather than index-jumps. Exclude regions with \"^\" prefix\n"
87 " -n, --nsites-per-chunk INT Keep N sites for each chunk\n"
88 " -s, --scatter REGION Comma-separated list of regions defining variant windows for each output VCF\n"
89 " -S, --scatter-file FILE Regions listed in a file with an optional second column used to name each output VCF.\n"
90 " Variants from multiple regions can be assigned to the same output VCF.\n"
91 " -x, --extra STRING Output records not overlapping listed regions in separate file\n"
92 " -p, --prefix STRING Prepend string to output VCF names\n"
93 " --hts-opts LIST Low-level options to pass to HTSlib, e.g. block_size=32768\n"
94 "\n"
95 "Examples:\n"
96 " # Scatter a VCF file by shards with 10000 variants each\n"
97 " bcftools +scatter input.bcf -Ob -o dir -n 10000 -p part-\n"
98 "\n"
99 " # Scatter a VCF file by chromosomes\n"
100 " bcftools +scatter input.bcf -Ob -o dir -s chr1,chr2,chr3,<...>,chr22,chrX -x other\n"
101 "\n";
102 }
103
104 void mkdir_p(const char *fmt, ...);
105
106 // most of this code was inspired by Petr Danecek's code in regidx.c
107 #define MAX_COOR_0 REGIDX_MAX // CSI and hts_itr_query limit, 0-based
regidx_parse_reg_name(const char * line,char ** chr_beg,char ** chr_end,uint32_t * beg,uint32_t * end,void * payload,void * usr)108 int regidx_parse_reg_name(const char *line, char **chr_beg, char **chr_end, uint32_t *beg, uint32_t *end, void *payload, void *usr)
109 {
110 char *ss = (char*) line;
111 while ( *ss && isspace(*ss) ) ss++;
112 if ( !*ss ) return -1; // skip blank lines
113 if ( *ss=='#' ) return -1; // skip comments
114
115 char *se = ss;
116 while ( *se && *se!=':' && !isspace(*se) ) se++;
117
118 *chr_beg = ss;
119 *chr_end = se-1;
120
121 if ( *se != ':' )
122 {
123 *beg = 0;
124 *end = MAX_COOR_0;
125 }
126 else
127 {
128 ss = se+1;
129 *beg = strtod(ss, &se);
130 if ( ss==se ) { fprintf(stderr,"Could not parse reg line: %s\n", line); return -2; }
131 if ( *beg==0 ) { fprintf(stderr,"Could not parse reg line, expected 1-based coordinate: %s\n", line); return -2; }
132 (*beg)--;
133
134 if ( !se[0] || isspace(se[0])) {
135 *end = *beg;
136 } else if ( se[0] == '-' && (!se[1] || isspace(se[1])) ) {
137 *end = MAX_COOR_0;
138 se++;
139 } else {
140 ss = se+1;
141 *end = strtod(ss, &se);
142 if ( ss==se ) *end = *beg;
143 else if ( *end==0 ) { fprintf(stderr,"Could not parse reg line, expected 1-based coordinate: %s\n", line); return -2; }
144 else (*end)--;
145 }
146 }
147
148 ss = se;
149 while ( *ss && isspace(*ss) ) ss++;
150 if ( !ss[0] ) ss = (char *)line;
151
152 int *idx = (int *)payload;
153 args_t *args = (args_t *)usr;
154 int ret = khash_str2int_get(args->hash, ss, idx);
155 if (ret < 0) {
156 hts_expand0(subset_t, args->nsets + 1, args->msets, args->sets);
157 args->sets[args->nsets].fname = strdup(ss);
158 *idx = khash_str2int_inc(args->hash, args->sets[args->nsets].fname);
159 args->nsets++;
160 }
161
162 return 0;
163 }
164
open_set(subset_t * set,args_t * args)165 static void open_set(subset_t *set, args_t *args)
166 {
167 int j;
168 args->str.l = 0;
169 kputs(args->output_dir, &args->str);
170 if ( args->str.s[args->str.l-1] != '/' ) kputc('/', &args->str);
171 int k, l = args->str.l;
172 if (args->prefix) kputs(args->prefix, &args->str);
173 kputs(set->fname, &args->str);
174 for (k=l; k<args->str.l; k++) if ( isspace(args->str.s[k]) ) args->str.s[k] = '_';
175 if ( args->output_type & FT_BCF ) kputs(".bcf", &args->str);
176 else if ( args->output_type & FT_GZ ) kputs(".vcf.gz", &args->str);
177 else kputs(".vcf", &args->str);
178
179 char wmode[8];
180 set_wmode(wmode,args->output_type,args->str.s,args->clevel);
181 set->fh = hts_open(args->str.s, wmode);
182 if ( set->fh == NULL ) error("[%s] Error: cannot write to \"%s\": %s\n", __func__, args->str.s, strerror(errno));
183 if ( args->n_threads > 0)
184 hts_set_opt(set->fh, HTS_OPT_THREAD_POOL, args->sr->p);
185 if ( args->hts_opts )
186 {
187 hts_opt *opts = NULL;
188 for (j=0; j<args->nhts_opts; j++)
189 if ( hts_opt_add(&opts, args->hts_opts[j]) ) error("Could not set the HTS option \"%s\"\n", args->hts_opts[j]);
190 if ( hts_opt_apply(set->fh, opts) ) error("Could not apply the HTS options\n");
191 hts_opt_free(opts);
192 }
193 bcf_hdr_t *hdr = bcf_sr_get_header(args->sr, 0);
194 if ( bcf_hdr_write(set->fh, hdr)!=0 ) error("[%s] Error: cannot write the header to %s\n", __func__, args->str.s);
195 if (args->record_cmd_line) bcf_hdr_append_version(hdr, args->argc, args->argv, "bcftools_plugin");
196 }
197
init_data(args_t * args)198 static void init_data(args_t *args)
199 {
200 args->sr = bcf_sr_init();
201 if ( args->region )
202 {
203 args->sr->require_index = 1;
204 if ( bcf_sr_set_regions(args->sr, args->region, args->region_is_file)<0 )
205 error("Failed to read the regions: %s\n", args->region);
206 }
207 if ( args->target && bcf_sr_set_targets(args->sr, args->target, args->target_is_file, 0)<0 )
208 error("Failed to read the targets: %s\n", args->target);
209 if ( bcf_sr_set_threads(args->sr, args->n_threads)<0 ) error("Failed to create threads\n");
210 if ( !bcf_sr_add_reader(args->sr, args->fname) ) error("Error: %s\n", bcf_sr_strerror(args->sr->errnum));
211
212 mkdir_p("%s/", args->output_dir);
213
214 if (args->nsites) {
215 args->nsets = 1;
216 hts_expand0(subset_t, args->nsets, args->msets, args->sets);
217 } else {
218 args->hash = khash_str2int_init();
219 if (args->scatter_is_file) {
220 args->reg_idx = regidx_init(args->scatter, regidx_parse_reg_name, NULL, sizeof(int), args);
221 if ( !args->reg_idx ) exit(-1);
222 } else {
223 char *ss;
224 for (ss = args->scatter; *ss; ss++) if (*ss == ',') *ss = '\n';
225 args->reg_idx = regidx_init_string(args->scatter, regidx_parse_reg_name, NULL, sizeof(int), args);
226 for (ss = args->scatter; *ss; ss++) if (*ss == '\n') *ss = ',';
227 }
228 args->reg_itr = regitr_init(args->reg_idx);
229 if (args->extra) {
230 hts_expand0(subset_t, args->nsets + 1, args->msets, args->sets);
231 args->sets[args->nsets].fname = strdup(args->extra);
232 args->nsets++;
233 }
234 int i;
235 for (i=0; i<args->nsets; i++) open_set(&args->sets[i], args);
236 }
237 }
238
destroy_data(args_t * args)239 static void destroy_data(args_t *args)
240 {
241 if (args->scatter) {
242 khash_str2int_destroy(args->hash);
243 regidx_destroy(args->reg_idx);
244 regitr_destroy(args->reg_itr);
245 }
246 free(args->str.s);
247 int i;
248 for (i=0; i<args->nsets; i++)
249 {
250 subset_t *set = &args->sets[i];
251 if (set->fname) {
252 if ( hts_close(set->fh)!=0 ) error("Error: close failed .. %s\n", set->fname);
253 free(set->fname);
254 }
255 }
256 for (i=0; i<args->nhts_opts; i++) free(args->hts_opts[i]);
257 free(args->hts_opts);
258 free(args->sets);
259 bcf_sr_destroy(args->sr);
260 free(args);
261 }
262
process(args_t * args)263 static void process(args_t *args)
264 {
265 bcf_hdr_t *hdr = bcf_sr_get_header(args->sr, 0);
266 bcf1_t *rec = bcf_sr_get_line(args->sr, 0);
267 subset_t *set;
268
269 if (args->nsites) {
270 subset_t *set = &args->sets[0];
271 if (!args->rec_cnt) {
272 args->str.l = 0;
273 kputw(args->chunk_cnt, &args->str);
274 set->fname = strdup(args->str.s);
275 open_set(set, args);
276 args->rec_cnt = 0;
277 }
278 if ( bcf_write(set->fh, hdr, rec)!=0 ) error("[%s] Error: failed to write the record\n", __func__);
279 args->rec_cnt++;
280 if (args->rec_cnt == args->nsites) {
281 args->rec_cnt = 0;
282 if ( hts_close(set->fh)!=0 ) error("Error: close failed .. %s\n", set->fname);
283 free(set->fname);
284 set->fname = NULL;
285 args->chunk_cnt++;
286 }
287 } else {
288 if ( regidx_overlap(args->reg_idx, bcf_hdr_id2name(hdr, rec->rid), rec->pos, rec->pos + rec->rlen-1, args->reg_itr) ) {
289 while (regitr_overlap(args->reg_itr)) {
290 int idx = regitr_payload(args->reg_itr, int);
291 set = &args->sets[idx];
292 if ( bcf_write(set->fh, hdr, rec)!=0 ) error("[%s] Error: failed to write the record\n", __func__);
293 }
294 } else if (args->extra) {
295 set = &args->sets[args->nsets-1];
296 if ( bcf_write(set->fh, hdr, rec)!=0 ) error("[%s] Error: failed to write the record\n", __func__);
297 }
298 }
299 }
300
run(int argc,char ** argv)301 int run(int argc, char **argv)
302 {
303 args_t *args = (args_t*) calloc(1, sizeof(args_t));
304 args->argc = argc; args->argv = argv;
305 args->output_type = FT_VCF;
306 args->record_cmd_line = 1;
307 args->clevel = -1;
308 static struct option loptions[] =
309 {
310 {"exclude",required_argument,NULL,'e'},
311 {"include",required_argument,NULL,'i'},
312 {"no-version",no_argument,NULL,1},
313 {"output",required_argument,NULL,'o'},
314 {"output-type",required_argument,NULL,'O'},
315 {"threads",required_argument,NULL,2},
316 {"regions",required_argument,NULL,'r'},
317 {"regions-file",required_argument,NULL,'R'},
318 {"targets", required_argument, NULL, 't'},
319 {"targets-file", required_argument, NULL, 'T'},
320 {"nsites-per-chunk",required_argument,NULL,'n'},
321 {"scatter",required_argument,NULL,'s'},
322 {"scatter-file",required_argument,NULL,'S'},
323 {"extra",required_argument,NULL,'x'},
324 {"prefix",required_argument,NULL,'p'},
325 {"hts-opts",required_argument,NULL,3},
326 {NULL,0,NULL,0}
327 };
328 int c;
329 char *tmp;
330 while ((c = getopt_long(argc, argv, "e:i:o:O:r:R:t:T:n:s:S:x:p:h?", loptions, NULL)) >= 0)
331 {
332 switch (c)
333 {
334 case 'e':
335 if ( args->filter_str ) error("Error: only one -i or -e expression can be given, and they cannot be combined\n");
336 args->filter_str = optarg; args->filter_logic |= FLT_EXCLUDE; break;
337 case 'i':
338 if ( args->filter_str ) error("Error: only one -i or -e expression can be given, and they cannot be combined\n");
339 args->filter_str = optarg; args->filter_logic |= FLT_INCLUDE; break;
340 case 1 : args->record_cmd_line = 0; break;
341 case 'o': args->output_dir = optarg; break;
342 case 'O':
343 switch (optarg[0]) {
344 case 'b': args->output_type = FT_BCF_GZ; break;
345 case 'u': args->output_type = FT_BCF; break;
346 case 'z': args->output_type = FT_VCF_GZ; break;
347 case 'v': args->output_type = FT_VCF; break;
348 default:
349 {
350 args->clevel = strtol(optarg,&tmp,10);
351 if ( *tmp || args->clevel<0 || args->clevel>9 ) error("The output type \"%s\" not recognised\n", optarg);
352 }
353 }
354 if ( optarg[1] )
355 {
356 args->clevel = strtol(optarg+1,&tmp,10);
357 if ( *tmp || args->clevel<0 || args->clevel>9 ) error("Could not parse argument: --compression-level %s\n", optarg+1);
358 }
359 break;
360 case 2 : args->n_threads = strtol(optarg, 0, 0); break;
361 case 'r': args->region = optarg; break;
362 case 'R': args->region = optarg; args->region_is_file = 1; break;
363 case 't': args->target = optarg; break;
364 case 'T': args->target = optarg; args->target_is_file = 1; break;
365 case 'n':
366 args->nsites = strtod(optarg, &tmp);
367 if ( tmp==optarg || *tmp ) error("Could not parse: --nsites-per-chunk %s\n", optarg);
368 if ( args->nsites <= 0 ) error("Positive integer required: --nsites-per-chunk %s\n", optarg);
369 break;
370 case 's': args->scatter = optarg; break;
371 case 'S': args->scatter = optarg; args->scatter_is_file = 1; break;
372 case 'x': args->extra = optarg; break;
373 case 'p': args->prefix = optarg; break;
374 case 3 : args->hts_opts = hts_readlist(optarg, 0, &args->nhts_opts); break;
375 case 'h':
376 case '?':
377 default: error("%s", usage_text()); break;
378 }
379 }
380 if ( optind==argc )
381 {
382 if ( !isatty(fileno((FILE *)stdin)) ) args->fname = "-"; // reading from stdin
383 else { error("%s", usage_text()); }
384 }
385 else if ( optind+1!=argc ) error("%s", usage_text());
386 else args->fname = argv[optind];
387
388 if ( !args->output_dir ) error("Missing the -o option\n");
389 if ( args->filter_logic == (FLT_EXCLUDE|FLT_INCLUDE) ) error("Only one of -i or -e can be given\n");
390 if ( !args->nsites && !args->scatter ) error("Missing either the -n or one of the -s or -S options\n");
391 if ( args->nsites && args->scatter ) error("Only one of -n or either -s or -S can be given\n");
392 if ( args->nsites && args->extra ) error("Cannot use -x together with -n\n");
393
394 init_data(args);
395
396 while ( bcf_sr_next_line(args->sr) ) process(args);
397
398 destroy_data(args);
399 return 0;
400 }
401