1 /*  synced_bcf_reader.c -- stream through multiple VCF files.
2 
3     Copyright (C) 2012-2014 Genome Research Ltd.
4 
5     Author: Petr Danecek <pd3@sanger.ac.uk>
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE.  */
24 
25 #include <config.h>
26 
27 #include <stdio.h>
28 #include <unistd.h>
29 #include <string.h>
30 #include <strings.h>
31 #include <limits.h>
32 #include <errno.h>
33 #include <sys/stat.h>
34 #include "htslib/synced_bcf_reader.h"
35 #include "htslib/kseq.h"
36 #include "htslib/khash_str2int.h"
37 #include "htslib/bgzf.h"
38 #include "htslib/thread_pool.h"
39 #include "bcf_sr_sort.h"
40 
41 #define MAX_CSI_COOR 0x7fffffff     // maximum indexable coordinate of .csi
42 
43 typedef struct
44 {
45     uint32_t start, end;
46 }
47 region1_t;
48 
49 typedef struct _region_t
50 {
51     region1_t *regs;
52     int nregs, mregs, creg;
53 }
54 region_t;
55 
56 #define BCF_SR_AUX(x) ((aux_t*)((x)->aux))
57 typedef struct
58 {
59     sr_sort_t sort;
60 }
61 aux_t;
62 
63 static void _regions_add(bcf_sr_regions_t *reg, const char *chr, int start, int end);
64 static bcf_sr_regions_t *_regions_init_string(const char *str);
65 static int _regions_match_alleles(bcf_sr_regions_t *reg, int als_idx, bcf1_t *rec);
66 
bcf_sr_strerror(int errnum)67 char *bcf_sr_strerror(int errnum)
68 {
69     switch (errnum)
70     {
71         case open_failed:
72             return strerror(errno); break;
73         case not_bgzf:
74             return "not compressed with bgzip"; break;
75         case idx_load_failed:
76             return "could not load index"; break;
77         case file_type_error:
78             return "unknown file type"; break;
79         case api_usage_error:
80             return "API usage error"; break;
81         case header_error:
82             return "could not parse header"; break;
83         case no_eof:
84             return "no BGZF EOF marker; file may be truncated"; break;
85         case no_memory:
86             return "Out of memory"; break;
87         case vcf_parse_error:
88             return "VCF parse error"; break;
89         case bcf_read_error:
90             return "BCF read error"; break;
91         default: return "";
92     }
93 }
94 
bcf_sr_set_opt(bcf_srs_t * readers,bcf_sr_opt_t opt,...)95 int bcf_sr_set_opt(bcf_srs_t *readers, bcf_sr_opt_t opt, ...)
96 {
97     va_list args;
98     switch (opt)
99     {
100         case BCF_SR_REQUIRE_IDX:
101             readers->require_index = 1;
102             return 0;
103 
104         case BCF_SR_PAIR_LOGIC:
105             va_start(args, opt);
106             BCF_SR_AUX(readers)->sort.pair = va_arg(args, int);
107             return 0;
108 
109         default:
110             break;
111     }
112     return 1;
113 }
114 
init_filters(bcf_hdr_t * hdr,const char * filters,int * nfilters)115 static int *init_filters(bcf_hdr_t *hdr, const char *filters, int *nfilters)
116 {
117     kstring_t str = {0,0,0};
118     const char *tmp = filters, *prev = filters;
119     int nout = 0, *out = NULL;
120     while ( 1 )
121     {
122         if ( *tmp==',' || !*tmp )
123         {
124             out = (int*) realloc(out, (nout+1)*sizeof(int));
125             if ( tmp-prev==1 && *prev=='.' )
126             {
127                 out[nout] = -1;
128                 nout++;
129             }
130             else
131             {
132                 str.l = 0;
133                 kputsn(prev, tmp-prev, &str);
134                 out[nout] = bcf_hdr_id2int(hdr, BCF_DT_ID, str.s);
135                 if ( out[nout]>=0 ) nout++;
136             }
137             if ( !*tmp ) break;
138             prev = tmp+1;
139         }
140         tmp++;
141     }
142     if ( str.m ) free(str.s);
143     *nfilters = nout;
144     return out;
145 }
146 
bcf_sr_set_regions(bcf_srs_t * readers,const char * regions,int is_file)147 int bcf_sr_set_regions(bcf_srs_t *readers, const char *regions, int is_file)
148 {
149     assert( !readers->regions );
150     if ( readers->nreaders )
151     {
152         fprintf(stderr,"[%s:%d %s] Error: bcf_sr_set_regions() must be called before bcf_sr_add_reader()\n", __FILE__,__LINE__,__FUNCTION__);
153         return -1;
154     }
155     readers->regions = bcf_sr_regions_init(regions,is_file,0,1,-2);
156     if ( !readers->regions ) return -1;
157     readers->explicit_regs = 1;
158     readers->require_index = 1;
159     return 0;
160 }
bcf_sr_set_targets(bcf_srs_t * readers,const char * targets,int is_file,int alleles)161 int bcf_sr_set_targets(bcf_srs_t *readers, const char *targets, int is_file, int alleles)
162 {
163     assert( !readers->targets );
164     if ( targets[0]=='^' )
165     {
166         readers->targets_exclude = 1;
167         targets++;
168     }
169     readers->targets = bcf_sr_regions_init(targets,is_file,0,1,-2);
170     if ( !readers->targets ) return -1;
171     readers->targets_als = alleles;
172     return 0;
173 }
174 
bcf_sr_set_threads(bcf_srs_t * files,int n_threads)175 int bcf_sr_set_threads(bcf_srs_t *files, int n_threads)
176 {
177     if (!(files->n_threads = n_threads))
178         return 0;
179 
180     files->p = calloc(1, sizeof(*files->p));
181     if (!files->p) {
182         files->errnum = no_memory;
183         return -1;
184     }
185     if (!(files->p->pool = hts_tpool_init(n_threads)))
186         return -1;
187 
188     return 0;
189 }
190 
bcf_sr_destroy_threads(bcf_srs_t * files)191 void bcf_sr_destroy_threads(bcf_srs_t *files) {
192     if (!files->p)
193         return;
194 
195     if (files->p->pool)
196         hts_tpool_destroy(files->p->pool);
197     free(files->p);
198 }
199 
bcf_sr_add_reader(bcf_srs_t * files,const char * fname)200 int bcf_sr_add_reader(bcf_srs_t *files, const char *fname)
201 {
202     htsFile* file_ptr = hts_open(fname, "r");
203     if ( ! file_ptr ) {
204         files->errnum = open_failed;
205         return 0;
206     }
207 
208     files->has_line = (int*) realloc(files->has_line, sizeof(int)*(files->nreaders+1));
209     files->has_line[files->nreaders] = 0;
210     files->readers  = (bcf_sr_t*) realloc(files->readers, sizeof(bcf_sr_t)*(files->nreaders+1));
211     bcf_sr_t *reader = &files->readers[files->nreaders++];
212     memset(reader,0,sizeof(bcf_sr_t));
213 
214     reader->file = file_ptr;
215 
216     files->errnum = 0;
217 
218     if ( reader->file->format.compression==bgzf )
219     {
220         BGZF *bgzf = hts_get_bgzfp(reader->file);
221         if ( bgzf && bgzf_check_EOF(bgzf) == 0 ) {
222             files->errnum = no_eof;
223             fprintf(stderr,"[%s] Warning: no BGZF EOF marker; file may be truncated.\n", fname);
224         }
225         if (files->p)
226             bgzf_thread_pool(bgzf, files->p->pool, files->p->qsize);
227     }
228 
229     if ( files->require_index )
230     {
231         if ( reader->file->format.format==vcf )
232         {
233             if ( reader->file->format.compression!=bgzf )
234             {
235                 files->errnum = not_bgzf;
236                 return 0;
237             }
238 
239             reader->tbx_idx = tbx_index_load(fname);
240             if ( !reader->tbx_idx )
241             {
242                 files->errnum = idx_load_failed;
243                 return 0;
244             }
245 
246             reader->header = bcf_hdr_read(reader->file);
247         }
248         else if ( reader->file->format.format==bcf )
249         {
250             if ( reader->file->format.compression!=bgzf )
251             {
252                 files->errnum = not_bgzf;
253                 return 0;
254             }
255 
256             reader->header = bcf_hdr_read(reader->file);
257 
258             reader->bcf_idx = bcf_index_load(fname);
259             if ( !reader->bcf_idx )
260             {
261                 files->errnum = idx_load_failed;
262                 return 0;
263             }
264         }
265         else
266         {
267             files->errnum = file_type_error;
268             return 0;
269         }
270     }
271     else
272     {
273         if ( reader->file->format.format==bcf || reader->file->format.format==vcf )
274         {
275             reader->header = bcf_hdr_read(reader->file);
276         }
277         else
278         {
279             files->errnum = file_type_error;
280             return 0;
281         }
282         files->streaming = 1;
283     }
284     if ( files->streaming && files->nreaders>1 )
285     {
286         files->errnum = api_usage_error;
287         fprintf(stderr,"[%s:%d %s] Error: %d readers, yet require_index not set\n", __FILE__,__LINE__,__FUNCTION__,files->nreaders);
288         return 0;
289     }
290     if ( files->streaming && files->regions )
291     {
292         files->errnum = api_usage_error;
293         fprintf(stderr,"[%s:%d %s] Error: cannot tabix-jump in streaming mode\n", __FILE__,__LINE__,__FUNCTION__);
294         return 0;
295     }
296     if ( !reader->header )
297     {
298         files->errnum = header_error;
299         return 0;
300     }
301 
302     reader->fname = strdup(fname);
303     if ( files->apply_filters )
304         reader->filter_ids = init_filters(reader->header, files->apply_filters, &reader->nfilter_ids);
305 
306     // Update list of chromosomes
307     if ( !files->explicit_regs && !files->streaming )
308     {
309         int n,i;
310         const char **names = reader->tbx_idx ? tbx_seqnames(reader->tbx_idx, &n) : bcf_hdr_seqnames(reader->header, &n);
311         for (i=0; i<n; i++)
312         {
313             if ( !files->regions )
314                 files->regions = _regions_init_string(names[i]);
315             else
316                 _regions_add(files->regions, names[i], -1, -1);
317         }
318         free(names);
319     }
320 
321     return 1;
322 }
323 
bcf_sr_init(void)324 bcf_srs_t *bcf_sr_init(void)
325 {
326     bcf_srs_t *files = (bcf_srs_t*) calloc(1,sizeof(bcf_srs_t));
327     files->aux = (aux_t*) calloc(1,sizeof(aux_t));
328     bcf_sr_sort_init(&BCF_SR_AUX(files)->sort);
329     return files;
330 }
331 
bcf_sr_destroy1(bcf_sr_t * reader)332 static void bcf_sr_destroy1(bcf_sr_t *reader)
333 {
334     free(reader->fname);
335     if ( reader->tbx_idx ) tbx_destroy(reader->tbx_idx);
336     if ( reader->bcf_idx ) hts_idx_destroy(reader->bcf_idx);
337     bcf_hdr_destroy(reader->header);
338     hts_close(reader->file);
339     if ( reader->itr ) tbx_itr_destroy(reader->itr);
340     int j;
341     for (j=0; j<reader->mbuffer; j++)
342         bcf_destroy1(reader->buffer[j]);
343     free(reader->buffer);
344     free(reader->samples);
345     free(reader->filter_ids);
346 }
347 
bcf_sr_destroy(bcf_srs_t * files)348 void bcf_sr_destroy(bcf_srs_t *files)
349 {
350     int i;
351     for (i=0; i<files->nreaders; i++)
352         bcf_sr_destroy1(&files->readers[i]);
353     free(files->has_line);
354     free(files->readers);
355     for (i=0; i<files->n_smpl; i++) free(files->samples[i]);
356     free(files->samples);
357     if (files->targets) bcf_sr_regions_destroy(files->targets);
358     if (files->regions) bcf_sr_regions_destroy(files->regions);
359     if (files->tmps.m) free(files->tmps.s);
360     if (files->n_threads) bcf_sr_destroy_threads(files);
361     bcf_sr_sort_destroy(&BCF_SR_AUX(files)->sort);
362     free(files->aux);
363     free(files);
364 }
365 
bcf_sr_remove_reader(bcf_srs_t * files,int i)366 void bcf_sr_remove_reader(bcf_srs_t *files, int i)
367 {
368     assert( !files->samples );  // not ready for this yet
369     bcf_sr_sort_remove_reader(files, &BCF_SR_AUX(files)->sort, i);
370     bcf_sr_destroy1(&files->readers[i]);
371     if ( i+1 < files->nreaders )
372     {
373         memmove(&files->readers[i], &files->readers[i+1], (files->nreaders-i-1)*sizeof(bcf_sr_t));
374         memmove(&files->has_line[i], &files->has_line[i+1], (files->nreaders-i-1)*sizeof(int));
375     }
376     files->nreaders--;
377 }
378 
379 
debug_buffer(FILE * fp,bcf_sr_t * reader)380 void debug_buffer(FILE *fp, bcf_sr_t *reader)
381 {
382     int j;
383     for (j=0; j<=reader->nbuffer; j++)
384     {
385         bcf1_t *line = reader->buffer[j];
386         fprintf(fp,"\t%p\t%s%s\t%s:%d\t%s ", (void*)line,reader->fname,j==0?"*":" ",reader->header->id[BCF_DT_CTG][line->rid].key,line->pos+1,line->n_allele?line->d.allele[0]:"");
387         int k;
388         for (k=1; k<line->n_allele; k++) fprintf(fp," %s", line->d.allele[k]);
389         fprintf(fp,"\n");
390     }
391 }
392 
debug_buffers(FILE * fp,bcf_srs_t * files)393 void debug_buffers(FILE *fp, bcf_srs_t *files)
394 {
395     int i;
396     for (i=0; i<files->nreaders; i++)
397     {
398         fprintf(fp, "has_line: %d\t%s\n", bcf_sr_has_line(files,i),files->readers[i].fname);
399         debug_buffer(fp, &files->readers[i]);
400     }
401     fprintf(fp,"\n");
402 }
403 
has_filter(bcf_sr_t * reader,bcf1_t * line)404 static inline int has_filter(bcf_sr_t *reader, bcf1_t *line)
405 {
406     int i, j;
407     if ( !line->d.n_flt )
408     {
409         for (j=0; j<reader->nfilter_ids; j++)
410             if ( reader->filter_ids[j]<0 ) return 1;
411         return 0;
412     }
413     for (i=0; i<line->d.n_flt; i++)
414     {
415         for (j=0; j<reader->nfilter_ids; j++)
416             if ( line->d.flt[i]==reader->filter_ids[j] ) return 1;
417     }
418     return 0;
419 }
420 
_reader_seek(bcf_sr_t * reader,const char * seq,int start,int end)421 static int _reader_seek(bcf_sr_t *reader, const char *seq, int start, int end)
422 {
423     if ( end>=MAX_CSI_COOR )
424     {
425         fprintf(stderr,"The coordinate is out of csi index limit: %d\n", end+1);
426         exit(1);
427     }
428     if ( reader->itr )
429     {
430         hts_itr_destroy(reader->itr);
431         reader->itr = NULL;
432     }
433     reader->nbuffer = 0;
434     if ( reader->tbx_idx )
435     {
436         int tid = tbx_name2id(reader->tbx_idx, seq);
437         if ( tid==-1 ) return -1;    // the sequence not present in this file
438         reader->itr = tbx_itr_queryi(reader->tbx_idx,tid,start,end+1);
439     }
440     else
441     {
442         int tid = bcf_hdr_name2id(reader->header, seq);
443         if ( tid==-1 ) return -1;    // the sequence not present in this file
444         reader->itr = bcf_itr_queryi(reader->bcf_idx,tid,start,end+1);
445     }
446     if ( !reader->itr ) fprintf(stderr,"Could not seek: %s:%d-%d\n",seq,start+1,end+1);
447     assert(reader->itr);
448     return 0;
449 }
450 
451 /*
452  *  _readers_next_region() - jumps to next region if necessary
453  *  Returns 0 on success or -1 when there are no more regions left
454  */
_readers_next_region(bcf_srs_t * files)455 static int _readers_next_region(bcf_srs_t *files)
456 {
457     // Need to open new chromosome? Check number of lines in all readers' buffers
458     int i, eos = 0;
459     for (i=0; i<files->nreaders; i++)
460         if ( !files->readers[i].itr && !files->readers[i].nbuffer ) eos++;
461 
462     if ( eos!=files->nreaders )
463     {
464         // Some of the readers still has buffered lines
465         return 0;
466     }
467 
468     // No lines in the buffer, need to open new region or quit
469     if ( bcf_sr_regions_next(files->regions)<0 ) return -1;
470 
471     for (i=0; i<files->nreaders; i++)
472         _reader_seek(&files->readers[i],files->regions->seq_names[files->regions->iseq],files->regions->start,files->regions->end);
473 
474     return 0;
475 }
476 
477 /*
478  *  _reader_fill_buffer() - buffers all records with the same coordinate
479  */
_reader_fill_buffer(bcf_srs_t * files,bcf_sr_t * reader)480 static void _reader_fill_buffer(bcf_srs_t *files, bcf_sr_t *reader)
481 {
482     // Return if the buffer is full: the coordinate of the last buffered record differs
483     if ( reader->nbuffer && reader->buffer[reader->nbuffer]->pos != reader->buffer[1]->pos ) return;
484 
485     // No iterator (sequence not present in this file) and not streaming
486     if ( !reader->itr && !files->streaming ) return;
487 
488     // Fill the buffer with records starting at the same position
489     int i, ret = 0;
490     while (1)
491     {
492         if ( reader->nbuffer+1 >= reader->mbuffer )
493         {
494             // Increase buffer size
495             reader->mbuffer += 8;
496             reader->buffer = (bcf1_t**) realloc(reader->buffer, sizeof(bcf1_t*)*reader->mbuffer);
497             for (i=8; i>0; i--)     // initialize
498             {
499                 reader->buffer[reader->mbuffer-i] = bcf_init1();
500                 reader->buffer[reader->mbuffer-i]->max_unpack = files->max_unpack;
501                 reader->buffer[reader->mbuffer-i]->pos = -1;    // for rare cases when VCF starts from 1
502             }
503         }
504         if ( files->streaming )
505         {
506             if ( reader->file->format.format==vcf )
507             {
508                 if ( (ret=hts_getline(reader->file, KS_SEP_LINE, &files->tmps)) < 0 ) break;   // no more lines
509                 ret = vcf_parse1(&files->tmps, reader->header, reader->buffer[reader->nbuffer+1]);
510                 if ( ret<0 ) { files->errnum = vcf_parse_error; break; }
511             }
512             else if ( reader->file->format.format==bcf )
513             {
514                 ret = bcf_read1(reader->file, reader->header, reader->buffer[reader->nbuffer+1]);
515                 if ( ret < -1 ) files->errnum = bcf_read_error;
516                 if ( ret < 0 ) break; // no more lines or an error
517             }
518             else
519             {
520                 fprintf(stderr,"[%s:%d %s] fixme: not ready for this\n", __FILE__,__LINE__,__FUNCTION__);
521                 exit(1);
522             }
523         }
524         else if ( reader->tbx_idx )
525         {
526             if ( (ret=tbx_itr_next(reader->file, reader->tbx_idx, reader->itr, &files->tmps)) < 0 ) break;  // no more lines
527             ret = vcf_parse1(&files->tmps, reader->header, reader->buffer[reader->nbuffer+1]);
528             if ( ret<0 ) { files->errnum = vcf_parse_error; break; }
529         }
530         else
531         {
532             ret = bcf_itr_next(reader->file, reader->itr, reader->buffer[reader->nbuffer+1]);
533             if ( ret < -1 ) files->errnum = bcf_read_error;
534             if ( ret < 0 ) break; // no more lines or an error
535             bcf_subset_format(reader->header,reader->buffer[reader->nbuffer+1]);
536         }
537 
538         // apply filter
539         if ( !reader->nfilter_ids )
540             bcf_unpack(reader->buffer[reader->nbuffer+1], BCF_UN_STR);
541         else
542         {
543             bcf_unpack(reader->buffer[reader->nbuffer+1], BCF_UN_STR|BCF_UN_FLT);
544             if ( !has_filter(reader, reader->buffer[reader->nbuffer+1]) ) continue;
545         }
546         reader->nbuffer++;
547 
548         if ( reader->buffer[reader->nbuffer]->pos != reader->buffer[1]->pos ) break;    // the buffer is full
549     }
550     if ( ret<0 )
551     {
552         // done for this region
553         tbx_itr_destroy(reader->itr);
554         reader->itr = NULL;
555     }
556 }
557 
558 /*
559  *  _readers_shift_buffer() - removes the first line and all subsequent lines with the same position
560  */
_reader_shift_buffer(bcf_sr_t * reader)561 static void _reader_shift_buffer(bcf_sr_t *reader)
562 {
563     int i;
564     for (i=2; i<=reader->nbuffer; i++)
565         if ( reader->buffer[i]->pos!=reader->buffer[1]->pos ) break;
566     if ( i<=reader->nbuffer )
567     {
568         // A record with a different position follows, swap it. Because of the reader's logic,
569         // only one such line can be present.
570         bcf1_t *tmp = reader->buffer[1]; reader->buffer[1] = reader->buffer[i]; reader->buffer[i] = tmp;
571         reader->nbuffer = 1;
572     }
573     else
574         reader->nbuffer = 0;    // no other line
575 }
576 
_reader_next_line(bcf_srs_t * files)577 int _reader_next_line(bcf_srs_t *files)
578 {
579     int i, min_pos = INT_MAX;
580     const char *chr = NULL;
581 
582     // Loop until next suitable line is found or all readers have finished
583     while ( 1 )
584     {
585         // Get all readers ready for the next region.
586         if ( files->regions && _readers_next_region(files)<0 ) break;
587 
588         // Fill buffers
589         for (i=0; i<files->nreaders; i++)
590         {
591             _reader_fill_buffer(files, &files->readers[i]);
592 
593             // Update the minimum coordinate
594             if ( !files->readers[i].nbuffer ) continue;
595             if ( min_pos > files->readers[i].buffer[1]->pos )
596             {
597                 min_pos = files->readers[i].buffer[1]->pos;
598                 chr = bcf_seqname(files->readers[i].header, files->readers[i].buffer[1]);
599             }
600         }
601         if ( min_pos==INT_MAX )
602         {
603             if ( !files->regions ) break;
604             continue;
605         }
606 
607         // Skip this position if not present in targets
608         if ( files->targets )
609         {
610             int ret = bcf_sr_regions_overlap(files->targets, chr, min_pos, min_pos);
611             if ( (!files->targets_exclude && ret<0) || (files->targets_exclude && !ret) )
612             {
613                 // Remove all lines with this position from the buffer
614                 for (i=0; i<files->nreaders; i++)
615                     if ( files->readers[i].nbuffer && files->readers[i].buffer[1]->pos==min_pos )
616                         _reader_shift_buffer(&files->readers[i]);
617                 min_pos = INT_MAX;
618                 chr = NULL;
619                 continue;
620             }
621         }
622 
623         break;  // done: chr and min_pos are set
624     }
625     if ( !chr ) return 0;
626 
627     return bcf_sr_sort_next(files, &BCF_SR_AUX(files)->sort, chr, min_pos);
628 }
629 
bcf_sr_next_line(bcf_srs_t * files)630 int bcf_sr_next_line(bcf_srs_t *files)
631 {
632     if ( !files->targets_als )
633         return _reader_next_line(files);
634 
635     while (1)
636     {
637         int i, ret = _reader_next_line(files);
638         if ( !ret ) return ret;
639 
640         for (i=0; i<files->nreaders; i++)
641             if ( files->has_line[i] ) break;
642 
643         if ( _regions_match_alleles(files->targets, files->targets_als-1, files->readers[i].buffer[0]) ) return ret;
644 
645         // Check if there are more duplicate lines in the buffers. If not, return this line as if it
646         // matched the targets, even if there is a type mismatch
647         for (i=0; i<files->nreaders; i++)
648         {
649             if ( !files->has_line[i] ) continue;
650             if ( files->readers[i].nbuffer==0 || files->readers[i].buffer[1]->pos!=files->readers[i].buffer[0]->pos ) continue;
651             break;
652         }
653         if ( i==files->nreaders ) return ret;   // no more lines left, output even if target alleles are not of the same type
654     }
655 }
656 
bcf_sr_seek_start(bcf_srs_t * readers)657 static void bcf_sr_seek_start(bcf_srs_t *readers)
658 {
659     bcf_sr_regions_t *reg = readers->regions;
660     int i;
661     for (i=0; i<reg->nseqs; i++)
662         reg->regs[i].creg = -1;
663     reg->iseq = 0;
664 }
665 
666 
bcf_sr_seek(bcf_srs_t * readers,const char * seq,int pos)667 int bcf_sr_seek(bcf_srs_t *readers, const char *seq, int pos)
668 {
669     if ( !readers->regions ) return 0;
670     if ( !seq && !pos )
671     {
672         // seek to start
673         bcf_sr_seek_start(readers);
674         return 0;
675     }
676     bcf_sr_regions_overlap(readers->regions, seq, pos, pos);
677     int i, nret = 0;
678     for (i=0; i<readers->nreaders; i++)
679     {
680         nret += _reader_seek(&readers->readers[i],seq,pos,MAX_CSI_COOR-1);
681     }
682     return nret;
683 }
684 
bcf_sr_set_samples(bcf_srs_t * files,const char * fname,int is_file)685 int bcf_sr_set_samples(bcf_srs_t *files, const char *fname, int is_file)
686 {
687     int i, j, nsmpl, free_smpl = 0;
688     char **smpl = NULL;
689 
690     void *exclude = (fname[0]=='^') ? khash_str2int_init() : NULL;
691     if ( exclude || strcmp("-",fname) ) // "-" stands for all samples
692     {
693         smpl = hts_readlist(fname, is_file, &nsmpl);
694         if ( !smpl )
695         {
696             fprintf(stderr,"Could not read the file: \"%s\"\n", fname);
697             return 0;
698         }
699         if ( exclude )
700         {
701             for (i=0; i<nsmpl; i++)
702                 khash_str2int_inc(exclude, smpl[i]);
703         }
704         free_smpl = 1;
705     }
706     if ( !smpl )
707     {
708         smpl  = files->readers[0].header->samples;   // intersection of all samples
709         nsmpl = bcf_hdr_nsamples(files->readers[0].header);
710     }
711 
712     files->samples = NULL;
713     files->n_smpl  = 0;
714     for (i=0; i<nsmpl; i++)
715     {
716         if ( exclude && khash_str2int_has_key(exclude,smpl[i])  ) continue;
717 
718         int n_isec = 0;
719         for (j=0; j<files->nreaders; j++)
720         {
721             if ( bcf_hdr_id2int(files->readers[j].header, BCF_DT_SAMPLE, smpl[i])<0 ) break;
722             n_isec++;
723         }
724         if ( n_isec!=files->nreaders )
725         {
726             fprintf(stderr,"Warning: The sample \"%s\" was not found in %s, skipping\n", smpl[i], files->readers[n_isec].fname);
727             continue;
728         }
729 
730         files->samples = (char**) realloc(files->samples, (files->n_smpl+1)*sizeof(const char*));
731         files->samples[files->n_smpl++] = strdup(smpl[i]);
732     }
733 
734     if ( exclude ) khash_str2int_destroy(exclude);
735     if ( free_smpl )
736     {
737         for (i=0; i<nsmpl; i++) free(smpl[i]);
738         free(smpl);
739     }
740 
741     if ( !files->n_smpl )
742     {
743         if ( files->nreaders>1 )
744             fprintf(stderr,"No samples in common.\n");
745         return 0;
746     }
747     for (i=0; i<files->nreaders; i++)
748     {
749         bcf_sr_t *reader = &files->readers[i];
750         reader->samples  = (int*) malloc(sizeof(int)*files->n_smpl);
751         reader->n_smpl   = files->n_smpl;
752         for (j=0; j<files->n_smpl; j++)
753             reader->samples[j] = bcf_hdr_id2int(reader->header, BCF_DT_SAMPLE, files->samples[j]);
754     }
755     return 1;
756 }
757 
758 // Add a new region into a list sorted by start,end. On input the coordinates
759 // are 1-based, stored 0-based, inclusive.
_regions_add(bcf_sr_regions_t * reg,const char * chr,int start,int end)760 static void _regions_add(bcf_sr_regions_t *reg, const char *chr, int start, int end)
761 {
762     if ( start==-1 && end==-1 )
763     {
764         start = 0; end = MAX_CSI_COOR-1;
765     }
766     else
767     {
768         start--; end--; // store 0-based coordinates
769     }
770 
771     if ( !reg->seq_hash )
772          reg->seq_hash = khash_str2int_init();
773 
774     int iseq;
775     if ( khash_str2int_get(reg->seq_hash, chr, &iseq)<0 )
776     {
777         // the chromosome block does not exist
778         iseq = reg->nseqs++;
779         reg->seq_names = (char**) realloc(reg->seq_names,sizeof(char*)*reg->nseqs);
780         reg->regs = (region_t*) realloc(reg->regs,sizeof(region_t)*reg->nseqs);
781         memset(&reg->regs[reg->nseqs-1],0,sizeof(region_t));
782         reg->seq_names[iseq] = strdup(chr);
783         reg->regs[iseq].creg = -1;
784         khash_str2int_set(reg->seq_hash,reg->seq_names[iseq],iseq);
785     }
786 
787     region_t *creg = &reg->regs[iseq];
788 
789     // the regions may not be sorted on input: binary search
790     int i, min = 0, max = creg->nregs - 1;
791     while ( min<=max )
792     {
793         i = (max+min)/2;
794         if ( start < creg->regs[i].start ) max = i - 1;
795         else if ( start > creg->regs[i].start ) min = i + 1;
796         else break;
797     }
798     if ( min>max || creg->regs[i].start!=start || creg->regs[i].end!=end )
799     {
800         // no such region, insert a new one just after max
801         hts_expand(region1_t,creg->nregs+1,creg->mregs,creg->regs);
802         if ( ++max < creg->nregs )
803             memmove(&creg->regs[max+1],&creg->regs[max],(creg->nregs - max)*sizeof(region1_t));
804         creg->regs[max].start = start;
805         creg->regs[max].end   = end;
806         creg->nregs++;
807     }
808 }
809 
810 // File name or a list of genomic locations. If file name, NULL is returned.
_regions_init_string(const char * str)811 static bcf_sr_regions_t *_regions_init_string(const char *str)
812 {
813     bcf_sr_regions_t *reg = (bcf_sr_regions_t *) calloc(1, sizeof(bcf_sr_regions_t));
814     reg->start = reg->end = -1;
815     reg->prev_start = reg->prev_seq = -1;
816 
817     kstring_t tmp = {0,0,0};
818     const char *sp = str, *ep = str;
819     int from, to;
820     while ( 1 )
821     {
822         while ( *ep && *ep!=',' && *ep!=':' ) ep++;
823         tmp.l = 0;
824         kputsn(sp,ep-sp,&tmp);
825         if ( *ep==':' )
826         {
827             sp = ep+1;
828             from = hts_parse_decimal(sp,(char**)&ep,0);
829             if ( sp==ep )
830             {
831                 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
832                 free(reg); free(tmp.s); return NULL;
833             }
834             if ( !*ep || *ep==',' )
835             {
836                 _regions_add(reg, tmp.s, from, from);
837                 sp = ep;
838                 continue;
839             }
840             if ( *ep!='-' )
841             {
842                 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
843                 free(reg); free(tmp.s); return NULL;
844             }
845             ep++;
846             sp = ep;
847             to = hts_parse_decimal(sp,(char**)&ep,0);
848             if ( *ep && *ep!=',' )
849             {
850                 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
851                 free(reg); free(tmp.s); return NULL;
852             }
853             if ( sp==ep ) to = MAX_CSI_COOR-1;
854             _regions_add(reg, tmp.s, from, to);
855             if ( !*ep ) break;
856             sp = ep;
857         }
858         else
859         {
860             if ( tmp.l ) _regions_add(reg, tmp.s, -1, -1);
861             if ( !*ep ) break;
862             sp = ++ep;
863         }
864     }
865     free(tmp.s);
866     return reg;
867 }
868 
869 // ichr,ifrom,ito are 0-based;
870 // returns -1 on error, 0 if the line is a comment line, 1 on success
_regions_parse_line(char * line,int ichr,int ifrom,int ito,char ** chr,char ** chr_end,int * from,int * to)871 static int _regions_parse_line(char *line, int ichr,int ifrom,int ito, char **chr,char **chr_end,int *from,int *to)
872 {
873     *chr_end = NULL;
874 
875     if ( line[0]=='#' ) return 0;
876 
877     int k,l;    // index of the start and end column of the tab-delimited file
878     if ( ifrom <= ito )
879         k = ifrom, l = ito;
880     else
881         l = ifrom, k = ito;
882 
883     int i;
884     char *se = line, *ss = NULL; // start and end
885     char *tmp;
886     for (i=0; i<=k && *se; i++)
887     {
888         ss = i==0 ? se++ : ++se;
889         while (*se && *se!='\t') se++;
890     }
891     if ( i<=k ) return -1;
892     if ( k==l )
893     {
894         *from = *to = hts_parse_decimal(ss, &tmp, 0);
895         if ( tmp==ss ) return -1;
896     }
897     else
898     {
899         if ( k==ifrom )
900             *from = hts_parse_decimal(ss, &tmp, 0);
901         else
902             *to = hts_parse_decimal(ss, &tmp, 0);
903         if ( ss==tmp ) return -1;
904 
905         for (i=k; i<l && *se; i++)
906         {
907             ss = ++se;
908             while (*se && *se!='\t') se++;
909         }
910         if ( i<l ) return -1;
911         if ( k==ifrom )
912             *to = hts_parse_decimal(ss, &tmp, 0);
913         else
914             *from = hts_parse_decimal(ss, &tmp, 0);
915         if ( ss==tmp ) return -1;
916     }
917 
918     ss = se = line;
919     for (i=0; i<=ichr && *se; i++)
920     {
921         if ( i>0 ) ss = ++se;
922         while (*se && *se!='\t') se++;
923     }
924     if ( i<=ichr ) return -1;
925     *chr_end = se;
926     *chr = ss;
927     return 1;
928 }
929 
bcf_sr_regions_init(const char * regions,int is_file,int ichr,int ifrom,int ito)930 bcf_sr_regions_t *bcf_sr_regions_init(const char *regions, int is_file, int ichr, int ifrom, int ito)
931 {
932     bcf_sr_regions_t *reg;
933     if ( !is_file ) return _regions_init_string(regions);
934 
935     reg = (bcf_sr_regions_t *) calloc(1, sizeof(bcf_sr_regions_t));
936     reg->start = reg->end = -1;
937     reg->prev_start = reg->prev_seq = -1;
938 
939     reg->file = hts_open(regions, "rb");
940     if ( !reg->file )
941     {
942         fprintf(stderr,"[%s:%d %s] Could not open file: %s\n", __FILE__,__LINE__,__FUNCTION__,regions);
943         free(reg);
944         return NULL;
945     }
946 
947     reg->tbx = tbx_index_load(regions);
948     if ( !reg->tbx )
949     {
950         int len = strlen(regions);
951         int is_bed  = strcasecmp(".bed",regions+len-4) ? 0 : 1;
952         if ( !is_bed && !strcasecmp(".bed.gz",regions+len-7) ) is_bed = 1;
953 
954         if ( reg->file->format.format==vcf ) ito = 1;
955 
956         // read the whole file, tabix index is not present
957         while ( hts_getline(reg->file, KS_SEP_LINE, &reg->line) > 0 )
958         {
959             char *chr, *chr_end;
960             int from, to, ret;
961             ret = _regions_parse_line(reg->line.s, ichr,ifrom,abs(ito), &chr,&chr_end,&from,&to);
962             if ( ret < 0 )
963             {
964                 if ( ito<0 )
965                     ret = _regions_parse_line(reg->line.s, ichr,ifrom,ifrom, &chr,&chr_end,&from,&to);
966                 if ( ret<0 )
967                 {
968                     fprintf(stderr,"[%s:%d] Could not parse the file %s, using the columns %d,%d[,%d]\n", __FILE__,__LINE__,regions,ichr+1,ifrom+1,ito+1);
969                     hts_close(reg->file); reg->file = NULL; free(reg);
970                     return NULL;
971                 }
972             }
973             if ( !ret ) continue;
974             if ( is_bed ) from++;
975             *chr_end = 0;
976             _regions_add(reg, chr, from, to);
977             *chr_end = '\t';
978         }
979         hts_close(reg->file); reg->file = NULL;
980         if ( !reg->nseqs ) { free(reg); return NULL; }
981         return reg;
982     }
983 
984     reg->seq_names = (char**) tbx_seqnames(reg->tbx, &reg->nseqs);
985     if ( !reg->seq_hash )
986         reg->seq_hash = khash_str2int_init();
987     int i;
988     for (i=0; i<reg->nseqs; i++)
989     {
990         khash_str2int_set(reg->seq_hash,reg->seq_names[i],i);
991     }
992     reg->fname  = strdup(regions);
993     reg->is_bin = 1;
994     return reg;
995 }
996 
bcf_sr_regions_destroy(bcf_sr_regions_t * reg)997 void bcf_sr_regions_destroy(bcf_sr_regions_t *reg)
998 {
999     int i;
1000     free(reg->fname);
1001     if ( reg->itr ) tbx_itr_destroy(reg->itr);
1002     if ( reg->tbx ) tbx_destroy(reg->tbx);
1003     if ( reg->file ) hts_close(reg->file);
1004     if ( reg->als ) free(reg->als);
1005     if ( reg->als_str.s ) free(reg->als_str.s);
1006     free(reg->line.s);
1007     if ( reg->regs )
1008     {
1009          // free only in-memory names, tbx names are const
1010         for (i=0; i<reg->nseqs; i++)
1011         {
1012             free(reg->seq_names[i]);
1013             free(reg->regs[i].regs);
1014         }
1015     }
1016     free(reg->regs);
1017     free(reg->seq_names);
1018     khash_str2int_destroy(reg->seq_hash);
1019     free(reg);
1020 }
1021 
bcf_sr_regions_seek(bcf_sr_regions_t * reg,const char * seq)1022 int bcf_sr_regions_seek(bcf_sr_regions_t *reg, const char *seq)
1023 {
1024     reg->iseq = reg->start = reg->end = -1;
1025     if ( khash_str2int_get(reg->seq_hash, seq, &reg->iseq) < 0 ) return -1;  // sequence seq not in regions
1026 
1027     // using in-memory regions
1028     if ( reg->regs )
1029     {
1030         reg->regs[reg->iseq].creg = -1;
1031         return 0;
1032     }
1033 
1034     // reading regions from tabix
1035     if ( reg->itr ) tbx_itr_destroy(reg->itr);
1036     reg->itr = tbx_itr_querys(reg->tbx, seq);
1037     if ( reg->itr ) return 0;
1038 
1039     return -1;
1040 }
1041 
bcf_sr_regions_next(bcf_sr_regions_t * reg)1042 int bcf_sr_regions_next(bcf_sr_regions_t *reg)
1043 {
1044     if ( reg->iseq<0 ) return -1;
1045     reg->start = reg->end = -1;
1046     reg->nals = 0;
1047 
1048     // using in-memory regions
1049     if ( reg->regs )
1050     {
1051         while ( reg->iseq < reg->nseqs )
1052         {
1053             reg->regs[reg->iseq].creg++;
1054             if ( reg->regs[reg->iseq].creg < reg->regs[reg->iseq].nregs ) break;
1055             reg->iseq++;
1056         }
1057         if ( reg->iseq >= reg->nseqs ) { reg->iseq = -1; return -1; } // no more regions left
1058         region1_t *creg = &reg->regs[reg->iseq].regs[reg->regs[reg->iseq].creg];
1059         reg->start = creg->start;
1060         reg->end   = creg->end;
1061         return 0;
1062     }
1063 
1064     // reading from tabix
1065     char *chr, *chr_end;
1066     int ichr = 0, ifrom = 1, ito = 2, is_bed = 0, from, to;
1067     if ( reg->tbx )
1068     {
1069         ichr   = reg->tbx->conf.sc-1;
1070         ifrom  = reg->tbx->conf.bc-1;
1071         ito    = reg->tbx->conf.ec-1;
1072         if ( ito<0 ) ito = ifrom;
1073         is_bed = reg->tbx->conf.preset==TBX_UCSC ? 1 : 0;
1074     }
1075 
1076     int ret = 0;
1077     while ( !ret )
1078     {
1079         if ( reg->itr )
1080         {
1081             // tabix index present, reading a chromosome block
1082             ret = tbx_itr_next(reg->file, reg->tbx, reg->itr, &reg->line);
1083             if ( ret<0 ) { reg->iseq = -1; return -1; }
1084         }
1085         else
1086         {
1087             if ( reg->is_bin )
1088             {
1089                 // Waited for seek which never came. Reopen in text mode and stream
1090                 // through the regions, otherwise hts_getline would fail
1091                 hts_close(reg->file);
1092                 reg->file = hts_open(reg->fname, "r");
1093                 if ( !reg->file )
1094                 {
1095                     fprintf(stderr,"[%s:%d %s] Could not open file: %s\n", __FILE__,__LINE__,__FUNCTION__,reg->fname);
1096                     reg->file = NULL;
1097                     bcf_sr_regions_destroy(reg);
1098                     return -1;
1099                 }
1100                 reg->is_bin = 0;
1101             }
1102 
1103             // tabix index absent, reading the whole file
1104             ret = hts_getline(reg->file, KS_SEP_LINE, &reg->line);
1105             if ( ret<0 ) { reg->iseq = -1; return -1; }
1106         }
1107         ret = _regions_parse_line(reg->line.s, ichr,ifrom,ito, &chr,&chr_end,&from,&to);
1108         if ( ret<0 )
1109         {
1110             fprintf(stderr,"[%s:%d] Could not parse the file %s, using the columns %d,%d,%d\n", __FILE__,__LINE__,reg->fname,ichr+1,ifrom+1,ito+1);
1111             return -1;
1112         }
1113     }
1114     if ( is_bed ) from++;
1115 
1116     *chr_end = 0;
1117     if ( khash_str2int_get(reg->seq_hash, chr, &reg->iseq)<0 )
1118     {
1119         fprintf(stderr,"Broken tabix index? The sequence \"%s\" not in dictionary [%s]\n", chr,reg->line.s);
1120         exit(1);
1121     }
1122     *chr_end = '\t';
1123 
1124     reg->start = from - 1;
1125     reg->end   = to - 1;
1126     return 0;
1127 }
1128 
_regions_match_alleles(bcf_sr_regions_t * reg,int als_idx,bcf1_t * rec)1129 static int _regions_match_alleles(bcf_sr_regions_t *reg, int als_idx, bcf1_t *rec)
1130 {
1131     if ( reg->regs )
1132     {
1133         // payload is not supported for in-memory regions, switch to regidx instead in future
1134         fprintf(stderr,"Error: Compressed and indexed targets file is required\n");
1135         exit(1);
1136     }
1137 
1138     int i = 0, max_len = 0;
1139     if ( !reg->nals )
1140     {
1141         char *ss = reg->line.s;
1142         while ( i<als_idx && *ss )
1143         {
1144             if ( *ss=='\t' ) i++;
1145             ss++;
1146         }
1147         char *se = ss;
1148         reg->nals = 1;
1149         while ( *se && *se!='\t' )
1150         {
1151             if ( *se==',' ) reg->nals++;
1152             se++;
1153         }
1154         ks_resize(&reg->als_str, se-ss+1+reg->nals);
1155         reg->als_str.l = 0;
1156         hts_expand(char*,reg->nals,reg->mals,reg->als);
1157         reg->nals = 0;
1158 
1159         se = ss;
1160         while ( *(++se) )
1161         {
1162             if ( *se=='\t' ) break;
1163             if ( *se!=',' ) continue;
1164             reg->als[reg->nals] = &reg->als_str.s[reg->als_str.l];
1165             kputsn(ss,se-ss,&reg->als_str);
1166             if ( &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals] > max_len ) max_len = &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals];
1167             reg->als_str.l++;
1168             reg->nals++;
1169             ss = ++se;
1170         }
1171         reg->als[reg->nals] = &reg->als_str.s[reg->als_str.l];
1172         kputsn(ss,se-ss,&reg->als_str);
1173         if ( &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals] > max_len ) max_len = &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals];
1174         reg->nals++;
1175         reg->als_type = max_len > 1 ? VCF_INDEL : VCF_SNP;  // this is a simplified check, see vcf.c:bcf_set_variant_types
1176     }
1177     int type = bcf_get_variant_types(rec);
1178     if ( reg->als_type & VCF_INDEL )
1179         return type & VCF_INDEL ? 1 : 0;
1180     return !(type & VCF_INDEL) ? 1 : 0;
1181 }
1182 
bcf_sr_regions_overlap(bcf_sr_regions_t * reg,const char * seq,int start,int end)1183 int bcf_sr_regions_overlap(bcf_sr_regions_t *reg, const char *seq, int start, int end)
1184 {
1185     int iseq;
1186     if ( khash_str2int_get(reg->seq_hash, seq, &iseq)<0 ) return -1;    // no such sequence
1187 
1188     if ( reg->prev_seq==-1 || iseq!=reg->prev_seq || reg->prev_start > start ) // new chromosome or after a seek
1189     {
1190         // flush regions left on previous chromosome
1191         if ( reg->missed_reg_handler && reg->prev_seq!=-1 && reg->iseq!=-1 )
1192             bcf_sr_regions_flush(reg);
1193 
1194         bcf_sr_regions_seek(reg, seq);
1195         reg->start = reg->end = -1;
1196     }
1197     if ( reg->prev_seq==iseq && reg->iseq!=iseq ) return -2;    // no more regions on this chromosome
1198     reg->prev_seq = reg->iseq;
1199     reg->prev_start = start;
1200 
1201     while ( iseq==reg->iseq && reg->end < start )
1202     {
1203         if ( bcf_sr_regions_next(reg) < 0 ) return -2;  // no more regions left
1204         if ( reg->iseq != iseq ) return -1; // does not overlap any regions
1205         if ( reg->missed_reg_handler && reg->end < start ) reg->missed_reg_handler(reg, reg->missed_reg_data);
1206     }
1207     if ( reg->start <= end ) return 0;    // region overlap
1208     return -1;  // no overlap
1209 }
1210 
bcf_sr_regions_flush(bcf_sr_regions_t * reg)1211 void bcf_sr_regions_flush(bcf_sr_regions_t *reg)
1212 {
1213     if ( !reg->missed_reg_handler || reg->prev_seq==-1 ) return;
1214     while ( !bcf_sr_regions_next(reg) ) reg->missed_reg_handler(reg, reg->missed_reg_data);
1215     return;
1216 }
1217 
1218