1 /* The MIT License
2 
3    Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4                  2011, 2012 Attractive Chaos <attractor@live.co.uk>
5    Copyright (C) 2009, 2013-2017 Genome Research Ltd
6 
7    Permission is hereby granted, free of charge, to any person obtaining a copy
8    of this software and associated documentation files (the "Software"), to deal
9    in the Software without restriction, including without limitation the rights
10    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11    copies of the Software, and to permit persons to whom the Software is
12    furnished to do so, subject to the following conditions:
13 
14    The above copyright notice and this permission notice shall be included in
15    all copies or substantial portions of the Software.
16 
17    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23    THE SOFTWARE.
24 */
25 
26 #include <config.h>
27 
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <unistd.h>
33 #include <assert.h>
34 #include <pthread.h>
35 #include <sys/types.h>
36 #include <inttypes.h>
37 
38 #include "htslib/hts.h"
39 #include "htslib/bgzf.h"
40 #include "htslib/hfile.h"
41 #include "htslib/thread_pool.h"
42 #include "cram/pooled_alloc.h"
43 
44 #define BGZF_CACHE
45 #define BGZF_MT
46 
47 #define BLOCK_HEADER_LENGTH 18
48 #define BLOCK_FOOTER_LENGTH 8
49 
50 
51 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
52  +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
53  | 31|139|  8|  4|              0|  0|255|      6| 66| 67|      2|BLK_LEN|
54  +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
55   BGZF extension:
56                 ^                              ^   ^   ^
57                 |                              |   |   |
58                FLG.EXTRA                     XLEN  B   C
59 
60   BGZF format is compatible with GZIP. It limits the size of each compressed
61   block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
62   records the size.
63 
64 */
65 static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
66 
67 #ifdef BGZF_CACHE
68 typedef struct {
69     int size;
70     uint8_t *block;
71     int64_t end_offset;
72 } cache_t;
73 #include "htslib/khash.h"
74 KHASH_MAP_INIT_INT64(cache, cache_t)
75 #endif
76 
77 #ifdef BGZF_MT
78 
79 typedef struct bgzf_job {
80     BGZF *fp;
81     unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
82     size_t comp_len;
83     unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
84     size_t uncomp_len;
85     int errcode;
86     int64_t block_address;
87     int hit_eof;
88 } bgzf_job;
89 
90 enum mtaux_cmd {
91     NONE = 0,
92     SEEK,
93     HAS_EOF,
94     CLOSE,
95 };
96 
97 typedef struct bgzf_mtaux_t {
98     // Memory pool for bgzf_job structs, to avoid many malloc/free
99     pool_alloc_t *job_pool;
100     bgzf_job *curr_job;
101 
102     // Thread pool
103     int n_threads;
104     int own_pool;
105     hts_tpool *pool;
106 
107     // Output queue holding completed bgzf_jobs
108     hts_tpool_process *out_queue;
109 
110     // I/O thread.
111     pthread_t io_task;
112     pthread_mutex_t job_pool_m;
113     int jobs_pending; // number of jobs waiting
114     int flush_pending;
115     void *free_block;
116     int hit_eof;  // r/w entirely within main thread
117 
118     // Message passing to the reader thread; eg seek requests
119     int errcode;
120     uint64_t block_address;
121     int eof;
122     pthread_mutex_t command_m; // Set whenever fp is being updated
123     pthread_cond_t command_c;
124     enum mtaux_cmd command;
125 } mtaux_t;
126 #endif
127 
128 typedef struct
129 {
130     uint64_t uaddr;  // offset w.r.t. uncompressed data
131     uint64_t caddr;  // offset w.r.t. compressed data
132 }
133 bgzidx1_t;
134 
135 struct __bgzidx_t
136 {
137     int noffs, moffs;       // the size of the index, n:used, m:allocated
138     bgzidx1_t *offs;        // offsets
139     uint64_t ublock_addr;   // offset of the current block (uncompressed data)
140 };
141 
142 void bgzf_index_destroy(BGZF *fp);
143 int bgzf_index_add_block(BGZF *fp);
144 static void mt_destroy(mtaux_t *mt);
145 
packInt16(uint8_t * buffer,uint16_t value)146 static inline void packInt16(uint8_t *buffer, uint16_t value)
147 {
148     buffer[0] = value;
149     buffer[1] = value >> 8;
150 }
151 
unpackInt16(const uint8_t * buffer)152 static inline int unpackInt16(const uint8_t *buffer)
153 {
154     return buffer[0] | buffer[1] << 8;
155 }
156 
packInt32(uint8_t * buffer,uint32_t value)157 static inline void packInt32(uint8_t *buffer, uint32_t value)
158 {
159     buffer[0] = value;
160     buffer[1] = value >> 8;
161     buffer[2] = value >> 16;
162     buffer[3] = value >> 24;
163 }
164 
bgzf_zerr(int errnum,z_stream * zs)165 static const char *bgzf_zerr(int errnum, z_stream *zs)
166 {
167     static char buffer[32];
168 
169     /* Return zs->msg if available.
170        zlib doesn't set this very reliably.  Looking at the source suggests
171        that it may get set to a useful message for deflateInit2, inflateInit2
172        and inflate when it returns Z_DATA_ERROR. For inflate with other
173        return codes, deflate, deflateEnd and inflateEnd it doesn't appear
174        to be useful.  For the likely non-useful cases, the caller should
175        pass NULL into zs. */
176 
177     if (zs && zs->msg) return zs->msg;
178 
179     // gzerror OF((gzFile file, int *errnum)
180     switch (errnum) {
181     case Z_ERRNO:
182         return strerror(errno);
183     case Z_STREAM_ERROR:
184         return "invalid parameter/compression level, or inconsistent stream state";
185     case Z_DATA_ERROR:
186         return "invalid or incomplete IO";
187     case Z_MEM_ERROR:
188         return "out of memory";
189     case Z_BUF_ERROR:
190         return "progress temporarily not possible, or in() / out() returned an error";
191     case Z_VERSION_ERROR:
192         return "zlib version mismatch";
193     case Z_OK: // 0: maybe gzgets error Z_NULL
194     default:
195         snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
196         return buffer;  // FIXME: Not thread-safe.
197     }
198 }
199 
bgzf_read_init(hFILE * hfpr)200 static BGZF *bgzf_read_init(hFILE *hfpr)
201 {
202     BGZF *fp;
203     uint8_t magic[18];
204     ssize_t n = hpeek(hfpr, magic, 18);
205     if (n < 0) return NULL;
206 
207     fp = (BGZF*)calloc(1, sizeof(BGZF));
208     if (fp == NULL) return NULL;
209 
210     fp->is_write = 0;
211     fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
212     if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
213     fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
214     fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
215     fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
216 #ifdef BGZF_CACHE
217     fp->cache = kh_init(cache);
218 #endif
219     return fp;
220 }
221 
222 // get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
mode2level(const char * mode)223 static int mode2level(const char *mode)
224 {
225     int i, compress_level = -1;
226     for (i = 0; mode[i]; ++i)
227         if (mode[i] >= '0' && mode[i] <= '9') break;
228     if (mode[i]) compress_level = (int)mode[i] - '0';
229     if (strchr(mode, 'u')) compress_level = -2;
230     return compress_level;
231 }
bgzf_write_init(const char * mode)232 static BGZF *bgzf_write_init(const char *mode)
233 {
234     BGZF *fp;
235     fp = (BGZF*)calloc(1, sizeof(BGZF));
236     if (fp == NULL) goto mem_fail;
237     fp->is_write = 1;
238     int compress_level = mode2level(mode);
239     if ( compress_level==-2 )
240     {
241         fp->is_compressed = 0;
242         return fp;
243     }
244     fp->is_compressed = 1;
245 
246     fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
247     if (fp->uncompressed_block == NULL) goto mem_fail;
248     fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
249 
250     fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
251     if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
252     if ( strchr(mode,'g') )
253     {
254         // gzip output
255         fp->is_gzip = 1;
256         fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
257         if (fp->gz_stream == NULL) goto mem_fail;
258         fp->gz_stream->zalloc = NULL;
259         fp->gz_stream->zfree  = NULL;
260         fp->gz_stream->msg    = NULL;
261 
262         int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
263         if (ret!=Z_OK) {
264             if (hts_verbose >= 1) {
265                 fprintf(stderr, "[E::%s] deflateInit2 failed: %s\n",
266                         __func__, bgzf_zerr(ret, fp->gz_stream));
267             }
268             goto fail;
269         }
270     }
271     return fp;
272 
273  mem_fail:
274     if (hts_verbose >= 1) {
275         fprintf(stderr, "[E::%s] %s\n", __func__, strerror(errno));
276     }
277  fail:
278     if (fp != NULL) {
279         free(fp->uncompressed_block);
280         free(fp->gz_stream);
281         free(fp);
282     }
283     return NULL;
284 }
285 
bgzf_open(const char * path,const char * mode)286 BGZF *bgzf_open(const char *path, const char *mode)
287 {
288     BGZF *fp = 0;
289     assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
290     if (strchr(mode, 'r')) {
291         hFILE *fpr;
292         if ((fpr = hopen(path, mode)) == 0) return 0;
293         fp = bgzf_read_init(fpr);
294         if (fp == 0) { hclose_abruptly(fpr); return NULL; }
295         fp->fp = fpr;
296     } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
297         hFILE *fpw;
298         if ((fpw = hopen(path, mode)) == 0) return 0;
299         fp = bgzf_write_init(mode);
300         if (fp == NULL) return NULL;
301         fp->fp = fpw;
302     }
303     else { errno = EINVAL; return 0; }
304 
305     fp->is_be = ed_is_big();
306     return fp;
307 }
308 
bgzf_dopen(int fd,const char * mode)309 BGZF *bgzf_dopen(int fd, const char *mode)
310 {
311     BGZF *fp = 0;
312     assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
313     if (strchr(mode, 'r')) {
314         hFILE *fpr;
315         if ((fpr = hdopen(fd, mode)) == 0) return 0;
316         fp = bgzf_read_init(fpr);
317         if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
318         fp->fp = fpr;
319     } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
320         hFILE *fpw;
321         if ((fpw = hdopen(fd, mode)) == 0) return 0;
322         fp = bgzf_write_init(mode);
323         if (fp == NULL) return NULL;
324         fp->fp = fpw;
325     }
326     else { errno = EINVAL; return 0; }
327 
328     fp->is_be = ed_is_big();
329     return fp;
330 }
331 
bgzf_hopen(hFILE * hfp,const char * mode)332 BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
333 {
334     BGZF *fp = NULL;
335     assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
336     if (strchr(mode, 'r')) {
337         fp = bgzf_read_init(hfp);
338         if (fp == NULL) return NULL;
339     } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
340         fp = bgzf_write_init(mode);
341         if (fp == NULL) return NULL;
342     }
343     else { errno = EINVAL; return 0; }
344 
345     fp->fp = hfp;
346     fp->is_be = ed_is_big();
347     return fp;
348 }
349 
bgzf_compress(void * _dst,size_t * dlen,const void * src,size_t slen,int level)350 int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
351 {
352     uint32_t crc;
353     z_stream zs;
354     uint8_t *dst = (uint8_t*)_dst;
355 
356     // compress the body
357     zs.zalloc = NULL; zs.zfree = NULL;
358     zs.msg = NULL;
359     zs.next_in  = (Bytef*)src;
360     zs.avail_in = slen;
361     zs.next_out = dst + BLOCK_HEADER_LENGTH;
362     zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
363     int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
364     if (ret!=Z_OK) {
365         if (hts_verbose >= 1) {
366             fprintf(stderr, "[E::%s] deflateInit2 failed: %s\n",
367                     __func__, bgzf_zerr(ret, &zs));
368         }
369         return -1;
370     }
371     if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
372         if (hts_verbose >= 1) {
373             fprintf(stderr, "[E::%s] deflate failed: %s\n",
374                     __func__, bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
375         }
376         return -1;
377     }
378     if ((ret = deflateEnd(&zs)) != Z_OK) {
379         if (hts_verbose >= 1) {
380             fprintf(stderr, "[E::%s] deflateEnd failed: %s\n",
381                     __func__, bgzf_zerr(ret, NULL));
382         }
383         return -1;
384     }
385     *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
386     // write the header
387     memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
388     packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
389     // write the footer
390     crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
391     packInt32((uint8_t*)&dst[*dlen - 8], crc);
392     packInt32((uint8_t*)&dst[*dlen - 4], slen);
393     return 0;
394 }
395 
bgzf_gzip_compress(BGZF * fp,void * _dst,size_t * dlen,const void * src,size_t slen,int level)396 static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
397 {
398     uint8_t *dst = (uint8_t*)_dst;
399     z_stream *zs = fp->gz_stream;
400     int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
401     zs->next_in   = (Bytef*)src;
402     zs->avail_in  = slen;
403     zs->next_out  = dst;
404     zs->avail_out = *dlen;
405     int ret = deflate(zs, flush);
406     if (ret == Z_STREAM_ERROR) {
407         if (hts_verbose >= 1) {
408             fprintf(stderr, "[E::%s] deflate failed: %s\n",
409                     __func__, bgzf_zerr(ret, NULL));
410         }
411         return -1;
412     }
413     if (zs->avail_in != 0) {
414         if (hts_verbose >= 1) {
415             fprintf(stderr, "[E::%s] deflate block too large for output buffer:\n",
416                     __func__);
417         }
418         return -1;
419     }
420     *dlen = *dlen - zs->avail_out;
421     return 0;
422 }
423 
424 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
deflate_block(BGZF * fp,int block_length)425 static int deflate_block(BGZF *fp, int block_length)
426 {
427     size_t comp_size = BGZF_MAX_BLOCK_SIZE;
428     int ret;
429     if ( !fp->is_gzip )
430         ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
431     else
432         ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
433 
434     if ( ret != 0 )
435     {
436         if (hts_verbose >= 3) {
437             fprintf(stderr, "[E::%s] compression error %d\n", __func__, ret);
438         }
439         fp->errcode |= BGZF_ERR_ZLIB;
440         return -1;
441     }
442     fp->block_offset = 0;
443     return comp_size;
444 }
445 
bgzf_uncompress(uint8_t * dst,size_t * dlen,const uint8_t * src,size_t slen)446 static int bgzf_uncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen) {
447     z_stream zs;
448     zs.zalloc = NULL;
449     zs.zfree = NULL;
450     zs.msg = NULL;
451     zs.next_in = (Bytef*)src;
452     zs.avail_in = slen;
453     zs.next_out = (Bytef*)dst;
454     zs.avail_out = *dlen;
455 
456     int ret = inflateInit2(&zs, -15);
457     if (ret != Z_OK) {
458         if (hts_verbose >= 1) {
459             fprintf(stderr, "[E::%s] inflateInit2 failed: %s\n",
460                     __func__, bgzf_zerr(ret, &zs));
461         }
462         return -1;
463     }
464     if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
465         if (hts_verbose >= 1) {
466             fprintf(stderr, "[E::%s] inflate failed: %s\n",
467                     __func__, bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
468         }
469         if ((ret = inflateEnd(&zs)) != Z_OK) {
470             if (hts_verbose >= 2) {
471                 fprintf(stderr, "[E::%s] inflateEnd failed: %s\n",
472                         __func__, bgzf_zerr(ret, NULL));
473             }
474         }
475         return -1;
476     }
477     if ((ret = inflateEnd(&zs)) != Z_OK) {
478         if (hts_verbose >= 1) {
479             fprintf(stderr, "[E::%s] inflateEnd failed: %s\n",
480                     __func__, bgzf_zerr(ret, NULL));
481         }
482         return -1;
483     }
484     *dlen = *dlen - zs.avail_out;
485     return 0;
486 }
487 
488 // Inflate the block in fp->compressed_block into fp->uncompressed_block
inflate_block(BGZF * fp,int block_length)489 static int inflate_block(BGZF* fp, int block_length)
490 {
491     size_t dlen = BGZF_MAX_BLOCK_SIZE;
492     int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
493                               (Bytef*)fp->compressed_block + 18, block_length - 18);
494     if (ret < 0) {
495         fp->errcode |= BGZF_ERR_ZLIB;
496         return -1;
497     }
498 
499     return dlen;
500 }
501 
inflate_gzip_block(BGZF * fp,int cached)502 static int inflate_gzip_block(BGZF *fp, int cached)
503 {
504     int ret = Z_OK;
505     do
506     {
507         if ( !cached && fp->gz_stream->avail_out!=0 )
508         {
509             fp->gz_stream->avail_in = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
510             if ( fp->gz_stream->avail_in<=0 ) return fp->gz_stream->avail_in;
511             if ( fp->gz_stream->avail_in==0 ) break;
512             fp->gz_stream->next_in = fp->compressed_block;
513         }
514         else cached = 0;
515         do
516         {
517             fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
518             fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
519             fp->gz_stream->msg = NULL;
520             ret = inflate(fp->gz_stream, Z_NO_FLUSH);
521             if (ret < 0 && ret != Z_BUF_ERROR) {
522                 if (hts_verbose >= 1) {
523                     fprintf(stderr, "[E::%s] inflate failed: %s\n",
524                             __func__,
525                             bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
526                 }
527                 fp->errcode |= BGZF_ERR_ZLIB;
528                 return -1;
529             }
530             unsigned int have = BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
531             if ( have ) return have;
532         }
533         while ( fp->gz_stream->avail_out == 0 );
534     }
535     while (ret != Z_STREAM_END);
536     return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
537 }
538 
539 // Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
check_header(const uint8_t * header)540 static int check_header(const uint8_t *header)
541 {
542     if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
543     return ((header[3] & 4) != 0
544             && unpackInt16((uint8_t*)&header[10]) == 6
545             && header[12] == 'B' && header[13] == 'C'
546             && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
547 }
548 
549 #ifdef BGZF_CACHE
free_cache(BGZF * fp)550 static void free_cache(BGZF *fp)
551 {
552     khint_t k;
553     khash_t(cache) *h = (khash_t(cache)*)fp->cache;
554     if (fp->is_write) return;
555     for (k = kh_begin(h); k < kh_end(h); ++k)
556         if (kh_exist(h, k)) free(kh_val(h, k).block);
557     kh_destroy(cache, h);
558 }
559 
load_block_from_cache(BGZF * fp,int64_t block_address)560 static int load_block_from_cache(BGZF *fp, int64_t block_address)
561 {
562     khint_t k;
563     cache_t *p;
564 
565     khash_t(cache) *h = (khash_t(cache)*)fp->cache;
566     k = kh_get(cache, h, block_address);
567     if (k == kh_end(h)) return 0;
568     p = &kh_val(h, k);
569     if (fp->block_length != 0) fp->block_offset = 0;
570     fp->block_address = block_address;
571     fp->block_length = p->size;
572     // FIXME: why BGZF_MAX_BLOCK_SIZE and not p->size?
573     memcpy(fp->uncompressed_block, p->block, BGZF_MAX_BLOCK_SIZE);
574     if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
575     {
576         // todo: move the error up
577         fprintf(stderr,"Could not hseek to %"PRId64"\n", p->end_offset);
578         exit(1);
579     }
580     return p->size;
581 }
582 
cache_block(BGZF * fp,int size)583 static void cache_block(BGZF *fp, int size)
584 {
585     int ret;
586     khint_t k;
587     cache_t *p;
588     //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
589     khash_t(cache) *h = (khash_t(cache)*)fp->cache;
590     if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
591     if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
592         /* A better way would be to remove the oldest block in the
593          * cache, but here we remove a random one for simplicity. This
594          * should not have a big impact on performance. */
595         for (k = kh_begin(h); k < kh_end(h); ++k)
596             if (kh_exist(h, k)) break;
597         if (k < kh_end(h)) {
598             free(kh_val(h, k).block);
599             kh_del(cache, h, k);
600         }
601     }
602     k = kh_put(cache, h, fp->block_address, &ret);
603     if (ret == 0) return; // if this happens, a bug!
604     p = &kh_val(h, k);
605     p->size = fp->block_length;
606     p->end_offset = fp->block_address + size;
607     p->block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
608     memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
609 }
610 #else
free_cache(BGZF * fp)611 static void free_cache(BGZF *fp) {}
load_block_from_cache(BGZF * fp,int64_t block_address)612 static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
cache_block(BGZF * fp,int size)613 static void cache_block(BGZF *fp, int size) {}
614 #endif
615 
616 /*
617  * Absolute htell in this compressed file.
618  *
619  * Do not confuse with the external bgzf_tell macro which returns the virtual
620  * offset.
621  */
bgzf_htell(BGZF * fp)622 static off_t bgzf_htell(BGZF *fp) {
623     if (fp->mt) {
624         pthread_mutex_lock(&fp->mt->job_pool_m);
625         off_t pos = fp->block_address + fp->block_clength;
626         pthread_mutex_unlock(&fp->mt->job_pool_m);
627         return pos;
628     } else {
629         return htell(fp->fp);
630     }
631 }
632 
bgzf_read_block(BGZF * fp)633 int bgzf_read_block(BGZF *fp)
634 {
635     hts_tpool_result *r;
636 
637     if (fp->mt) {
638     again:
639         if (fp->mt->hit_eof) {
640             // Further reading at EOF will always return 0
641             fp->block_length = 0;
642             return 0;
643         }
644         r = hts_tpool_next_result_wait(fp->mt->out_queue);
645         bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
646 
647         if (!j || j->errcode == BGZF_ERR_MT) {
648             if (!fp->mt->free_block) {
649                 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
650                 if (fp->uncompressed_block == NULL) return -1;
651                 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
652             } // else it's already allocated with malloc, maybe even in-use.
653             mt_destroy(fp->mt);
654             fp->mt = NULL;
655             hts_tpool_delete_result(r, 0);
656 
657             goto single_threaded;
658         }
659 
660         if (j->errcode) {
661             fp->errcode = j->errcode;
662             return -1;
663         }
664 
665         if (j->hit_eof) {
666             if (!fp->last_block_eof && !fp->no_eof_block) {
667                 fp->no_eof_block = 1;
668                 if (hts_verbose>1)
669                     fprintf(stderr, "[W::%s] EOF marker is absent. The input is probably truncated.\n", __func__);
670             }
671             fp->mt->hit_eof = 1;
672         }
673 
674         // Zero length blocks in the middle of a file are (wrongly)
675         // considered as EOF by many callers.  We work around this by
676         // trying again to see if we hit a genuine EOF.
677         if (!j->hit_eof && j->uncomp_len == 0) {
678             fp->last_block_eof = 1;
679             hts_tpool_delete_result(r, 0);
680             goto again;
681         }
682 
683         // block_length=0 and block_offset set by bgzf_seek.
684         if (fp->block_length != 0) fp->block_offset = 0;
685         fp->block_address = j->block_address;
686         fp->block_clength = j->comp_len;
687         fp->block_length = j->uncomp_len;
688         // bgzf_read() can change fp->block_length
689         fp->last_block_eof = (fp->block_length == 0);
690 
691         if ( j->uncomp_len && j->fp->idx_build_otf )
692         {
693             bgzf_index_add_block(j->fp);
694             j->fp->idx->ublock_addr += j->uncomp_len;
695         }
696 
697         // Steal the data block as it's quicker than a memcpy.
698         // We just need to make sure we delay the pool free.
699         if (fp->mt->curr_job) {
700             pthread_mutex_lock(&fp->mt->job_pool_m);
701             pool_free(fp->mt->job_pool, fp->mt->curr_job);
702             pthread_mutex_unlock(&fp->mt->job_pool_m);
703         }
704         fp->uncompressed_block = j->uncomp_data;
705         fp->mt->curr_job = j;
706         if (fp->mt->free_block) {
707             free(fp->mt->free_block); // clear up last non-mt block
708             fp->mt->free_block = NULL;
709         }
710 
711         hts_tpool_delete_result(r, 0);
712         return 0;
713     }
714 
715     uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
716     int count, size, block_length, remaining;
717 
718  single_threaded:
719     size = 0;
720 
721     // Reading an uncompressed file
722     if ( !fp->is_compressed )
723     {
724         count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
725         if (count < 0)  // Error
726         {
727             fp->errcode |= BGZF_ERR_IO;
728             return -1;
729         }
730         else if (count == 0)  // EOF
731         {
732             fp->block_length = 0;
733             return 0;
734         }
735         if (fp->block_length != 0) fp->block_offset = 0;
736         fp->block_address += count;
737         fp->block_length = count;
738         return 0;
739     }
740 
741     // Reading compressed file
742     int64_t block_address;
743     block_address = bgzf_htell(fp);
744     if ( fp->is_gzip && fp->gz_stream ) // is this is an initialized gzip stream?
745     {
746         count = inflate_gzip_block(fp, 0);
747         if ( count<0 )
748         {
749             fp->errcode |= BGZF_ERR_ZLIB;
750             return -1;
751         }
752         fp->block_length = count;
753         fp->block_address = block_address;
754         return 0;
755     }
756     if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
757 
758     // loop to skip empty bgzf blocks
759     while (1)
760     {
761         count = hread(fp->fp, header, sizeof(header));
762         if (count == 0) { // no data read
763             if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
764                 fp->no_eof_block = 1;
765                 if (hts_verbose > 1) {
766                     fprintf(stderr, "[W::%s] EOF marker is absent. The input is probably truncated.\n", __func__);
767                 }
768             }
769             fp->block_length = 0;
770             return 0;
771         }
772         int ret;
773         if ( count != sizeof(header) || (ret=check_header(header))==-2 )
774         {
775             fp->errcode |= BGZF_ERR_HEADER;
776             return -1;
777         }
778         if ( ret==-1 )
779         {
780             // GZIP, not BGZF
781             uint8_t *cblock = (uint8_t*)fp->compressed_block;
782             memcpy(cblock, header, sizeof(header));
783             count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
784             int nskip = 10;
785 
786             // Check optional fields to skip: FLG.FNAME,FLG.FCOMMENT,FLG.FHCRC,FLG.FEXTRA
787             // Note: Some of these fields are untested, I did not have appropriate data available
788             if ( header[3] & 0x4 ) // FLG.FEXTRA
789             {
790                 nskip += unpackInt16(&cblock[nskip]) + 2;
791             }
792             if ( header[3] & 0x8 ) // FLG.FNAME
793             {
794                 while ( nskip<count && cblock[nskip] ) nskip++;
795                 nskip++;
796             }
797             if ( header[3] & 0x10 ) // FLG.FCOMMENT
798             {
799                 while ( nskip<count && cblock[nskip] ) nskip++;
800                 nskip++;
801             }
802             if ( header[3] & 0x2 ) nskip += 2;  //  FLG.FHCRC
803 
804             /* FIXME: Should handle this better.  There's no reason why
805                someone shouldn't include a massively long comment in their
806                gzip stream. */
807             if ( nskip >= count )
808             {
809                 fp->errcode |= BGZF_ERR_HEADER;
810                 return -1;
811             }
812 
813             fp->is_gzip = 1;
814             fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
815             int ret = inflateInit2(fp->gz_stream, -15);
816             if (ret != Z_OK)
817             {
818                 if (hts_verbose >= 1) {
819                     fprintf(stderr, "[E::%s] inflateInit2 failed: %s",
820                             __func__, bgzf_zerr(ret, fp->gz_stream));
821                 }
822                 fp->errcode |= BGZF_ERR_ZLIB;
823                 return -1;
824             }
825             fp->gz_stream->avail_in = count - nskip;
826             fp->gz_stream->next_in  = cblock + nskip;
827             count = inflate_gzip_block(fp, 1);
828             if ( count<0 )
829             {
830                 fp->errcode |= BGZF_ERR_ZLIB;
831                 return -1;
832             }
833             fp->block_length = count;
834             fp->block_address = block_address;
835             if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
836             return 0;
837         }
838         size = count;
839         block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
840         if (block_length < BLOCK_HEADER_LENGTH)
841         {
842             fp->errcode |= BGZF_ERR_HEADER;
843             return -1;
844         }
845         compressed_block = (uint8_t*)fp->compressed_block;
846         memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
847         remaining = block_length - BLOCK_HEADER_LENGTH;
848         count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
849         if (count != remaining) {
850             fp->errcode |= BGZF_ERR_IO;
851             return -1;
852         }
853         size += count;
854         if ((count = inflate_block(fp, block_length)) < 0) {
855             if (hts_verbose >= 2) fprintf(stderr, "[E::%s] inflate_block error %d\n", __func__, count);
856             fp->errcode |= BGZF_ERR_ZLIB;
857             return -1;
858         }
859         fp->last_block_eof = (count == 0);
860         if ( count ) break;     // otherwise an empty bgzf block
861     }
862     if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
863     fp->block_address = block_address;
864     fp->block_length = count;
865     if ( fp->idx_build_otf )
866     {
867         bgzf_index_add_block(fp);
868         fp->idx->ublock_addr += count;
869     }
870     cache_block(fp, size);
871     return 0;
872 }
873 
bgzf_read(BGZF * fp,void * data,size_t length)874 ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
875 {
876     ssize_t bytes_read = 0;
877     uint8_t *output = (uint8_t*)data;
878     if (length <= 0) return 0;
879     assert(fp->is_write == 0);
880     while (bytes_read < length) {
881         int copy_length, available = fp->block_length - fp->block_offset;
882         uint8_t *buffer;
883         if (available <= 0) {
884             int ret = bgzf_read_block(fp);
885             if (ret != 0) {
886                 if (hts_verbose >= 2) {
887                     fprintf(stderr, "[E::%s] bgzf_read_block error %d after %zd of %zu bytes\n", __func__, ret, bytes_read, length);
888                 }
889                 fp->errcode |= BGZF_ERR_ZLIB;
890                 return -1;
891             }
892             available = fp->block_length - fp->block_offset;
893             if (available <= 0) break;
894         }
895         copy_length = length - bytes_read < available? length - bytes_read : available;
896         buffer = (uint8_t*)fp->uncompressed_block;
897         memcpy(output, buffer + fp->block_offset, copy_length);
898         fp->block_offset += copy_length;
899         output += copy_length;
900         bytes_read += copy_length;
901 
902         // For raw gzip streams this avoids short reads.
903         if (fp->block_offset == fp->block_length) {
904             fp->block_address = bgzf_htell(fp);
905             fp->block_offset = fp->block_length = 0;
906         }
907     }
908 
909     fp->uncompressed_address += bytes_read;
910 
911     return bytes_read;
912 }
913 
bgzf_raw_read(BGZF * fp,void * data,size_t length)914 ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
915 {
916     ssize_t ret = hread(fp->fp, data, length);
917     if (ret < 0) fp->errcode |= BGZF_ERR_IO;
918     return ret;
919 }
920 
921 #ifdef BGZF_MT
922 
bgzf_encode_func(void * arg)923 void *bgzf_encode_func(void *arg) {
924     bgzf_job *j = (bgzf_job *)arg;
925 
926     j->comp_len = BGZF_MAX_BLOCK_SIZE;
927     int ret = bgzf_compress(j->comp_data, &j->comp_len,
928                             j->uncomp_data, j->uncomp_len,
929                             j->fp->compress_level);
930     if (ret != 0)
931         j->errcode |= BGZF_ERR_ZLIB;
932 
933     return arg;
934 }
935 
936 // Our input block has already been decoded by bgzf_mt_read_block().
937 // We need to split that into a fetch block (compressed) and make this
938 // do the actual decompression step.
bgzf_decode_func(void * arg)939 void *bgzf_decode_func(void *arg) {
940     bgzf_job *j = (bgzf_job *)arg;
941 
942     j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
943     int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
944                               j->comp_data+18, j->comp_len-18);
945     if (ret != 0)
946         j->errcode |= BGZF_ERR_ZLIB;
947 
948     return arg;
949 }
950 
951 /*
952  * Nul function so we can dispatch a job with the correct serial
953  * to mark failure or to indicate an empty read (EOF).
954  */
bgzf_nul_func(void * arg)955 void *bgzf_nul_func(void *arg) { return arg; }
956 
957 /*
958  * Takes compressed blocks off the results queue and calls hwrite to
959  * punt them to the output stream.
960  *
961  * Returns NULL when no more are left, or -1 on error
962  */
bgzf_mt_writer(void * vp)963 static void *bgzf_mt_writer(void *vp) {
964     BGZF *fp = (BGZF *)vp;
965     mtaux_t *mt = fp->mt;
966     hts_tpool_result *r;
967 
968     // Iterates until result queue is shutdown, where it returns NULL.
969     while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
970         bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
971         assert(j);
972 
973         if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len) {
974             fp->errcode |= BGZF_ERR_IO;
975             goto err;
976         }
977 
978         /*
979          * Periodically call hflush (which calls fsync when on a file).
980          * This avoids the fsync being done at the bgzf_close stage,
981          * which can sometimes cause signficant delays.  As this is in
982          * a separate thread, spreading the sync delays throughout the
983          * program execution seems better.
984          * Frequency of 1/512 has been chosen by experimentation
985          * across local XFS, NFS and Lustre tests.
986          */
987         if (++mt->flush_pending % 512 == 0)
988             if (hflush(fp->fp) != 0)
989                 goto err;
990 
991 
992         hts_tpool_delete_result(r, 0);
993 
994         // Also updated by main thread
995         pthread_mutex_lock(&mt->job_pool_m);
996         pool_free(mt->job_pool, j);
997         mt->jobs_pending--;
998         pthread_mutex_unlock(&mt->job_pool_m);
999     }
1000 
1001     if (hflush(fp->fp) != 0)
1002         goto err;
1003 
1004     hts_tpool_process_destroy(mt->out_queue);
1005 
1006     return NULL;
1007 
1008  err:
1009     hts_tpool_process_destroy(mt->out_queue);
1010     return (void *)-1;
1011 }
1012 
1013 
1014 /*
1015  * Reads a compressed block of data using hread and dispatches it to
1016  * the thread pool for decompression.  This is the analogue of the old
1017  * non-threaded bgzf_read_block() function, but without modifying fp
1018  * in any way (except for the read offset).  All output goes via the
1019  * supplied bgzf_job struct.
1020  *
1021  * Returns NULL when no more are left, or -1 on error
1022  */
bgzf_mt_read_block(BGZF * fp,bgzf_job * j)1023 int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
1024 {
1025     uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1026     int count, size = 0, block_length, remaining;
1027 
1028     // NOTE: Guaranteed to be compressed as we block multi-threading in
1029     // uncompressed mode.  However it may be gzip compression instead
1030     // of bgzf.
1031 
1032     // Reading compressed file
1033     int64_t block_address;
1034     block_address = htell(fp->fp);
1035 
1036     if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1037     count = hpeek(fp->fp, header, sizeof(header));
1038     if (count == 0) // no data read
1039         return -1;
1040     int ret;
1041     if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1042     {
1043         j->errcode |= BGZF_ERR_HEADER;
1044         return -1;
1045     }
1046     if (ret == -1) {
1047         j->errcode |= BGZF_ERR_MT;
1048         return -1;
1049     }
1050 
1051     count = hread(fp->fp, header, sizeof(header));
1052     if (count != sizeof(header)) // no data read
1053         return -1;
1054 
1055     size = count;
1056     block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1057     if (block_length < BLOCK_HEADER_LENGTH) {
1058         j->errcode |= BGZF_ERR_HEADER;
1059         return -1;
1060     }
1061     compressed_block = (uint8_t*)j->comp_data;
1062     memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1063     remaining = block_length - BLOCK_HEADER_LENGTH;
1064     count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1065     if (count != remaining) {
1066         j->errcode |= BGZF_ERR_IO;
1067         return -1;
1068     }
1069     size += count;
1070     j->comp_len = block_length;
1071     j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1072     j->block_address = block_address;
1073     j->fp = fp;
1074     j->errcode = 0;
1075 
1076     return 0;
1077 }
1078 
1079 
bgzf_check_EOF_common(BGZF * fp)1080 static int bgzf_check_EOF_common(BGZF *fp)
1081 {
1082     uint8_t buf[28];
1083     off_t offset = htell(fp->fp);
1084     if (hseek(fp->fp, -28, SEEK_END) < 0) {
1085         if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1086         else return -1;
1087     }
1088     if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1089     if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1090     return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1091 }
1092 
1093 /*
1094  * Checks EOF from the reader thread.
1095  */
bgzf_mt_eof(BGZF * fp)1096 static void bgzf_mt_eof(BGZF *fp) {
1097     mtaux_t *mt = fp->mt;
1098 
1099     pthread_mutex_lock(&mt->job_pool_m);
1100     mt->eof = bgzf_check_EOF_common(fp);
1101     pthread_mutex_unlock(&mt->job_pool_m);
1102     pthread_cond_signal(&mt->command_c);
1103 }
1104 
1105 
1106 /*
1107  * Performs the seek (called by reader thread).
1108  *
1109  * This simply drains the entire queue, throwing away blocks, seeks,
1110  * and starts it up again.  Brute force, but maybe sufficient.
1111  */
bgzf_mt_seek(BGZF * fp)1112 static void bgzf_mt_seek(BGZF *fp) {
1113     mtaux_t *mt = fp->mt;
1114 
1115     hts_tpool_process_reset(mt->out_queue, 0);
1116     pthread_mutex_lock(&mt->job_pool_m);
1117     mt->command = NONE;
1118     mt->errcode = 0;
1119 
1120     if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1121         mt->errcode = BGZF_ERR_IO;
1122 
1123     pthread_mutex_unlock(&mt->job_pool_m);
1124     pthread_cond_signal(&mt->command_c);
1125 }
1126 
bgzf_mt_reader(void * vp)1127 static void *bgzf_mt_reader(void *vp) {
1128     BGZF *fp = (BGZF *)vp;
1129     mtaux_t *mt = fp->mt;
1130 
1131 restart:
1132     pthread_mutex_lock(&mt->job_pool_m);
1133     bgzf_job *j = pool_alloc(mt->job_pool);
1134     pthread_mutex_unlock(&mt->job_pool_m);
1135     j->errcode = 0;
1136     j->uncomp_len = 0;
1137     j->hit_eof = 0;
1138 
1139     while (bgzf_mt_read_block(fp, j) == 0) {
1140         // Dispatch
1141         hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_decode_func, j);
1142 
1143         // Check for command
1144         pthread_mutex_lock(&mt->command_m);
1145         switch (mt->command) {
1146         case SEEK:
1147             bgzf_mt_seek(fp);
1148             pthread_mutex_unlock(&mt->command_m);
1149             goto restart;
1150 
1151         case HAS_EOF:
1152             bgzf_mt_eof(fp);
1153             break;
1154 
1155         case CLOSE:
1156             pthread_cond_signal(&mt->command_c);
1157             pthread_mutex_unlock(&mt->command_m);
1158             hts_tpool_process_destroy(mt->out_queue);
1159             pthread_exit(NULL);
1160 
1161         default:
1162             break;
1163         }
1164         pthread_mutex_unlock(&mt->command_m);
1165 
1166         // Allocate buffer for next block
1167         pthread_mutex_lock(&mt->job_pool_m);
1168         j = pool_alloc(mt->job_pool);
1169         pthread_mutex_unlock(&mt->job_pool_m);
1170         j->errcode = 0;
1171         j->uncomp_len = 0;
1172         j->hit_eof = 0;
1173     }
1174 
1175     if (j->errcode == BGZF_ERR_MT) {
1176         // Attempt to multi-thread decode a raw gzip stream cannot be done.
1177         // We tear down the multi-threaded decoder and revert to the old code.
1178         hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_nul_func, j);
1179         hts_tpool_process_ref_decr(mt->out_queue);
1180         pthread_exit(&j->errcode);
1181     }
1182 
1183     // Dispatch an empty block so EOF is spotted.
1184     // We also use this mechanism for returning errors, in which case
1185     // j->errcode is set already.
1186 
1187     j->hit_eof = 1;
1188     hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_nul_func, j);
1189     if (j->errcode != 0) {
1190         hts_tpool_process_destroy(mt->out_queue);
1191         pthread_exit(&j->errcode);
1192     }
1193 
1194     // We hit EOF so can stop reading, but we may get a subsequent
1195     // seek request.  In this case we need to restart the reader.
1196     //
1197     // To handle this we wait on a condition variable and then
1198     // monitor the command. (This could be either seek or close.)
1199     for (;;) {
1200         pthread_mutex_lock(&mt->command_m);
1201         if (mt->command == NONE)
1202             pthread_cond_wait(&mt->command_c, &mt->command_m);
1203         switch(mt->command) {
1204         default:
1205             pthread_mutex_unlock(&mt->command_m);
1206             break;
1207 
1208         case SEEK:
1209             bgzf_mt_seek(fp);
1210             pthread_mutex_unlock(&mt->command_m);
1211             goto restart;
1212 
1213         case HAS_EOF:
1214             bgzf_mt_eof(fp);
1215             pthread_mutex_unlock(&mt->command_m);
1216             continue;
1217 
1218         case CLOSE:
1219             pthread_cond_signal(&mt->command_c);
1220             pthread_mutex_unlock(&mt->command_m);
1221             hts_tpool_process_destroy(mt->out_queue);
1222             pthread_exit(NULL);
1223         }
1224     }
1225 }
1226 
bgzf_thread_pool(BGZF * fp,hts_tpool * pool,int qsize)1227 int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1228     // No gain from multi-threading when not compressed
1229     if (!fp->is_compressed)
1230         return 0;
1231 
1232     mtaux_t *mt;
1233     mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1234     if (!mt) return -1;
1235     fp->mt = mt;
1236 
1237     mt->pool = pool;
1238     mt->n_threads = hts_tpool_size(pool);
1239     if (!qsize)
1240         qsize = mt->n_threads*2;
1241     if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0))) {
1242         free(mt);
1243         return -1;
1244     }
1245     hts_tpool_process_ref_incr(mt->out_queue);
1246 
1247     mt->job_pool = pool_create(sizeof(bgzf_job));
1248 
1249     pthread_mutex_init(&mt->job_pool_m, NULL);
1250     pthread_mutex_init(&mt->command_m, NULL);
1251     pthread_cond_init(&mt->command_c, NULL);
1252     mt->flush_pending = 0;
1253     mt->jobs_pending = 0;
1254     mt->free_block = fp->uncompressed_block; // currently in-use block
1255     pthread_create(&mt->io_task, NULL,
1256                    fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1257 
1258     return 0;
1259 }
1260 
bgzf_mt(BGZF * fp,int n_threads,int n_sub_blks)1261 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1262 {
1263     // No gain from multi-threading when not compressed
1264     if (!fp->is_compressed || fp->is_gzip)
1265         return 0;
1266 
1267     if (n_threads < 1) return -1;
1268     hts_tpool *p = hts_tpool_init(n_threads);
1269     if (!p)
1270         return -1;
1271 
1272     if (bgzf_thread_pool(fp, p, 0) != 0) {
1273         hts_tpool_destroy(p);
1274         return -1;
1275     }
1276 
1277     fp->mt->own_pool = 1;
1278 
1279     return 0;
1280 }
1281 
mt_destroy(mtaux_t * mt)1282 static void mt_destroy(mtaux_t *mt)
1283 {
1284     pthread_mutex_lock(&mt->command_m);
1285     mt->command = CLOSE;
1286     pthread_cond_signal(&mt->command_c);
1287     hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1288     pthread_mutex_unlock(&mt->command_m);
1289 
1290     // Destroying the queue first forces the writer to exit.
1291     hts_tpool_process_destroy(mt->out_queue);
1292     pthread_join(mt->io_task, NULL);
1293 
1294     pthread_mutex_destroy(&mt->job_pool_m);
1295     pthread_mutex_destroy(&mt->command_m);
1296     pthread_cond_destroy(&mt->command_c);
1297     if (mt->curr_job)
1298         pool_free(mt->job_pool, mt->curr_job);
1299 
1300     if (mt->own_pool)
1301         hts_tpool_destroy(mt->pool);
1302 
1303     pool_destroy(mt->job_pool);
1304 
1305     free(mt);
1306     fflush(stderr);
1307 }
1308 
mt_queue(BGZF * fp)1309 static int mt_queue(BGZF *fp)
1310 {
1311     mtaux_t *mt = fp->mt;
1312 
1313     // Also updated by writer thread
1314     pthread_mutex_lock(&mt->job_pool_m);
1315     bgzf_job *j = pool_alloc(mt->job_pool);
1316     mt->jobs_pending++;
1317     pthread_mutex_unlock(&mt->job_pool_m);
1318 
1319     j->fp = fp;
1320     j->errcode = 0;
1321     j->uncomp_len  = fp->block_offset;
1322     memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1323 
1324     // Need non-block vers & job_pending?
1325     hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_encode_func, j);
1326 
1327     fp->block_offset = 0;
1328     return 0;
1329 }
1330 
mt_flush_queue(BGZF * fp)1331 static int mt_flush_queue(BGZF *fp)
1332 {
1333     mtaux_t *mt = fp->mt;
1334 
1335     // Drain the encoder jobs.
1336     // We cannot use hts_tpool_flush here as it can cause deadlock if
1337     // the queue is full up of decoder tasks.  The best solution would
1338     // be to have one input queue per type of job, but we don't right now.
1339     //hts_tpool_flush(mt->pool);
1340     pthread_mutex_lock(&mt->job_pool_m);
1341     while (mt->jobs_pending != 0) {
1342         pthread_mutex_unlock(&mt->job_pool_m);
1343         usleep(10000); // FIXME: replace by condition variable
1344         pthread_mutex_lock(&mt->job_pool_m);
1345     }
1346     pthread_mutex_unlock(&mt->job_pool_m);
1347 
1348     // Wait on bgzf_mt_writer to drain the queue
1349     if (hts_tpool_process_flush(mt->out_queue) != 0)
1350         return -1;
1351 
1352     return (fp->errcode == 0)? 0 : -1;
1353 }
1354 
lazy_flush(BGZF * fp)1355 static int lazy_flush(BGZF *fp)
1356 {
1357     if (fp->mt)
1358         return fp->block_offset ? mt_queue(fp) : 0;
1359     else
1360         return bgzf_flush(fp);
1361 }
1362 
1363 #else  // ~ #ifdef BGZF_MT
1364 
bgzf_mt(BGZF * fp,int n_threads,int n_sub_blks)1365 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1366 {
1367     return 0;
1368 }
1369 
lazy_flush(BGZF * fp)1370 static inline int lazy_flush(BGZF *fp)
1371 {
1372     return bgzf_flush(fp);
1373 }
1374 
1375 #endif // ~ #ifdef BGZF_MT
1376 
bgzf_flush(BGZF * fp)1377 int bgzf_flush(BGZF *fp)
1378 {
1379     if (!fp->is_write) return 0;
1380 #ifdef BGZF_MT
1381     if (fp->mt) {
1382         int ret = 0;
1383         if (fp->block_offset) ret = mt_queue(fp);
1384         return ret ? ret : mt_flush_queue(fp);
1385     }
1386 #endif
1387     while (fp->block_offset > 0) {
1388         int block_length;
1389         if ( fp->idx_build_otf )
1390         {
1391             bgzf_index_add_block(fp);
1392             fp->idx->ublock_addr += fp->block_offset;
1393         }
1394         block_length = deflate_block(fp, fp->block_offset);
1395         if (block_length < 0) {
1396             if (hts_verbose >= 3) fprintf(stderr, "[E::%s] deflate_block error %d\n", __func__, block_length);
1397             return -1;
1398         }
1399         if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1400             if (hts_verbose >= 1) fprintf(stderr, "[E::%s] hwrite error (wrong size)\n", __func__);
1401             fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1402             return -1;
1403         }
1404         fp->block_address += block_length;
1405     }
1406     return 0;
1407 }
1408 
bgzf_flush_try(BGZF * fp,ssize_t size)1409 int bgzf_flush_try(BGZF *fp, ssize_t size)
1410 {
1411     if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
1412     return 0;
1413 }
1414 
bgzf_write(BGZF * fp,const void * data,size_t length)1415 ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
1416 {
1417     if ( !fp->is_compressed )
1418         return hwrite(fp->fp, data, length);
1419 
1420     const uint8_t *input = (const uint8_t*)data;
1421     ssize_t remaining = length;
1422     assert(fp->is_write);
1423     while (remaining > 0) {
1424         uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1425         int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
1426         if (copy_length > remaining) copy_length = remaining;
1427         memcpy(buffer + fp->block_offset, input, copy_length);
1428         fp->block_offset += copy_length;
1429         input += copy_length;
1430         remaining -= copy_length;
1431         if (fp->block_offset == BGZF_BLOCK_SIZE) {
1432             if (lazy_flush(fp) != 0) return -1;
1433         }
1434     }
1435     return length - remaining;
1436 }
1437 
bgzf_block_write(BGZF * fp,const void * data,size_t length)1438 ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
1439 {
1440     if ( !fp->is_compressed )
1441         return hwrite(fp->fp, data, length);
1442     const uint8_t *input = (const uint8_t*)data;
1443     ssize_t remaining = length;
1444     assert(fp->is_write);
1445     uint64_t current_block; //keep track of current block
1446     uint64_t ublock_size; // amount of uncompressed data to be fed into next block
1447     while (remaining > 0) {
1448         current_block = fp->idx->moffs - fp->idx->noffs;
1449         ublock_size = fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr;
1450         uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1451         int copy_length = ublock_size - fp->block_offset;
1452         if (copy_length > remaining) copy_length = remaining;
1453         memcpy(buffer + fp->block_offset, input, copy_length);
1454         fp->block_offset += copy_length;
1455         input += copy_length;
1456         remaining -= copy_length;
1457         if (fp->block_offset == ublock_size) {
1458             if (lazy_flush(fp) != 0) return -1;
1459             fp->idx->noffs--;  // decrement noffs to track the blocks
1460         }
1461     }
1462     return length - remaining;
1463 }
1464 
1465 
bgzf_raw_write(BGZF * fp,const void * data,size_t length)1466 ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
1467 {
1468     ssize_t ret = hwrite(fp->fp, data, length);
1469     if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1470     return ret;
1471 }
1472 
bgzf_close(BGZF * fp)1473 int bgzf_close(BGZF* fp)
1474 {
1475     int ret, block_length;
1476     if (fp == 0) return -1;
1477     if (fp->is_write && fp->is_compressed) {
1478         if (bgzf_flush(fp) != 0) return -1;
1479         fp->compress_level = -1;
1480         block_length = deflate_block(fp, 0); // write an empty block
1481         if (block_length < 0) {
1482             if (hts_verbose >= 3) fprintf(stderr, "[E::%s] deflate_block error %d\n", __func__, block_length);
1483             return -1;
1484         }
1485         if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
1486             || hflush(fp->fp) != 0) {
1487             if (hts_verbose >= 1) fprintf(stderr, "[E::%s] file write error\n", __func__);
1488             fp->errcode |= BGZF_ERR_IO;
1489             return -1;
1490         }
1491     }
1492 #ifdef BGZF_MT
1493     if (fp->mt) {
1494         if (!fp->mt->free_block)
1495             fp->uncompressed_block = NULL;
1496         mt_destroy(fp->mt);
1497     }
1498 #endif
1499     if ( fp->is_gzip )
1500     {
1501         if (fp->gz_stream == NULL) ret = Z_OK;
1502         else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
1503         else ret = deflateEnd(fp->gz_stream);
1504         if (ret != Z_OK && hts_verbose >= 1)
1505             fprintf(stderr, "[E::%s] inflateEnd/deflateEnd failed: %s\n",
1506                     __func__, bgzf_zerr(ret, NULL));
1507         free(fp->gz_stream);
1508     }
1509     ret = hclose(fp->fp);
1510     if (ret != 0) return -1;
1511     bgzf_index_destroy(fp);
1512     free(fp->uncompressed_block);
1513     free_cache(fp);
1514     free(fp);
1515     return 0;
1516 }
1517 
bgzf_set_cache_size(BGZF * fp,int cache_size)1518 void bgzf_set_cache_size(BGZF *fp, int cache_size)
1519 {
1520     if (fp) fp->cache_size = cache_size;
1521 }
1522 
bgzf_check_EOF(BGZF * fp)1523 int bgzf_check_EOF(BGZF *fp) {
1524     int has_eof;
1525 
1526     if (fp->mt) {
1527         pthread_mutex_lock(&fp->mt->command_m);
1528         fp->mt->command = HAS_EOF;
1529         pthread_cond_signal(&fp->mt->command_c);
1530         hts_tpool_wake_dispatch(fp->mt->out_queue);
1531         pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
1532         has_eof = fp->mt->eof;
1533         pthread_mutex_unlock(&fp->mt->command_m);
1534     } else {
1535         has_eof = bgzf_check_EOF_common(fp);
1536     }
1537 
1538     fp->no_eof_block = (has_eof == 0);
1539 
1540     return has_eof;
1541 }
1542 
bgzf_seek(BGZF * fp,int64_t pos,int where)1543 int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
1544 {
1545     int block_offset;
1546     int64_t block_address;
1547 
1548     if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
1549         fp->errcode |= BGZF_ERR_MISUSE;
1550         return -1;
1551     }
1552     block_offset = pos & 0xFFFF;
1553     block_address = pos >> 16;
1554 
1555     if (fp->mt) {
1556         // The reader runs asynchronous and does loops of:
1557         //    Read block
1558         //    Check & process command
1559         //    Dispatch decode job
1560         //
1561         // Once at EOF it then switches to loops of
1562         //    Wait for command
1563         //    Process command (possibly switching back to above loop).
1564         //
1565         // To seek we therefore send the reader thread a SEEK command,
1566         // waking it up if blocked in dispatch and signalling if
1567         // waiting for a command.  We then wait for the response so we
1568         // know the seek succeeded.
1569         pthread_mutex_lock(&fp->mt->command_m);
1570         fp->mt->hit_eof = 0;
1571         fp->mt->command = SEEK;
1572         fp->mt->block_address = block_address;
1573         pthread_cond_signal(&fp->mt->command_c);
1574         hts_tpool_wake_dispatch(fp->mt->out_queue);
1575         pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
1576 
1577         fp->block_length = 0;  // indicates current block has not been loaded
1578         fp->block_address = block_address;
1579         fp->block_offset = block_offset;
1580 
1581         pthread_mutex_unlock(&fp->mt->command_m);
1582     } else {
1583         if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
1584             fp->errcode |= BGZF_ERR_IO;
1585             return -1;
1586         }
1587         fp->block_length = 0;  // indicates current block has not been loaded
1588         fp->block_address = block_address;
1589         fp->block_offset = block_offset;
1590     }
1591 
1592     return 0;
1593 }
1594 
bgzf_is_bgzf(const char * fn)1595 int bgzf_is_bgzf(const char *fn)
1596 {
1597     uint8_t buf[16];
1598     int n;
1599     hFILE *fp;
1600     if ((fp = hopen(fn, "r")) == 0) return 0;
1601     n = hread(fp, buf, 16);
1602     if (hclose(fp) < 0) return 0;
1603     if (n != 16) return 0;
1604     return check_header(buf) == 0? 1 : 0;
1605 }
1606 
bgzf_compression(BGZF * fp)1607 int bgzf_compression(BGZF *fp)
1608 {
1609     return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
1610 }
1611 
bgzf_getc(BGZF * fp)1612 int bgzf_getc(BGZF *fp)
1613 {
1614     if (fp->block_offset+1 < fp->block_length) {
1615         fp->uncompressed_address++;
1616         return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
1617     }
1618 
1619     int c;
1620     if (fp->block_offset >= fp->block_length) {
1621         if (bgzf_read_block(fp) != 0) return -2; /* error */
1622         if (fp->block_length == 0) return -1; /* end-of-file */
1623     }
1624     c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
1625     if (fp->block_offset == fp->block_length) {
1626         fp->block_address = bgzf_htell(fp);
1627         fp->block_offset = 0;
1628         fp->block_length = 0;
1629     }
1630     fp->uncompressed_address++;
1631     return c;
1632 }
1633 
1634 #ifndef kroundup32
1635 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
1636 #endif
1637 
bgzf_getline(BGZF * fp,int delim,kstring_t * str)1638 int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
1639 {
1640     int l, state = 0;
1641     str->l = 0;
1642     do {
1643         if (fp->block_offset >= fp->block_length) {
1644             if (bgzf_read_block(fp) != 0) { state = -2; break; }
1645             if (fp->block_length == 0) { state = -1; break; }
1646         }
1647         unsigned char *buf = fp->uncompressed_block;
1648         for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
1649         if (l < fp->block_length) state = 1;
1650         l -= fp->block_offset;
1651         if (str->l + l + 1 >= str->m) {
1652             str->m = str->l + l + 2;
1653             kroundup32(str->m);
1654             str->s = (char*)realloc(str->s, str->m);
1655         }
1656         memcpy(str->s + str->l, buf + fp->block_offset, l);
1657         str->l += l;
1658         fp->block_offset += l + 1;
1659         if (fp->block_offset >= fp->block_length) {
1660             fp->block_address = bgzf_htell(fp);
1661             fp->block_offset = 0;
1662             fp->block_length = 0;
1663         }
1664     } while (state == 0);
1665     if (str->l == 0 && state < 0) return state;
1666     fp->uncompressed_address += str->l + 1;
1667     if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
1668     str->s[str->l] = 0;
1669     return str->l;
1670 }
1671 
bgzf_index_destroy(BGZF * fp)1672 void bgzf_index_destroy(BGZF *fp)
1673 {
1674     if ( !fp->idx ) return;
1675     free(fp->idx->offs);
1676     free(fp->idx);
1677     fp->idx = NULL;
1678     fp->idx_build_otf = 0;
1679 }
1680 
bgzf_index_build_init(BGZF * fp)1681 int bgzf_index_build_init(BGZF *fp)
1682 {
1683     bgzf_index_destroy(fp);
1684     fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1685     if ( !fp->idx ) return -1;
1686     fp->idx_build_otf = 1;  // build index on the fly
1687     return 0;
1688 }
1689 
bgzf_index_add_block(BGZF * fp)1690 int bgzf_index_add_block(BGZF *fp)
1691 {
1692     fp->idx->noffs++;
1693     if ( fp->idx->noffs > fp->idx->moffs )
1694     {
1695         fp->idx->moffs = fp->idx->noffs;
1696         kroundup32(fp->idx->moffs);
1697         fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
1698         if ( !fp->idx->offs ) return -1;
1699     }
1700     fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
1701     fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
1702     return 0;
1703 }
1704 
hwrite_uint64(uint64_t x,hFILE * f)1705 static inline int hwrite_uint64(uint64_t x, hFILE *f)
1706 {
1707     if (ed_is_big()) x = ed_swap_8(x);
1708     if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
1709     return 0;
1710 }
1711 
get_name_suffix(const char * bname,const char * suffix)1712 static char * get_name_suffix(const char *bname, const char *suffix)
1713 {
1714     size_t len = strlen(bname) + strlen(suffix) + 1;
1715     char *buff = malloc(len);
1716     if (!buff) return NULL;
1717     snprintf(buff, len, "%s%s", bname, suffix);
1718     return buff;
1719 }
1720 
bgzf_index_dump_hfile(BGZF * fp,struct hFILE * idx,const char * name)1721 int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
1722 {
1723     // Note that the index contains one extra record when indexing files opened
1724     // for reading. The terminating record is not present when opened for writing.
1725     // This is not a bug.
1726 
1727     int i, save_errno;
1728 
1729     if (!fp->idx) {
1730         if (hts_verbose > 1) {
1731             fprintf(stderr, "[E::%s] Called for BGZF handle with no index",
1732                     __func__);
1733             errno = EINVAL;
1734             return -1;
1735         }
1736     }
1737 
1738     if (bgzf_flush(fp) != 0) return -1;
1739 
1740     if (hwrite_uint64(fp->idx->noffs - 1, idx) < 0) goto fail;
1741     for (i=1; i<fp->idx->noffs; i++)
1742     {
1743         if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
1744         if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
1745     }
1746     return 0;
1747 
1748  fail:
1749     save_errno = errno;
1750     if (hts_verbose > 1) {
1751         fprintf(stderr, "[E::%s] Error writing to %s : %s\n",
1752                 __func__, name ? name : "index", strerror(errno));
1753     }
1754     errno = save_errno;
1755     return -1;
1756 }
1757 
bgzf_index_dump(BGZF * fp,const char * bname,const char * suffix)1758 int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
1759 {
1760     const char *name = bname, *msg = NULL;
1761     char *tmp = NULL;
1762     hFILE *idx = NULL;
1763     int save_errno;
1764 
1765     if (!fp->idx) {
1766         if (hts_verbose > 1) {
1767             fprintf(stderr, "[E::%s] Called for BGZF handle with no index",
1768                     __func__);
1769             errno = EINVAL;
1770             return -1;
1771         }
1772     }
1773 
1774     if ( suffix )
1775     {
1776         tmp = get_name_suffix(bname, suffix);
1777         if ( !tmp ) return -1;
1778         name = tmp;
1779     }
1780 
1781     idx = hopen(name, "wb");
1782     if ( !idx ) {
1783         msg = "Error opening";
1784         goto fail;
1785     }
1786 
1787     if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
1788 
1789     if (hclose(idx) < 0)
1790     {
1791         idx = NULL;
1792         msg = "Error on closing";
1793         goto fail;
1794     }
1795 
1796     free(tmp);
1797     return 0;
1798 
1799  fail:
1800     save_errno = errno;
1801     if (hts_verbose > 1 && msg != NULL)
1802     {
1803         fprintf(stderr, "[E::%s] %s %s : %s\n",
1804                 __func__, msg, name, strerror(errno));
1805     }
1806     if (idx) hclose_abruptly(idx);
1807     free(tmp);
1808     errno = save_errno;
1809     return -1;
1810 }
1811 
hread_uint64(uint64_t * xptr,hFILE * f)1812 static inline int hread_uint64(uint64_t *xptr, hFILE *f)
1813 {
1814     if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
1815     if (ed_is_big()) ed_swap_8p(xptr);
1816     return 0;
1817 }
1818 
bgzf_index_load_hfile(BGZF * fp,struct hFILE * idx,const char * name)1819 int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
1820 {
1821     int save_errno;
1822     fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1823     if (fp->idx == NULL) goto fail;
1824     uint64_t x;
1825     if (hread_uint64(&x, idx) < 0) goto fail;
1826 
1827     fp->idx->noffs = fp->idx->moffs = x + 1;
1828     fp->idx->offs  = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
1829     if (fp->idx->offs == NULL) goto fail;
1830     fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
1831 
1832     int i;
1833     for (i=1; i<fp->idx->noffs; i++)
1834     {
1835         if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
1836         if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
1837     }
1838 
1839     return 0;
1840 
1841  fail:
1842     save_errno = errno;
1843     if (hts_verbose > 1)
1844     {
1845         fprintf(stderr, "[E::%s] Error reading %s : %s\n",
1846                 __func__, name ? name : "index", strerror(errno));
1847     }
1848     if (fp->idx) {
1849         free(fp->idx->offs);
1850         free(fp->idx);
1851         fp->idx = NULL;
1852     }
1853     errno = save_errno;
1854     return -1;
1855 }
1856 
bgzf_index_load(BGZF * fp,const char * bname,const char * suffix)1857 int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
1858 {
1859     int save_errno;
1860     const char *name = bname, *msg = NULL;
1861     char *tmp = NULL;
1862     hFILE *idx = NULL;
1863     if ( suffix )
1864     {
1865         tmp = get_name_suffix(bname, suffix);
1866         if ( !tmp ) return -1;
1867         name = tmp;
1868     }
1869 
1870     idx = hopen(name, "rb");
1871     if ( !idx ) {
1872         msg = "Error opening";
1873         goto fail;
1874     }
1875 
1876     if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
1877 
1878     if (hclose(idx) != 0) {
1879         idx = NULL;
1880         msg = "Error closing";
1881         goto fail;
1882     }
1883 
1884     free(tmp);
1885     return 0;
1886 
1887  fail:
1888     save_errno = errno;
1889     if (hts_verbose > 1 && msg != NULL) {
1890         fprintf(stderr, "[E::%s] %s %s : %s\n",
1891                 __func__, msg, name, strerror(errno));
1892     }
1893     if (idx) hclose_abruptly(idx);
1894     free(tmp);
1895     errno = save_errno;
1896     return -1;
1897 }
1898 
bgzf_useek(BGZF * fp,long uoffset,int where)1899 int bgzf_useek(BGZF *fp, long uoffset, int where)
1900 {
1901     if ( !fp->is_compressed )
1902     {
1903         if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
1904         {
1905             fp->errcode |= BGZF_ERR_IO;
1906             return -1;
1907         }
1908         fp->block_length = 0;  // indicates current block has not been loaded
1909         fp->block_address = uoffset;
1910         fp->block_offset = 0;
1911         if (bgzf_read_block(fp) < 0) {
1912             fp->errcode |= BGZF_ERR_IO;
1913             return -1;
1914         }
1915         fp->uncompressed_address = uoffset;
1916         return 0;
1917     }
1918 
1919     if ( !fp->idx )
1920     {
1921         fp->errcode |= BGZF_ERR_IO;
1922         return -1;
1923     }
1924 
1925     // binary search
1926     int ilo = 0, ihi = fp->idx->noffs - 1;
1927     while ( ilo<=ihi )
1928     {
1929         int i = (ilo+ihi)*0.5;
1930         if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
1931         else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
1932         else break;
1933     }
1934     int i = ilo-1;
1935     if (hseek(fp->fp, fp->idx->offs[i].caddr, SEEK_SET) < 0)
1936     {
1937         fp->errcode |= BGZF_ERR_IO;
1938         return -1;
1939     }
1940     fp->block_length = 0;  // indicates current block has not been loaded
1941     fp->block_address = fp->idx->offs[i].caddr;
1942     fp->block_offset = 0;
1943     if ( bgzf_read_block(fp) < 0 ) {
1944         fp->errcode |= BGZF_ERR_IO;
1945         return -1;
1946     }
1947     if ( uoffset - fp->idx->offs[i].uaddr > 0 )
1948     {
1949         fp->block_offset = uoffset - fp->idx->offs[i].uaddr;
1950         assert( fp->block_offset <= fp->block_length );     // todo: skipped, unindexed, blocks
1951     }
1952     fp->uncompressed_address = uoffset;
1953     return 0;
1954 }
1955 
bgzf_utell(BGZF * fp)1956 long bgzf_utell(BGZF *fp)
1957 {
1958     return fp->uncompressed_address;    // currently maintained only when reading
1959 }
1960