1 /* synced_bcf_reader.c -- stream through multiple VCF files.
2
3 Copyright (C) 2012-2014 Genome Research Ltd.
4
5 Author: Petr Danecek <pd3@sanger.ac.uk>
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE. */
24
25 #include <config.h>
26
27 #include <stdio.h>
28 #include <unistd.h>
29 #include <string.h>
30 #include <strings.h>
31 #include <limits.h>
32 #include <errno.h>
33 #include <sys/stat.h>
34 #include "htslib/synced_bcf_reader.h"
35 #include "htslib/kseq.h"
36 #include "htslib/khash_str2int.h"
37 #include "htslib/bgzf.h"
38 #include "htslib/thread_pool.h"
39 #include "bcf_sr_sort.h"
40
41 #define MAX_CSI_COOR 0x7fffffff // maximum indexable coordinate of .csi
42
43 typedef struct
44 {
45 uint32_t start, end;
46 }
47 region1_t;
48
49 typedef struct _region_t
50 {
51 region1_t *regs;
52 int nregs, mregs, creg;
53 }
54 region_t;
55
56 #define BCF_SR_AUX(x) ((aux_t*)((x)->aux))
57 typedef struct
58 {
59 sr_sort_t sort;
60 }
61 aux_t;
62
63 static void _regions_add(bcf_sr_regions_t *reg, const char *chr, int start, int end);
64 static bcf_sr_regions_t *_regions_init_string(const char *str);
65 static int _regions_match_alleles(bcf_sr_regions_t *reg, int als_idx, bcf1_t *rec);
66
bcf_sr_strerror(int errnum)67 char *bcf_sr_strerror(int errnum)
68 {
69 switch (errnum)
70 {
71 case open_failed:
72 return strerror(errno); break;
73 case not_bgzf:
74 return "not compressed with bgzip"; break;
75 case idx_load_failed:
76 return "could not load index"; break;
77 case file_type_error:
78 return "unknown file type"; break;
79 case api_usage_error:
80 return "API usage error"; break;
81 case header_error:
82 return "could not parse header"; break;
83 case no_eof:
84 return "no BGZF EOF marker; file may be truncated"; break;
85 case no_memory:
86 return "Out of memory"; break;
87 case vcf_parse_error:
88 return "VCF parse error"; break;
89 case bcf_read_error:
90 return "BCF read error"; break;
91 default: return "";
92 }
93 }
94
bcf_sr_set_opt(bcf_srs_t * readers,bcf_sr_opt_t opt,...)95 int bcf_sr_set_opt(bcf_srs_t *readers, bcf_sr_opt_t opt, ...)
96 {
97 va_list args;
98 switch (opt)
99 {
100 case BCF_SR_REQUIRE_IDX:
101 readers->require_index = 1;
102 return 0;
103
104 case BCF_SR_PAIR_LOGIC:
105 va_start(args, opt);
106 BCF_SR_AUX(readers)->sort.pair = va_arg(args, int);
107 return 0;
108
109 default:
110 break;
111 }
112 return 1;
113 }
114
init_filters(bcf_hdr_t * hdr,const char * filters,int * nfilters)115 static int *init_filters(bcf_hdr_t *hdr, const char *filters, int *nfilters)
116 {
117 kstring_t str = {0,0,0};
118 const char *tmp = filters, *prev = filters;
119 int nout = 0, *out = NULL;
120 while ( 1 )
121 {
122 if ( *tmp==',' || !*tmp )
123 {
124 out = (int*) realloc(out, (nout+1)*sizeof(int));
125 if ( tmp-prev==1 && *prev=='.' )
126 {
127 out[nout] = -1;
128 nout++;
129 }
130 else
131 {
132 str.l = 0;
133 kputsn(prev, tmp-prev, &str);
134 out[nout] = bcf_hdr_id2int(hdr, BCF_DT_ID, str.s);
135 if ( out[nout]>=0 ) nout++;
136 }
137 if ( !*tmp ) break;
138 prev = tmp+1;
139 }
140 tmp++;
141 }
142 if ( str.m ) free(str.s);
143 *nfilters = nout;
144 return out;
145 }
146
bcf_sr_set_regions(bcf_srs_t * readers,const char * regions,int is_file)147 int bcf_sr_set_regions(bcf_srs_t *readers, const char *regions, int is_file)
148 {
149 assert( !readers->regions );
150 if ( readers->nreaders )
151 {
152 fprintf(stderr,"[%s:%d %s] Error: bcf_sr_set_regions() must be called before bcf_sr_add_reader()\n", __FILE__,__LINE__,__FUNCTION__);
153 return -1;
154 }
155 readers->regions = bcf_sr_regions_init(regions,is_file,0,1,-2);
156 if ( !readers->regions ) return -1;
157 readers->explicit_regs = 1;
158 readers->require_index = 1;
159 return 0;
160 }
bcf_sr_set_targets(bcf_srs_t * readers,const char * targets,int is_file,int alleles)161 int bcf_sr_set_targets(bcf_srs_t *readers, const char *targets, int is_file, int alleles)
162 {
163 assert( !readers->targets );
164 if ( targets[0]=='^' )
165 {
166 readers->targets_exclude = 1;
167 targets++;
168 }
169 readers->targets = bcf_sr_regions_init(targets,is_file,0,1,-2);
170 if ( !readers->targets ) return -1;
171 readers->targets_als = alleles;
172 return 0;
173 }
174
bcf_sr_set_threads(bcf_srs_t * files,int n_threads)175 int bcf_sr_set_threads(bcf_srs_t *files, int n_threads)
176 {
177 if (!(files->n_threads = n_threads))
178 return 0;
179
180 files->p = calloc(1, sizeof(*files->p));
181 if (!files->p) {
182 files->errnum = no_memory;
183 return -1;
184 }
185 if (!(files->p->pool = hts_tpool_init(n_threads)))
186 return -1;
187
188 return 0;
189 }
190
bcf_sr_destroy_threads(bcf_srs_t * files)191 void bcf_sr_destroy_threads(bcf_srs_t *files) {
192 if (!files->p)
193 return;
194
195 if (files->p->pool)
196 hts_tpool_destroy(files->p->pool);
197 free(files->p);
198 }
199
bcf_sr_add_reader(bcf_srs_t * files,const char * fname)200 int bcf_sr_add_reader(bcf_srs_t *files, const char *fname)
201 {
202 htsFile* file_ptr = hts_open(fname, "r");
203 if ( ! file_ptr ) {
204 files->errnum = open_failed;
205 return 0;
206 }
207
208 files->has_line = (int*) realloc(files->has_line, sizeof(int)*(files->nreaders+1));
209 files->has_line[files->nreaders] = 0;
210 files->readers = (bcf_sr_t*) realloc(files->readers, sizeof(bcf_sr_t)*(files->nreaders+1));
211 bcf_sr_t *reader = &files->readers[files->nreaders++];
212 memset(reader,0,sizeof(bcf_sr_t));
213
214 reader->file = file_ptr;
215
216 files->errnum = 0;
217
218 if ( reader->file->format.compression==bgzf )
219 {
220 BGZF *bgzf = hts_get_bgzfp(reader->file);
221 if ( bgzf && bgzf_check_EOF(bgzf) == 0 ) {
222 files->errnum = no_eof;
223 fprintf(stderr,"[%s] Warning: no BGZF EOF marker; file may be truncated.\n", fname);
224 }
225 if (files->p)
226 bgzf_thread_pool(bgzf, files->p->pool, files->p->qsize);
227 }
228
229 if ( files->require_index )
230 {
231 if ( reader->file->format.format==vcf )
232 {
233 if ( reader->file->format.compression!=bgzf )
234 {
235 files->errnum = not_bgzf;
236 return 0;
237 }
238
239 reader->tbx_idx = tbx_index_load(fname);
240 if ( !reader->tbx_idx )
241 {
242 files->errnum = idx_load_failed;
243 return 0;
244 }
245
246 reader->header = bcf_hdr_read(reader->file);
247 }
248 else if ( reader->file->format.format==bcf )
249 {
250 if ( reader->file->format.compression!=bgzf )
251 {
252 files->errnum = not_bgzf;
253 return 0;
254 }
255
256 reader->header = bcf_hdr_read(reader->file);
257
258 reader->bcf_idx = bcf_index_load(fname);
259 if ( !reader->bcf_idx )
260 {
261 files->errnum = idx_load_failed;
262 return 0;
263 }
264 }
265 else
266 {
267 files->errnum = file_type_error;
268 return 0;
269 }
270 }
271 else
272 {
273 if ( reader->file->format.format==bcf || reader->file->format.format==vcf )
274 {
275 reader->header = bcf_hdr_read(reader->file);
276 }
277 else
278 {
279 files->errnum = file_type_error;
280 return 0;
281 }
282 files->streaming = 1;
283 }
284 if ( files->streaming && files->nreaders>1 )
285 {
286 files->errnum = api_usage_error;
287 fprintf(stderr,"[%s:%d %s] Error: %d readers, yet require_index not set\n", __FILE__,__LINE__,__FUNCTION__,files->nreaders);
288 return 0;
289 }
290 if ( files->streaming && files->regions )
291 {
292 files->errnum = api_usage_error;
293 fprintf(stderr,"[%s:%d %s] Error: cannot tabix-jump in streaming mode\n", __FILE__,__LINE__,__FUNCTION__);
294 return 0;
295 }
296 if ( !reader->header )
297 {
298 files->errnum = header_error;
299 return 0;
300 }
301
302 reader->fname = strdup(fname);
303 if ( files->apply_filters )
304 reader->filter_ids = init_filters(reader->header, files->apply_filters, &reader->nfilter_ids);
305
306 // Update list of chromosomes
307 if ( !files->explicit_regs && !files->streaming )
308 {
309 int n,i;
310 const char **names = reader->tbx_idx ? tbx_seqnames(reader->tbx_idx, &n) : bcf_hdr_seqnames(reader->header, &n);
311 for (i=0; i<n; i++)
312 {
313 if ( !files->regions )
314 files->regions = _regions_init_string(names[i]);
315 else
316 _regions_add(files->regions, names[i], -1, -1);
317 }
318 free(names);
319 }
320
321 return 1;
322 }
323
bcf_sr_init(void)324 bcf_srs_t *bcf_sr_init(void)
325 {
326 bcf_srs_t *files = (bcf_srs_t*) calloc(1,sizeof(bcf_srs_t));
327 files->aux = (aux_t*) calloc(1,sizeof(aux_t));
328 bcf_sr_sort_init(&BCF_SR_AUX(files)->sort);
329 return files;
330 }
331
bcf_sr_destroy1(bcf_sr_t * reader)332 static void bcf_sr_destroy1(bcf_sr_t *reader)
333 {
334 free(reader->fname);
335 if ( reader->tbx_idx ) tbx_destroy(reader->tbx_idx);
336 if ( reader->bcf_idx ) hts_idx_destroy(reader->bcf_idx);
337 bcf_hdr_destroy(reader->header);
338 hts_close(reader->file);
339 if ( reader->itr ) tbx_itr_destroy(reader->itr);
340 int j;
341 for (j=0; j<reader->mbuffer; j++)
342 bcf_destroy1(reader->buffer[j]);
343 free(reader->buffer);
344 free(reader->samples);
345 free(reader->filter_ids);
346 }
347
bcf_sr_destroy(bcf_srs_t * files)348 void bcf_sr_destroy(bcf_srs_t *files)
349 {
350 int i;
351 for (i=0; i<files->nreaders; i++)
352 bcf_sr_destroy1(&files->readers[i]);
353 free(files->has_line);
354 free(files->readers);
355 for (i=0; i<files->n_smpl; i++) free(files->samples[i]);
356 free(files->samples);
357 if (files->targets) bcf_sr_regions_destroy(files->targets);
358 if (files->regions) bcf_sr_regions_destroy(files->regions);
359 if (files->tmps.m) free(files->tmps.s);
360 if (files->n_threads) bcf_sr_destroy_threads(files);
361 bcf_sr_sort_destroy(&BCF_SR_AUX(files)->sort);
362 free(files->aux);
363 free(files);
364 }
365
bcf_sr_remove_reader(bcf_srs_t * files,int i)366 void bcf_sr_remove_reader(bcf_srs_t *files, int i)
367 {
368 assert( !files->samples ); // not ready for this yet
369 bcf_sr_sort_remove_reader(files, &BCF_SR_AUX(files)->sort, i);
370 bcf_sr_destroy1(&files->readers[i]);
371 if ( i+1 < files->nreaders )
372 {
373 memmove(&files->readers[i], &files->readers[i+1], (files->nreaders-i-1)*sizeof(bcf_sr_t));
374 memmove(&files->has_line[i], &files->has_line[i+1], (files->nreaders-i-1)*sizeof(int));
375 }
376 files->nreaders--;
377 }
378
379
debug_buffer(FILE * fp,bcf_sr_t * reader)380 void debug_buffer(FILE *fp, bcf_sr_t *reader)
381 {
382 int j;
383 for (j=0; j<=reader->nbuffer; j++)
384 {
385 bcf1_t *line = reader->buffer[j];
386 fprintf(fp,"\t%p\t%s%s\t%s:%d\t%s ", (void*)line,reader->fname,j==0?"*":" ",reader->header->id[BCF_DT_CTG][line->rid].key,line->pos+1,line->n_allele?line->d.allele[0]:"");
387 int k;
388 for (k=1; k<line->n_allele; k++) fprintf(fp," %s", line->d.allele[k]);
389 fprintf(fp,"\n");
390 }
391 }
392
debug_buffers(FILE * fp,bcf_srs_t * files)393 void debug_buffers(FILE *fp, bcf_srs_t *files)
394 {
395 int i;
396 for (i=0; i<files->nreaders; i++)
397 {
398 fprintf(fp, "has_line: %d\t%s\n", bcf_sr_has_line(files,i),files->readers[i].fname);
399 debug_buffer(fp, &files->readers[i]);
400 }
401 fprintf(fp,"\n");
402 }
403
has_filter(bcf_sr_t * reader,bcf1_t * line)404 static inline int has_filter(bcf_sr_t *reader, bcf1_t *line)
405 {
406 int i, j;
407 if ( !line->d.n_flt )
408 {
409 for (j=0; j<reader->nfilter_ids; j++)
410 if ( reader->filter_ids[j]<0 ) return 1;
411 return 0;
412 }
413 for (i=0; i<line->d.n_flt; i++)
414 {
415 for (j=0; j<reader->nfilter_ids; j++)
416 if ( line->d.flt[i]==reader->filter_ids[j] ) return 1;
417 }
418 return 0;
419 }
420
_reader_seek(bcf_sr_t * reader,const char * seq,int start,int end)421 static int _reader_seek(bcf_sr_t *reader, const char *seq, int start, int end)
422 {
423 if ( end>=MAX_CSI_COOR )
424 {
425 fprintf(stderr,"The coordinate is out of csi index limit: %d\n", end+1);
426 exit(1);
427 }
428 if ( reader->itr )
429 {
430 hts_itr_destroy(reader->itr);
431 reader->itr = NULL;
432 }
433 reader->nbuffer = 0;
434 if ( reader->tbx_idx )
435 {
436 int tid = tbx_name2id(reader->tbx_idx, seq);
437 if ( tid==-1 ) return -1; // the sequence not present in this file
438 reader->itr = tbx_itr_queryi(reader->tbx_idx,tid,start,end+1);
439 }
440 else
441 {
442 int tid = bcf_hdr_name2id(reader->header, seq);
443 if ( tid==-1 ) return -1; // the sequence not present in this file
444 reader->itr = bcf_itr_queryi(reader->bcf_idx,tid,start,end+1);
445 }
446 if ( !reader->itr ) fprintf(stderr,"Could not seek: %s:%d-%d\n",seq,start+1,end+1);
447 assert(reader->itr);
448 return 0;
449 }
450
451 /*
452 * _readers_next_region() - jumps to next region if necessary
453 * Returns 0 on success or -1 when there are no more regions left
454 */
_readers_next_region(bcf_srs_t * files)455 static int _readers_next_region(bcf_srs_t *files)
456 {
457 // Need to open new chromosome? Check number of lines in all readers' buffers
458 int i, eos = 0;
459 for (i=0; i<files->nreaders; i++)
460 if ( !files->readers[i].itr && !files->readers[i].nbuffer ) eos++;
461
462 if ( eos!=files->nreaders )
463 {
464 // Some of the readers still has buffered lines
465 return 0;
466 }
467
468 // No lines in the buffer, need to open new region or quit
469 if ( bcf_sr_regions_next(files->regions)<0 ) return -1;
470
471 for (i=0; i<files->nreaders; i++)
472 _reader_seek(&files->readers[i],files->regions->seq_names[files->regions->iseq],files->regions->start,files->regions->end);
473
474 return 0;
475 }
476
477 /*
478 * _reader_fill_buffer() - buffers all records with the same coordinate
479 */
_reader_fill_buffer(bcf_srs_t * files,bcf_sr_t * reader)480 static void _reader_fill_buffer(bcf_srs_t *files, bcf_sr_t *reader)
481 {
482 // Return if the buffer is full: the coordinate of the last buffered record differs
483 if ( reader->nbuffer && reader->buffer[reader->nbuffer]->pos != reader->buffer[1]->pos ) return;
484
485 // No iterator (sequence not present in this file) and not streaming
486 if ( !reader->itr && !files->streaming ) return;
487
488 // Fill the buffer with records starting at the same position
489 int i, ret = 0;
490 while (1)
491 {
492 if ( reader->nbuffer+1 >= reader->mbuffer )
493 {
494 // Increase buffer size
495 reader->mbuffer += 8;
496 reader->buffer = (bcf1_t**) realloc(reader->buffer, sizeof(bcf1_t*)*reader->mbuffer);
497 for (i=8; i>0; i--) // initialize
498 {
499 reader->buffer[reader->mbuffer-i] = bcf_init1();
500 reader->buffer[reader->mbuffer-i]->max_unpack = files->max_unpack;
501 reader->buffer[reader->mbuffer-i]->pos = -1; // for rare cases when VCF starts from 1
502 }
503 }
504 if ( files->streaming )
505 {
506 if ( reader->file->format.format==vcf )
507 {
508 if ( (ret=hts_getline(reader->file, KS_SEP_LINE, &files->tmps)) < 0 ) break; // no more lines
509 ret = vcf_parse1(&files->tmps, reader->header, reader->buffer[reader->nbuffer+1]);
510 if ( ret<0 ) { files->errnum = vcf_parse_error; break; }
511 }
512 else if ( reader->file->format.format==bcf )
513 {
514 ret = bcf_read1(reader->file, reader->header, reader->buffer[reader->nbuffer+1]);
515 if ( ret < -1 ) files->errnum = bcf_read_error;
516 if ( ret < 0 ) break; // no more lines or an error
517 }
518 else
519 {
520 fprintf(stderr,"[%s:%d %s] fixme: not ready for this\n", __FILE__,__LINE__,__FUNCTION__);
521 exit(1);
522 }
523 }
524 else if ( reader->tbx_idx )
525 {
526 if ( (ret=tbx_itr_next(reader->file, reader->tbx_idx, reader->itr, &files->tmps)) < 0 ) break; // no more lines
527 ret = vcf_parse1(&files->tmps, reader->header, reader->buffer[reader->nbuffer+1]);
528 if ( ret<0 ) { files->errnum = vcf_parse_error; break; }
529 }
530 else
531 {
532 ret = bcf_itr_next(reader->file, reader->itr, reader->buffer[reader->nbuffer+1]);
533 if ( ret < -1 ) files->errnum = bcf_read_error;
534 if ( ret < 0 ) break; // no more lines or an error
535 bcf_subset_format(reader->header,reader->buffer[reader->nbuffer+1]);
536 }
537
538 // apply filter
539 if ( !reader->nfilter_ids )
540 bcf_unpack(reader->buffer[reader->nbuffer+1], BCF_UN_STR);
541 else
542 {
543 bcf_unpack(reader->buffer[reader->nbuffer+1], BCF_UN_STR|BCF_UN_FLT);
544 if ( !has_filter(reader, reader->buffer[reader->nbuffer+1]) ) continue;
545 }
546 reader->nbuffer++;
547
548 if ( reader->buffer[reader->nbuffer]->pos != reader->buffer[1]->pos ) break; // the buffer is full
549 }
550 if ( ret<0 )
551 {
552 // done for this region
553 tbx_itr_destroy(reader->itr);
554 reader->itr = NULL;
555 }
556 }
557
558 /*
559 * _readers_shift_buffer() - removes the first line and all subsequent lines with the same position
560 */
_reader_shift_buffer(bcf_sr_t * reader)561 static void _reader_shift_buffer(bcf_sr_t *reader)
562 {
563 int i;
564 for (i=2; i<=reader->nbuffer; i++)
565 if ( reader->buffer[i]->pos!=reader->buffer[1]->pos ) break;
566 if ( i<=reader->nbuffer )
567 {
568 // A record with a different position follows, swap it. Because of the reader's logic,
569 // only one such line can be present.
570 bcf1_t *tmp = reader->buffer[1]; reader->buffer[1] = reader->buffer[i]; reader->buffer[i] = tmp;
571 reader->nbuffer = 1;
572 }
573 else
574 reader->nbuffer = 0; // no other line
575 }
576
_reader_next_line(bcf_srs_t * files)577 int _reader_next_line(bcf_srs_t *files)
578 {
579 int i, min_pos = INT_MAX;
580 const char *chr = NULL;
581
582 // Loop until next suitable line is found or all readers have finished
583 while ( 1 )
584 {
585 // Get all readers ready for the next region.
586 if ( files->regions && _readers_next_region(files)<0 ) break;
587
588 // Fill buffers
589 for (i=0; i<files->nreaders; i++)
590 {
591 _reader_fill_buffer(files, &files->readers[i]);
592
593 // Update the minimum coordinate
594 if ( !files->readers[i].nbuffer ) continue;
595 if ( min_pos > files->readers[i].buffer[1]->pos )
596 {
597 min_pos = files->readers[i].buffer[1]->pos;
598 chr = bcf_seqname(files->readers[i].header, files->readers[i].buffer[1]);
599 }
600 }
601 if ( min_pos==INT_MAX )
602 {
603 if ( !files->regions ) break;
604 continue;
605 }
606
607 // Skip this position if not present in targets
608 if ( files->targets )
609 {
610 int ret = bcf_sr_regions_overlap(files->targets, chr, min_pos, min_pos);
611 if ( (!files->targets_exclude && ret<0) || (files->targets_exclude && !ret) )
612 {
613 // Remove all lines with this position from the buffer
614 for (i=0; i<files->nreaders; i++)
615 if ( files->readers[i].nbuffer && files->readers[i].buffer[1]->pos==min_pos )
616 _reader_shift_buffer(&files->readers[i]);
617 min_pos = INT_MAX;
618 chr = NULL;
619 continue;
620 }
621 }
622
623 break; // done: chr and min_pos are set
624 }
625 if ( !chr ) return 0;
626
627 return bcf_sr_sort_next(files, &BCF_SR_AUX(files)->sort, chr, min_pos);
628 }
629
bcf_sr_next_line(bcf_srs_t * files)630 int bcf_sr_next_line(bcf_srs_t *files)
631 {
632 if ( !files->targets_als )
633 return _reader_next_line(files);
634
635 while (1)
636 {
637 int i, ret = _reader_next_line(files);
638 if ( !ret ) return ret;
639
640 for (i=0; i<files->nreaders; i++)
641 if ( files->has_line[i] ) break;
642
643 if ( _regions_match_alleles(files->targets, files->targets_als-1, files->readers[i].buffer[0]) ) return ret;
644
645 // Check if there are more duplicate lines in the buffers. If not, return this line as if it
646 // matched the targets, even if there is a type mismatch
647 for (i=0; i<files->nreaders; i++)
648 {
649 if ( !files->has_line[i] ) continue;
650 if ( files->readers[i].nbuffer==0 || files->readers[i].buffer[1]->pos!=files->readers[i].buffer[0]->pos ) continue;
651 break;
652 }
653 if ( i==files->nreaders ) return ret; // no more lines left, output even if target alleles are not of the same type
654 }
655 }
656
bcf_sr_seek_start(bcf_srs_t * readers)657 static void bcf_sr_seek_start(bcf_srs_t *readers)
658 {
659 bcf_sr_regions_t *reg = readers->regions;
660 int i;
661 for (i=0; i<reg->nseqs; i++)
662 reg->regs[i].creg = -1;
663 reg->iseq = 0;
664 }
665
666
bcf_sr_seek(bcf_srs_t * readers,const char * seq,int pos)667 int bcf_sr_seek(bcf_srs_t *readers, const char *seq, int pos)
668 {
669 if ( !readers->regions ) return 0;
670 if ( !seq && !pos )
671 {
672 // seek to start
673 bcf_sr_seek_start(readers);
674 return 0;
675 }
676 bcf_sr_regions_overlap(readers->regions, seq, pos, pos);
677 int i, nret = 0;
678 for (i=0; i<readers->nreaders; i++)
679 {
680 nret += _reader_seek(&readers->readers[i],seq,pos,MAX_CSI_COOR-1);
681 }
682 return nret;
683 }
684
bcf_sr_set_samples(bcf_srs_t * files,const char * fname,int is_file)685 int bcf_sr_set_samples(bcf_srs_t *files, const char *fname, int is_file)
686 {
687 int i, j, nsmpl, free_smpl = 0;
688 char **smpl = NULL;
689
690 void *exclude = (fname[0]=='^') ? khash_str2int_init() : NULL;
691 if ( exclude || strcmp("-",fname) ) // "-" stands for all samples
692 {
693 smpl = hts_readlist(fname, is_file, &nsmpl);
694 if ( !smpl )
695 {
696 fprintf(stderr,"Could not read the file: \"%s\"\n", fname);
697 return 0;
698 }
699 if ( exclude )
700 {
701 for (i=0; i<nsmpl; i++)
702 khash_str2int_inc(exclude, smpl[i]);
703 }
704 free_smpl = 1;
705 }
706 if ( !smpl )
707 {
708 smpl = files->readers[0].header->samples; // intersection of all samples
709 nsmpl = bcf_hdr_nsamples(files->readers[0].header);
710 }
711
712 files->samples = NULL;
713 files->n_smpl = 0;
714 for (i=0; i<nsmpl; i++)
715 {
716 if ( exclude && khash_str2int_has_key(exclude,smpl[i]) ) continue;
717
718 int n_isec = 0;
719 for (j=0; j<files->nreaders; j++)
720 {
721 if ( bcf_hdr_id2int(files->readers[j].header, BCF_DT_SAMPLE, smpl[i])<0 ) break;
722 n_isec++;
723 }
724 if ( n_isec!=files->nreaders )
725 {
726 fprintf(stderr,"Warning: The sample \"%s\" was not found in %s, skipping\n", smpl[i], files->readers[n_isec].fname);
727 continue;
728 }
729
730 files->samples = (char**) realloc(files->samples, (files->n_smpl+1)*sizeof(const char*));
731 files->samples[files->n_smpl++] = strdup(smpl[i]);
732 }
733
734 if ( exclude ) khash_str2int_destroy(exclude);
735 if ( free_smpl )
736 {
737 for (i=0; i<nsmpl; i++) free(smpl[i]);
738 free(smpl);
739 }
740
741 if ( !files->n_smpl )
742 {
743 if ( files->nreaders>1 )
744 fprintf(stderr,"No samples in common.\n");
745 return 0;
746 }
747 for (i=0; i<files->nreaders; i++)
748 {
749 bcf_sr_t *reader = &files->readers[i];
750 reader->samples = (int*) malloc(sizeof(int)*files->n_smpl);
751 reader->n_smpl = files->n_smpl;
752 for (j=0; j<files->n_smpl; j++)
753 reader->samples[j] = bcf_hdr_id2int(reader->header, BCF_DT_SAMPLE, files->samples[j]);
754 }
755 return 1;
756 }
757
758 // Add a new region into a list sorted by start,end. On input the coordinates
759 // are 1-based, stored 0-based, inclusive.
_regions_add(bcf_sr_regions_t * reg,const char * chr,int start,int end)760 static void _regions_add(bcf_sr_regions_t *reg, const char *chr, int start, int end)
761 {
762 if ( start==-1 && end==-1 )
763 {
764 start = 0; end = MAX_CSI_COOR-1;
765 }
766 else
767 {
768 start--; end--; // store 0-based coordinates
769 }
770
771 if ( !reg->seq_hash )
772 reg->seq_hash = khash_str2int_init();
773
774 int iseq;
775 if ( khash_str2int_get(reg->seq_hash, chr, &iseq)<0 )
776 {
777 // the chromosome block does not exist
778 iseq = reg->nseqs++;
779 reg->seq_names = (char**) realloc(reg->seq_names,sizeof(char*)*reg->nseqs);
780 reg->regs = (region_t*) realloc(reg->regs,sizeof(region_t)*reg->nseqs);
781 memset(®->regs[reg->nseqs-1],0,sizeof(region_t));
782 reg->seq_names[iseq] = strdup(chr);
783 reg->regs[iseq].creg = -1;
784 khash_str2int_set(reg->seq_hash,reg->seq_names[iseq],iseq);
785 }
786
787 region_t *creg = ®->regs[iseq];
788
789 // the regions may not be sorted on input: binary search
790 int i, min = 0, max = creg->nregs - 1;
791 while ( min<=max )
792 {
793 i = (max+min)/2;
794 if ( start < creg->regs[i].start ) max = i - 1;
795 else if ( start > creg->regs[i].start ) min = i + 1;
796 else break;
797 }
798 if ( min>max || creg->regs[i].start!=start || creg->regs[i].end!=end )
799 {
800 // no such region, insert a new one just after max
801 hts_expand(region1_t,creg->nregs+1,creg->mregs,creg->regs);
802 if ( ++max < creg->nregs )
803 memmove(&creg->regs[max+1],&creg->regs[max],(creg->nregs - max)*sizeof(region1_t));
804 creg->regs[max].start = start;
805 creg->regs[max].end = end;
806 creg->nregs++;
807 }
808 }
809
810 // File name or a list of genomic locations. If file name, NULL is returned.
_regions_init_string(const char * str)811 static bcf_sr_regions_t *_regions_init_string(const char *str)
812 {
813 bcf_sr_regions_t *reg = (bcf_sr_regions_t *) calloc(1, sizeof(bcf_sr_regions_t));
814 reg->start = reg->end = -1;
815 reg->prev_start = reg->prev_seq = -1;
816
817 kstring_t tmp = {0,0,0};
818 const char *sp = str, *ep = str;
819 int from, to;
820 while ( 1 )
821 {
822 while ( *ep && *ep!=',' && *ep!=':' ) ep++;
823 tmp.l = 0;
824 kputsn(sp,ep-sp,&tmp);
825 if ( *ep==':' )
826 {
827 sp = ep+1;
828 from = hts_parse_decimal(sp,(char**)&ep,0);
829 if ( sp==ep )
830 {
831 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
832 free(reg); free(tmp.s); return NULL;
833 }
834 if ( !*ep || *ep==',' )
835 {
836 _regions_add(reg, tmp.s, from, from);
837 sp = ep;
838 continue;
839 }
840 if ( *ep!='-' )
841 {
842 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
843 free(reg); free(tmp.s); return NULL;
844 }
845 ep++;
846 sp = ep;
847 to = hts_parse_decimal(sp,(char**)&ep,0);
848 if ( *ep && *ep!=',' )
849 {
850 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
851 free(reg); free(tmp.s); return NULL;
852 }
853 if ( sp==ep ) to = MAX_CSI_COOR-1;
854 _regions_add(reg, tmp.s, from, to);
855 if ( !*ep ) break;
856 sp = ep;
857 }
858 else
859 {
860 if ( tmp.l ) _regions_add(reg, tmp.s, -1, -1);
861 if ( !*ep ) break;
862 sp = ++ep;
863 }
864 }
865 free(tmp.s);
866 return reg;
867 }
868
869 // ichr,ifrom,ito are 0-based;
870 // returns -1 on error, 0 if the line is a comment line, 1 on success
_regions_parse_line(char * line,int ichr,int ifrom,int ito,char ** chr,char ** chr_end,int * from,int * to)871 static int _regions_parse_line(char *line, int ichr,int ifrom,int ito, char **chr,char **chr_end,int *from,int *to)
872 {
873 *chr_end = NULL;
874
875 if ( line[0]=='#' ) return 0;
876
877 int k,l; // index of the start and end column of the tab-delimited file
878 if ( ifrom <= ito )
879 k = ifrom, l = ito;
880 else
881 l = ifrom, k = ito;
882
883 int i;
884 char *se = line, *ss = NULL; // start and end
885 char *tmp;
886 for (i=0; i<=k && *se; i++)
887 {
888 ss = i==0 ? se++ : ++se;
889 while (*se && *se!='\t') se++;
890 }
891 if ( i<=k ) return -1;
892 if ( k==l )
893 {
894 *from = *to = hts_parse_decimal(ss, &tmp, 0);
895 if ( tmp==ss ) return -1;
896 }
897 else
898 {
899 if ( k==ifrom )
900 *from = hts_parse_decimal(ss, &tmp, 0);
901 else
902 *to = hts_parse_decimal(ss, &tmp, 0);
903 if ( ss==tmp ) return -1;
904
905 for (i=k; i<l && *se; i++)
906 {
907 ss = ++se;
908 while (*se && *se!='\t') se++;
909 }
910 if ( i<l ) return -1;
911 if ( k==ifrom )
912 *to = hts_parse_decimal(ss, &tmp, 0);
913 else
914 *from = hts_parse_decimal(ss, &tmp, 0);
915 if ( ss==tmp ) return -1;
916 }
917
918 ss = se = line;
919 for (i=0; i<=ichr && *se; i++)
920 {
921 if ( i>0 ) ss = ++se;
922 while (*se && *se!='\t') se++;
923 }
924 if ( i<=ichr ) return -1;
925 *chr_end = se;
926 *chr = ss;
927 return 1;
928 }
929
bcf_sr_regions_init(const char * regions,int is_file,int ichr,int ifrom,int ito)930 bcf_sr_regions_t *bcf_sr_regions_init(const char *regions, int is_file, int ichr, int ifrom, int ito)
931 {
932 bcf_sr_regions_t *reg;
933 if ( !is_file ) return _regions_init_string(regions);
934
935 reg = (bcf_sr_regions_t *) calloc(1, sizeof(bcf_sr_regions_t));
936 reg->start = reg->end = -1;
937 reg->prev_start = reg->prev_seq = -1;
938
939 reg->file = hts_open(regions, "rb");
940 if ( !reg->file )
941 {
942 fprintf(stderr,"[%s:%d %s] Could not open file: %s\n", __FILE__,__LINE__,__FUNCTION__,regions);
943 free(reg);
944 return NULL;
945 }
946
947 reg->tbx = tbx_index_load(regions);
948 if ( !reg->tbx )
949 {
950 int len = strlen(regions);
951 int is_bed = strcasecmp(".bed",regions+len-4) ? 0 : 1;
952 if ( !is_bed && !strcasecmp(".bed.gz",regions+len-7) ) is_bed = 1;
953
954 if ( reg->file->format.format==vcf ) ito = 1;
955
956 // read the whole file, tabix index is not present
957 while ( hts_getline(reg->file, KS_SEP_LINE, ®->line) > 0 )
958 {
959 char *chr, *chr_end;
960 int from, to, ret;
961 ret = _regions_parse_line(reg->line.s, ichr,ifrom,abs(ito), &chr,&chr_end,&from,&to);
962 if ( ret < 0 )
963 {
964 if ( ito<0 )
965 ret = _regions_parse_line(reg->line.s, ichr,ifrom,ifrom, &chr,&chr_end,&from,&to);
966 if ( ret<0 )
967 {
968 fprintf(stderr,"[%s:%d] Could not parse the file %s, using the columns %d,%d[,%d]\n", __FILE__,__LINE__,regions,ichr+1,ifrom+1,ito+1);
969 hts_close(reg->file); reg->file = NULL; free(reg);
970 return NULL;
971 }
972 }
973 if ( !ret ) continue;
974 if ( is_bed ) from++;
975 *chr_end = 0;
976 _regions_add(reg, chr, from, to);
977 *chr_end = '\t';
978 }
979 hts_close(reg->file); reg->file = NULL;
980 if ( !reg->nseqs ) { free(reg); return NULL; }
981 return reg;
982 }
983
984 reg->seq_names = (char**) tbx_seqnames(reg->tbx, ®->nseqs);
985 if ( !reg->seq_hash )
986 reg->seq_hash = khash_str2int_init();
987 int i;
988 for (i=0; i<reg->nseqs; i++)
989 {
990 khash_str2int_set(reg->seq_hash,reg->seq_names[i],i);
991 }
992 reg->fname = strdup(regions);
993 reg->is_bin = 1;
994 return reg;
995 }
996
bcf_sr_regions_destroy(bcf_sr_regions_t * reg)997 void bcf_sr_regions_destroy(bcf_sr_regions_t *reg)
998 {
999 int i;
1000 free(reg->fname);
1001 if ( reg->itr ) tbx_itr_destroy(reg->itr);
1002 if ( reg->tbx ) tbx_destroy(reg->tbx);
1003 if ( reg->file ) hts_close(reg->file);
1004 if ( reg->als ) free(reg->als);
1005 if ( reg->als_str.s ) free(reg->als_str.s);
1006 free(reg->line.s);
1007 if ( reg->regs )
1008 {
1009 // free only in-memory names, tbx names are const
1010 for (i=0; i<reg->nseqs; i++)
1011 {
1012 free(reg->seq_names[i]);
1013 free(reg->regs[i].regs);
1014 }
1015 }
1016 free(reg->regs);
1017 free(reg->seq_names);
1018 khash_str2int_destroy(reg->seq_hash);
1019 free(reg);
1020 }
1021
bcf_sr_regions_seek(bcf_sr_regions_t * reg,const char * seq)1022 int bcf_sr_regions_seek(bcf_sr_regions_t *reg, const char *seq)
1023 {
1024 reg->iseq = reg->start = reg->end = -1;
1025 if ( khash_str2int_get(reg->seq_hash, seq, ®->iseq) < 0 ) return -1; // sequence seq not in regions
1026
1027 // using in-memory regions
1028 if ( reg->regs )
1029 {
1030 reg->regs[reg->iseq].creg = -1;
1031 return 0;
1032 }
1033
1034 // reading regions from tabix
1035 if ( reg->itr ) tbx_itr_destroy(reg->itr);
1036 reg->itr = tbx_itr_querys(reg->tbx, seq);
1037 if ( reg->itr ) return 0;
1038
1039 return -1;
1040 }
1041
bcf_sr_regions_next(bcf_sr_regions_t * reg)1042 int bcf_sr_regions_next(bcf_sr_regions_t *reg)
1043 {
1044 if ( reg->iseq<0 ) return -1;
1045 reg->start = reg->end = -1;
1046 reg->nals = 0;
1047
1048 // using in-memory regions
1049 if ( reg->regs )
1050 {
1051 while ( reg->iseq < reg->nseqs )
1052 {
1053 reg->regs[reg->iseq].creg++;
1054 if ( reg->regs[reg->iseq].creg < reg->regs[reg->iseq].nregs ) break;
1055 reg->iseq++;
1056 }
1057 if ( reg->iseq >= reg->nseqs ) { reg->iseq = -1; return -1; } // no more regions left
1058 region1_t *creg = ®->regs[reg->iseq].regs[reg->regs[reg->iseq].creg];
1059 reg->start = creg->start;
1060 reg->end = creg->end;
1061 return 0;
1062 }
1063
1064 // reading from tabix
1065 char *chr, *chr_end;
1066 int ichr = 0, ifrom = 1, ito = 2, is_bed = 0, from, to;
1067 if ( reg->tbx )
1068 {
1069 ichr = reg->tbx->conf.sc-1;
1070 ifrom = reg->tbx->conf.bc-1;
1071 ito = reg->tbx->conf.ec-1;
1072 if ( ito<0 ) ito = ifrom;
1073 is_bed = reg->tbx->conf.preset==TBX_UCSC ? 1 : 0;
1074 }
1075
1076 int ret = 0;
1077 while ( !ret )
1078 {
1079 if ( reg->itr )
1080 {
1081 // tabix index present, reading a chromosome block
1082 ret = tbx_itr_next(reg->file, reg->tbx, reg->itr, ®->line);
1083 if ( ret<0 ) { reg->iseq = -1; return -1; }
1084 }
1085 else
1086 {
1087 if ( reg->is_bin )
1088 {
1089 // Waited for seek which never came. Reopen in text mode and stream
1090 // through the regions, otherwise hts_getline would fail
1091 hts_close(reg->file);
1092 reg->file = hts_open(reg->fname, "r");
1093 if ( !reg->file )
1094 {
1095 fprintf(stderr,"[%s:%d %s] Could not open file: %s\n", __FILE__,__LINE__,__FUNCTION__,reg->fname);
1096 reg->file = NULL;
1097 bcf_sr_regions_destroy(reg);
1098 return -1;
1099 }
1100 reg->is_bin = 0;
1101 }
1102
1103 // tabix index absent, reading the whole file
1104 ret = hts_getline(reg->file, KS_SEP_LINE, ®->line);
1105 if ( ret<0 ) { reg->iseq = -1; return -1; }
1106 }
1107 ret = _regions_parse_line(reg->line.s, ichr,ifrom,ito, &chr,&chr_end,&from,&to);
1108 if ( ret<0 )
1109 {
1110 fprintf(stderr,"[%s:%d] Could not parse the file %s, using the columns %d,%d,%d\n", __FILE__,__LINE__,reg->fname,ichr+1,ifrom+1,ito+1);
1111 return -1;
1112 }
1113 }
1114 if ( is_bed ) from++;
1115
1116 *chr_end = 0;
1117 if ( khash_str2int_get(reg->seq_hash, chr, ®->iseq)<0 )
1118 {
1119 fprintf(stderr,"Broken tabix index? The sequence \"%s\" not in dictionary [%s]\n", chr,reg->line.s);
1120 exit(1);
1121 }
1122 *chr_end = '\t';
1123
1124 reg->start = from - 1;
1125 reg->end = to - 1;
1126 return 0;
1127 }
1128
_regions_match_alleles(bcf_sr_regions_t * reg,int als_idx,bcf1_t * rec)1129 static int _regions_match_alleles(bcf_sr_regions_t *reg, int als_idx, bcf1_t *rec)
1130 {
1131 if ( reg->regs )
1132 {
1133 // payload is not supported for in-memory regions, switch to regidx instead in future
1134 fprintf(stderr,"Error: Compressed and indexed targets file is required\n");
1135 exit(1);
1136 }
1137
1138 int i = 0, max_len = 0;
1139 if ( !reg->nals )
1140 {
1141 char *ss = reg->line.s;
1142 while ( i<als_idx && *ss )
1143 {
1144 if ( *ss=='\t' ) i++;
1145 ss++;
1146 }
1147 char *se = ss;
1148 reg->nals = 1;
1149 while ( *se && *se!='\t' )
1150 {
1151 if ( *se==',' ) reg->nals++;
1152 se++;
1153 }
1154 ks_resize(®->als_str, se-ss+1+reg->nals);
1155 reg->als_str.l = 0;
1156 hts_expand(char*,reg->nals,reg->mals,reg->als);
1157 reg->nals = 0;
1158
1159 se = ss;
1160 while ( *(++se) )
1161 {
1162 if ( *se=='\t' ) break;
1163 if ( *se!=',' ) continue;
1164 reg->als[reg->nals] = ®->als_str.s[reg->als_str.l];
1165 kputsn(ss,se-ss,®->als_str);
1166 if ( ®->als_str.s[reg->als_str.l] - reg->als[reg->nals] > max_len ) max_len = ®->als_str.s[reg->als_str.l] - reg->als[reg->nals];
1167 reg->als_str.l++;
1168 reg->nals++;
1169 ss = ++se;
1170 }
1171 reg->als[reg->nals] = ®->als_str.s[reg->als_str.l];
1172 kputsn(ss,se-ss,®->als_str);
1173 if ( ®->als_str.s[reg->als_str.l] - reg->als[reg->nals] > max_len ) max_len = ®->als_str.s[reg->als_str.l] - reg->als[reg->nals];
1174 reg->nals++;
1175 reg->als_type = max_len > 1 ? VCF_INDEL : VCF_SNP; // this is a simplified check, see vcf.c:bcf_set_variant_types
1176 }
1177 int type = bcf_get_variant_types(rec);
1178 if ( reg->als_type & VCF_INDEL )
1179 return type & VCF_INDEL ? 1 : 0;
1180 return !(type & VCF_INDEL) ? 1 : 0;
1181 }
1182
bcf_sr_regions_overlap(bcf_sr_regions_t * reg,const char * seq,int start,int end)1183 int bcf_sr_regions_overlap(bcf_sr_regions_t *reg, const char *seq, int start, int end)
1184 {
1185 int iseq;
1186 if ( khash_str2int_get(reg->seq_hash, seq, &iseq)<0 ) return -1; // no such sequence
1187
1188 if ( reg->prev_seq==-1 || iseq!=reg->prev_seq || reg->prev_start > start ) // new chromosome or after a seek
1189 {
1190 // flush regions left on previous chromosome
1191 if ( reg->missed_reg_handler && reg->prev_seq!=-1 && reg->iseq!=-1 )
1192 bcf_sr_regions_flush(reg);
1193
1194 bcf_sr_regions_seek(reg, seq);
1195 reg->start = reg->end = -1;
1196 }
1197 if ( reg->prev_seq==iseq && reg->iseq!=iseq ) return -2; // no more regions on this chromosome
1198 reg->prev_seq = reg->iseq;
1199 reg->prev_start = start;
1200
1201 while ( iseq==reg->iseq && reg->end < start )
1202 {
1203 if ( bcf_sr_regions_next(reg) < 0 ) return -2; // no more regions left
1204 if ( reg->iseq != iseq ) return -1; // does not overlap any regions
1205 if ( reg->missed_reg_handler && reg->end < start ) reg->missed_reg_handler(reg, reg->missed_reg_data);
1206 }
1207 if ( reg->start <= end ) return 0; // region overlap
1208 return -1; // no overlap
1209 }
1210
bcf_sr_regions_flush(bcf_sr_regions_t * reg)1211 void bcf_sr_regions_flush(bcf_sr_regions_t *reg)
1212 {
1213 if ( !reg->missed_reg_handler || reg->prev_seq==-1 ) return;
1214 while ( !bcf_sr_regions_next(reg) ) reg->missed_reg_handler(reg, reg->missed_reg_data);
1215 return;
1216 }
1217
1218