1 /* The MIT License
2
3 Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5 Copyright (C) 2009, 2013-2017 Genome Research Ltd
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24 */
25
26 #include <config.h>
27
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <errno.h>
32 #include <unistd.h>
33 #include <assert.h>
34 #include <pthread.h>
35 #include <sys/types.h>
36 #include <inttypes.h>
37
38 #include "htslib/hts.h"
39 #include "htslib/bgzf.h"
40 #include "htslib/hfile.h"
41 #include "htslib/thread_pool.h"
42 #include "cram/pooled_alloc.h"
43
44 #define BGZF_CACHE
45 #define BGZF_MT
46
47 #define BLOCK_HEADER_LENGTH 18
48 #define BLOCK_FOOTER_LENGTH 8
49
50
51 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
52 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
53 | 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
54 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
55 BGZF extension:
56 ^ ^ ^ ^
57 | | | |
58 FLG.EXTRA XLEN B C
59
60 BGZF format is compatible with GZIP. It limits the size of each compressed
61 block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
62 records the size.
63
64 */
65 static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
66
67 #ifdef BGZF_CACHE
68 typedef struct {
69 int size;
70 uint8_t *block;
71 int64_t end_offset;
72 } cache_t;
73 #include "htslib/khash.h"
74 KHASH_MAP_INIT_INT64(cache, cache_t)
75 #endif
76
77 #ifdef BGZF_MT
78
79 typedef struct bgzf_job {
80 BGZF *fp;
81 unsigned char comp_data[BGZF_MAX_BLOCK_SIZE];
82 size_t comp_len;
83 unsigned char uncomp_data[BGZF_MAX_BLOCK_SIZE];
84 size_t uncomp_len;
85 int errcode;
86 int64_t block_address;
87 int hit_eof;
88 } bgzf_job;
89
90 enum mtaux_cmd {
91 NONE = 0,
92 SEEK,
93 HAS_EOF,
94 CLOSE,
95 };
96
97 typedef struct bgzf_mtaux_t {
98 // Memory pool for bgzf_job structs, to avoid many malloc/free
99 pool_alloc_t *job_pool;
100 bgzf_job *curr_job;
101
102 // Thread pool
103 int n_threads;
104 int own_pool;
105 hts_tpool *pool;
106
107 // Output queue holding completed bgzf_jobs
108 hts_tpool_process *out_queue;
109
110 // I/O thread.
111 pthread_t io_task;
112 pthread_mutex_t job_pool_m;
113 int jobs_pending; // number of jobs waiting
114 int flush_pending;
115 void *free_block;
116 int hit_eof; // r/w entirely within main thread
117
118 // Message passing to the reader thread; eg seek requests
119 int errcode;
120 uint64_t block_address;
121 int eof;
122 pthread_mutex_t command_m; // Set whenever fp is being updated
123 pthread_cond_t command_c;
124 enum mtaux_cmd command;
125 } mtaux_t;
126 #endif
127
128 typedef struct
129 {
130 uint64_t uaddr; // offset w.r.t. uncompressed data
131 uint64_t caddr; // offset w.r.t. compressed data
132 }
133 bgzidx1_t;
134
135 struct __bgzidx_t
136 {
137 int noffs, moffs; // the size of the index, n:used, m:allocated
138 bgzidx1_t *offs; // offsets
139 uint64_t ublock_addr; // offset of the current block (uncompressed data)
140 };
141
142 void bgzf_index_destroy(BGZF *fp);
143 int bgzf_index_add_block(BGZF *fp);
144 static void mt_destroy(mtaux_t *mt);
145
packInt16(uint8_t * buffer,uint16_t value)146 static inline void packInt16(uint8_t *buffer, uint16_t value)
147 {
148 buffer[0] = value;
149 buffer[1] = value >> 8;
150 }
151
unpackInt16(const uint8_t * buffer)152 static inline int unpackInt16(const uint8_t *buffer)
153 {
154 return buffer[0] | buffer[1] << 8;
155 }
156
packInt32(uint8_t * buffer,uint32_t value)157 static inline void packInt32(uint8_t *buffer, uint32_t value)
158 {
159 buffer[0] = value;
160 buffer[1] = value >> 8;
161 buffer[2] = value >> 16;
162 buffer[3] = value >> 24;
163 }
164
bgzf_zerr(int errnum,z_stream * zs)165 static const char *bgzf_zerr(int errnum, z_stream *zs)
166 {
167 static char buffer[32];
168
169 /* Return zs->msg if available.
170 zlib doesn't set this very reliably. Looking at the source suggests
171 that it may get set to a useful message for deflateInit2, inflateInit2
172 and inflate when it returns Z_DATA_ERROR. For inflate with other
173 return codes, deflate, deflateEnd and inflateEnd it doesn't appear
174 to be useful. For the likely non-useful cases, the caller should
175 pass NULL into zs. */
176
177 if (zs && zs->msg) return zs->msg;
178
179 // gzerror OF((gzFile file, int *errnum)
180 switch (errnum) {
181 case Z_ERRNO:
182 return strerror(errno);
183 case Z_STREAM_ERROR:
184 return "invalid parameter/compression level, or inconsistent stream state";
185 case Z_DATA_ERROR:
186 return "invalid or incomplete IO";
187 case Z_MEM_ERROR:
188 return "out of memory";
189 case Z_BUF_ERROR:
190 return "progress temporarily not possible, or in() / out() returned an error";
191 case Z_VERSION_ERROR:
192 return "zlib version mismatch";
193 case Z_OK: // 0: maybe gzgets error Z_NULL
194 default:
195 snprintf(buffer, sizeof(buffer), "[%d] unknown", errnum);
196 return buffer; // FIXME: Not thread-safe.
197 }
198 }
199
bgzf_read_init(hFILE * hfpr)200 static BGZF *bgzf_read_init(hFILE *hfpr)
201 {
202 BGZF *fp;
203 uint8_t magic[18];
204 ssize_t n = hpeek(hfpr, magic, 18);
205 if (n < 0) return NULL;
206
207 fp = (BGZF*)calloc(1, sizeof(BGZF));
208 if (fp == NULL) return NULL;
209
210 fp->is_write = 0;
211 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
212 if (fp->uncompressed_block == NULL) { free(fp); return NULL; }
213 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
214 fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b);
215 fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
216 #ifdef BGZF_CACHE
217 fp->cache = kh_init(cache);
218 #endif
219 return fp;
220 }
221
222 // get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
mode2level(const char * mode)223 static int mode2level(const char *mode)
224 {
225 int i, compress_level = -1;
226 for (i = 0; mode[i]; ++i)
227 if (mode[i] >= '0' && mode[i] <= '9') break;
228 if (mode[i]) compress_level = (int)mode[i] - '0';
229 if (strchr(mode, 'u')) compress_level = -2;
230 return compress_level;
231 }
bgzf_write_init(const char * mode)232 static BGZF *bgzf_write_init(const char *mode)
233 {
234 BGZF *fp;
235 fp = (BGZF*)calloc(1, sizeof(BGZF));
236 if (fp == NULL) goto mem_fail;
237 fp->is_write = 1;
238 int compress_level = mode2level(mode);
239 if ( compress_level==-2 )
240 {
241 fp->is_compressed = 0;
242 return fp;
243 }
244 fp->is_compressed = 1;
245
246 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
247 if (fp->uncompressed_block == NULL) goto mem_fail;
248 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
249
250 fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
251 if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
252 if ( strchr(mode,'g') )
253 {
254 // gzip output
255 fp->is_gzip = 1;
256 fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
257 if (fp->gz_stream == NULL) goto mem_fail;
258 fp->gz_stream->zalloc = NULL;
259 fp->gz_stream->zfree = NULL;
260 fp->gz_stream->msg = NULL;
261
262 int ret = deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY);
263 if (ret!=Z_OK) {
264 if (hts_verbose >= 1) {
265 fprintf(stderr, "[E::%s] deflateInit2 failed: %s\n",
266 __func__, bgzf_zerr(ret, fp->gz_stream));
267 }
268 goto fail;
269 }
270 }
271 return fp;
272
273 mem_fail:
274 if (hts_verbose >= 1) {
275 fprintf(stderr, "[E::%s] %s\n", __func__, strerror(errno));
276 }
277 fail:
278 if (fp != NULL) {
279 free(fp->uncompressed_block);
280 free(fp->gz_stream);
281 free(fp);
282 }
283 return NULL;
284 }
285
bgzf_open(const char * path,const char * mode)286 BGZF *bgzf_open(const char *path, const char *mode)
287 {
288 BGZF *fp = 0;
289 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
290 if (strchr(mode, 'r')) {
291 hFILE *fpr;
292 if ((fpr = hopen(path, mode)) == 0) return 0;
293 fp = bgzf_read_init(fpr);
294 if (fp == 0) { hclose_abruptly(fpr); return NULL; }
295 fp->fp = fpr;
296 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
297 hFILE *fpw;
298 if ((fpw = hopen(path, mode)) == 0) return 0;
299 fp = bgzf_write_init(mode);
300 if (fp == NULL) return NULL;
301 fp->fp = fpw;
302 }
303 else { errno = EINVAL; return 0; }
304
305 fp->is_be = ed_is_big();
306 return fp;
307 }
308
bgzf_dopen(int fd,const char * mode)309 BGZF *bgzf_dopen(int fd, const char *mode)
310 {
311 BGZF *fp = 0;
312 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
313 if (strchr(mode, 'r')) {
314 hFILE *fpr;
315 if ((fpr = hdopen(fd, mode)) == 0) return 0;
316 fp = bgzf_read_init(fpr);
317 if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
318 fp->fp = fpr;
319 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
320 hFILE *fpw;
321 if ((fpw = hdopen(fd, mode)) == 0) return 0;
322 fp = bgzf_write_init(mode);
323 if (fp == NULL) return NULL;
324 fp->fp = fpw;
325 }
326 else { errno = EINVAL; return 0; }
327
328 fp->is_be = ed_is_big();
329 return fp;
330 }
331
bgzf_hopen(hFILE * hfp,const char * mode)332 BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
333 {
334 BGZF *fp = NULL;
335 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
336 if (strchr(mode, 'r')) {
337 fp = bgzf_read_init(hfp);
338 if (fp == NULL) return NULL;
339 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
340 fp = bgzf_write_init(mode);
341 if (fp == NULL) return NULL;
342 }
343 else { errno = EINVAL; return 0; }
344
345 fp->fp = hfp;
346 fp->is_be = ed_is_big();
347 return fp;
348 }
349
bgzf_compress(void * _dst,size_t * dlen,const void * src,size_t slen,int level)350 int bgzf_compress(void *_dst, size_t *dlen, const void *src, size_t slen, int level)
351 {
352 uint32_t crc;
353 z_stream zs;
354 uint8_t *dst = (uint8_t*)_dst;
355
356 // compress the body
357 zs.zalloc = NULL; zs.zfree = NULL;
358 zs.msg = NULL;
359 zs.next_in = (Bytef*)src;
360 zs.avail_in = slen;
361 zs.next_out = dst + BLOCK_HEADER_LENGTH;
362 zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
363 int ret = deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); // -15 to disable zlib header/footer
364 if (ret!=Z_OK) {
365 if (hts_verbose >= 1) {
366 fprintf(stderr, "[E::%s] deflateInit2 failed: %s\n",
367 __func__, bgzf_zerr(ret, &zs));
368 }
369 return -1;
370 }
371 if ((ret = deflate(&zs, Z_FINISH)) != Z_STREAM_END) {
372 if (hts_verbose >= 1) {
373 fprintf(stderr, "[E::%s] deflate failed: %s\n",
374 __func__, bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
375 }
376 return -1;
377 }
378 if ((ret = deflateEnd(&zs)) != Z_OK) {
379 if (hts_verbose >= 1) {
380 fprintf(stderr, "[E::%s] deflateEnd failed: %s\n",
381 __func__, bgzf_zerr(ret, NULL));
382 }
383 return -1;
384 }
385 *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
386 // write the header
387 memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
388 packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
389 // write the footer
390 crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
391 packInt32((uint8_t*)&dst[*dlen - 8], crc);
392 packInt32((uint8_t*)&dst[*dlen - 4], slen);
393 return 0;
394 }
395
bgzf_gzip_compress(BGZF * fp,void * _dst,size_t * dlen,const void * src,size_t slen,int level)396 static int bgzf_gzip_compress(BGZF *fp, void *_dst, size_t *dlen, const void *src, size_t slen, int level)
397 {
398 uint8_t *dst = (uint8_t*)_dst;
399 z_stream *zs = fp->gz_stream;
400 int flush = slen ? Z_PARTIAL_FLUSH : Z_FINISH;
401 zs->next_in = (Bytef*)src;
402 zs->avail_in = slen;
403 zs->next_out = dst;
404 zs->avail_out = *dlen;
405 int ret = deflate(zs, flush);
406 if (ret == Z_STREAM_ERROR) {
407 if (hts_verbose >= 1) {
408 fprintf(stderr, "[E::%s] deflate failed: %s\n",
409 __func__, bgzf_zerr(ret, NULL));
410 }
411 return -1;
412 }
413 if (zs->avail_in != 0) {
414 if (hts_verbose >= 1) {
415 fprintf(stderr, "[E::%s] deflate block too large for output buffer:\n",
416 __func__);
417 }
418 return -1;
419 }
420 *dlen = *dlen - zs->avail_out;
421 return 0;
422 }
423
424 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
deflate_block(BGZF * fp,int block_length)425 static int deflate_block(BGZF *fp, int block_length)
426 {
427 size_t comp_size = BGZF_MAX_BLOCK_SIZE;
428 int ret;
429 if ( !fp->is_gzip )
430 ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
431 else
432 ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
433
434 if ( ret != 0 )
435 {
436 if (hts_verbose >= 3) {
437 fprintf(stderr, "[E::%s] compression error %d\n", __func__, ret);
438 }
439 fp->errcode |= BGZF_ERR_ZLIB;
440 return -1;
441 }
442 fp->block_offset = 0;
443 return comp_size;
444 }
445
bgzf_uncompress(uint8_t * dst,size_t * dlen,const uint8_t * src,size_t slen)446 static int bgzf_uncompress(uint8_t *dst, size_t *dlen, const uint8_t *src, size_t slen) {
447 z_stream zs;
448 zs.zalloc = NULL;
449 zs.zfree = NULL;
450 zs.msg = NULL;
451 zs.next_in = (Bytef*)src;
452 zs.avail_in = slen;
453 zs.next_out = (Bytef*)dst;
454 zs.avail_out = *dlen;
455
456 int ret = inflateInit2(&zs, -15);
457 if (ret != Z_OK) {
458 if (hts_verbose >= 1) {
459 fprintf(stderr, "[E::%s] inflateInit2 failed: %s\n",
460 __func__, bgzf_zerr(ret, &zs));
461 }
462 return -1;
463 }
464 if ((ret = inflate(&zs, Z_FINISH)) != Z_STREAM_END) {
465 if (hts_verbose >= 1) {
466 fprintf(stderr, "[E::%s] inflate failed: %s\n",
467 __func__, bgzf_zerr(ret, ret == Z_DATA_ERROR ? &zs : NULL));
468 }
469 if ((ret = inflateEnd(&zs)) != Z_OK) {
470 if (hts_verbose >= 2) {
471 fprintf(stderr, "[E::%s] inflateEnd failed: %s\n",
472 __func__, bgzf_zerr(ret, NULL));
473 }
474 }
475 return -1;
476 }
477 if ((ret = inflateEnd(&zs)) != Z_OK) {
478 if (hts_verbose >= 1) {
479 fprintf(stderr, "[E::%s] inflateEnd failed: %s\n",
480 __func__, bgzf_zerr(ret, NULL));
481 }
482 return -1;
483 }
484 *dlen = *dlen - zs.avail_out;
485 return 0;
486 }
487
488 // Inflate the block in fp->compressed_block into fp->uncompressed_block
inflate_block(BGZF * fp,int block_length)489 static int inflate_block(BGZF* fp, int block_length)
490 {
491 size_t dlen = BGZF_MAX_BLOCK_SIZE;
492 int ret = bgzf_uncompress(fp->uncompressed_block, &dlen,
493 (Bytef*)fp->compressed_block + 18, block_length - 18);
494 if (ret < 0) {
495 fp->errcode |= BGZF_ERR_ZLIB;
496 return -1;
497 }
498
499 return dlen;
500 }
501
inflate_gzip_block(BGZF * fp,int cached)502 static int inflate_gzip_block(BGZF *fp, int cached)
503 {
504 int ret = Z_OK;
505 do
506 {
507 if ( !cached && fp->gz_stream->avail_out!=0 )
508 {
509 fp->gz_stream->avail_in = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
510 if ( fp->gz_stream->avail_in<=0 ) return fp->gz_stream->avail_in;
511 if ( fp->gz_stream->avail_in==0 ) break;
512 fp->gz_stream->next_in = fp->compressed_block;
513 }
514 else cached = 0;
515 do
516 {
517 fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
518 fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
519 fp->gz_stream->msg = NULL;
520 ret = inflate(fp->gz_stream, Z_NO_FLUSH);
521 if (ret < 0 && ret != Z_BUF_ERROR) {
522 if (hts_verbose >= 1) {
523 fprintf(stderr, "[E::%s] inflate failed: %s\n",
524 __func__,
525 bgzf_zerr(ret, ret == Z_DATA_ERROR ? fp->gz_stream : NULL));
526 }
527 fp->errcode |= BGZF_ERR_ZLIB;
528 return -1;
529 }
530 unsigned int have = BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
531 if ( have ) return have;
532 }
533 while ( fp->gz_stream->avail_out == 0 );
534 }
535 while (ret != Z_STREAM_END);
536 return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
537 }
538
539 // Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
check_header(const uint8_t * header)540 static int check_header(const uint8_t *header)
541 {
542 if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
543 return ((header[3] & 4) != 0
544 && unpackInt16((uint8_t*)&header[10]) == 6
545 && header[12] == 'B' && header[13] == 'C'
546 && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
547 }
548
549 #ifdef BGZF_CACHE
free_cache(BGZF * fp)550 static void free_cache(BGZF *fp)
551 {
552 khint_t k;
553 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
554 if (fp->is_write) return;
555 for (k = kh_begin(h); k < kh_end(h); ++k)
556 if (kh_exist(h, k)) free(kh_val(h, k).block);
557 kh_destroy(cache, h);
558 }
559
load_block_from_cache(BGZF * fp,int64_t block_address)560 static int load_block_from_cache(BGZF *fp, int64_t block_address)
561 {
562 khint_t k;
563 cache_t *p;
564
565 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
566 k = kh_get(cache, h, block_address);
567 if (k == kh_end(h)) return 0;
568 p = &kh_val(h, k);
569 if (fp->block_length != 0) fp->block_offset = 0;
570 fp->block_address = block_address;
571 fp->block_length = p->size;
572 // FIXME: why BGZF_MAX_BLOCK_SIZE and not p->size?
573 memcpy(fp->uncompressed_block, p->block, BGZF_MAX_BLOCK_SIZE);
574 if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
575 {
576 // todo: move the error up
577 fprintf(stderr,"Could not hseek to %"PRId64"\n", p->end_offset);
578 exit(1);
579 }
580 return p->size;
581 }
582
cache_block(BGZF * fp,int size)583 static void cache_block(BGZF *fp, int size)
584 {
585 int ret;
586 khint_t k;
587 cache_t *p;
588 //fprintf(stderr, "Cache block at %llx\n", (int)fp->block_address);
589 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
590 if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
591 if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
592 /* A better way would be to remove the oldest block in the
593 * cache, but here we remove a random one for simplicity. This
594 * should not have a big impact on performance. */
595 for (k = kh_begin(h); k < kh_end(h); ++k)
596 if (kh_exist(h, k)) break;
597 if (k < kh_end(h)) {
598 free(kh_val(h, k).block);
599 kh_del(cache, h, k);
600 }
601 }
602 k = kh_put(cache, h, fp->block_address, &ret);
603 if (ret == 0) return; // if this happens, a bug!
604 p = &kh_val(h, k);
605 p->size = fp->block_length;
606 p->end_offset = fp->block_address + size;
607 p->block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
608 memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
609 }
610 #else
free_cache(BGZF * fp)611 static void free_cache(BGZF *fp) {}
load_block_from_cache(BGZF * fp,int64_t block_address)612 static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
cache_block(BGZF * fp,int size)613 static void cache_block(BGZF *fp, int size) {}
614 #endif
615
616 /*
617 * Absolute htell in this compressed file.
618 *
619 * Do not confuse with the external bgzf_tell macro which returns the virtual
620 * offset.
621 */
bgzf_htell(BGZF * fp)622 static off_t bgzf_htell(BGZF *fp) {
623 if (fp->mt) {
624 pthread_mutex_lock(&fp->mt->job_pool_m);
625 off_t pos = fp->block_address + fp->block_clength;
626 pthread_mutex_unlock(&fp->mt->job_pool_m);
627 return pos;
628 } else {
629 return htell(fp->fp);
630 }
631 }
632
bgzf_read_block(BGZF * fp)633 int bgzf_read_block(BGZF *fp)
634 {
635 hts_tpool_result *r;
636
637 if (fp->mt) {
638 again:
639 if (fp->mt->hit_eof) {
640 // Further reading at EOF will always return 0
641 fp->block_length = 0;
642 return 0;
643 }
644 r = hts_tpool_next_result_wait(fp->mt->out_queue);
645 bgzf_job *j = r ? (bgzf_job *)hts_tpool_result_data(r) : NULL;
646
647 if (!j || j->errcode == BGZF_ERR_MT) {
648 if (!fp->mt->free_block) {
649 fp->uncompressed_block = malloc(2 * BGZF_MAX_BLOCK_SIZE);
650 if (fp->uncompressed_block == NULL) return -1;
651 fp->compressed_block = (char *)fp->uncompressed_block + BGZF_MAX_BLOCK_SIZE;
652 } // else it's already allocated with malloc, maybe even in-use.
653 mt_destroy(fp->mt);
654 fp->mt = NULL;
655 hts_tpool_delete_result(r, 0);
656
657 goto single_threaded;
658 }
659
660 if (j->errcode) {
661 fp->errcode = j->errcode;
662 return -1;
663 }
664
665 if (j->hit_eof) {
666 if (!fp->last_block_eof && !fp->no_eof_block) {
667 fp->no_eof_block = 1;
668 if (hts_verbose>1)
669 fprintf(stderr, "[W::%s] EOF marker is absent. The input is probably truncated.\n", __func__);
670 }
671 fp->mt->hit_eof = 1;
672 }
673
674 // Zero length blocks in the middle of a file are (wrongly)
675 // considered as EOF by many callers. We work around this by
676 // trying again to see if we hit a genuine EOF.
677 if (!j->hit_eof && j->uncomp_len == 0) {
678 fp->last_block_eof = 1;
679 hts_tpool_delete_result(r, 0);
680 goto again;
681 }
682
683 // block_length=0 and block_offset set by bgzf_seek.
684 if (fp->block_length != 0) fp->block_offset = 0;
685 fp->block_address = j->block_address;
686 fp->block_clength = j->comp_len;
687 fp->block_length = j->uncomp_len;
688 // bgzf_read() can change fp->block_length
689 fp->last_block_eof = (fp->block_length == 0);
690
691 if ( j->uncomp_len && j->fp->idx_build_otf )
692 {
693 bgzf_index_add_block(j->fp);
694 j->fp->idx->ublock_addr += j->uncomp_len;
695 }
696
697 // Steal the data block as it's quicker than a memcpy.
698 // We just need to make sure we delay the pool free.
699 if (fp->mt->curr_job) {
700 pthread_mutex_lock(&fp->mt->job_pool_m);
701 pool_free(fp->mt->job_pool, fp->mt->curr_job);
702 pthread_mutex_unlock(&fp->mt->job_pool_m);
703 }
704 fp->uncompressed_block = j->uncomp_data;
705 fp->mt->curr_job = j;
706 if (fp->mt->free_block) {
707 free(fp->mt->free_block); // clear up last non-mt block
708 fp->mt->free_block = NULL;
709 }
710
711 hts_tpool_delete_result(r, 0);
712 return 0;
713 }
714
715 uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
716 int count, size, block_length, remaining;
717
718 single_threaded:
719 size = 0;
720
721 // Reading an uncompressed file
722 if ( !fp->is_compressed )
723 {
724 count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
725 if (count < 0) // Error
726 {
727 fp->errcode |= BGZF_ERR_IO;
728 return -1;
729 }
730 else if (count == 0) // EOF
731 {
732 fp->block_length = 0;
733 return 0;
734 }
735 if (fp->block_length != 0) fp->block_offset = 0;
736 fp->block_address += count;
737 fp->block_length = count;
738 return 0;
739 }
740
741 // Reading compressed file
742 int64_t block_address;
743 block_address = bgzf_htell(fp);
744 if ( fp->is_gzip && fp->gz_stream ) // is this is an initialized gzip stream?
745 {
746 count = inflate_gzip_block(fp, 0);
747 if ( count<0 )
748 {
749 fp->errcode |= BGZF_ERR_ZLIB;
750 return -1;
751 }
752 fp->block_length = count;
753 fp->block_address = block_address;
754 return 0;
755 }
756 if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
757
758 // loop to skip empty bgzf blocks
759 while (1)
760 {
761 count = hread(fp->fp, header, sizeof(header));
762 if (count == 0) { // no data read
763 if (!fp->last_block_eof && !fp->no_eof_block && !fp->is_gzip) {
764 fp->no_eof_block = 1;
765 if (hts_verbose > 1) {
766 fprintf(stderr, "[W::%s] EOF marker is absent. The input is probably truncated.\n", __func__);
767 }
768 }
769 fp->block_length = 0;
770 return 0;
771 }
772 int ret;
773 if ( count != sizeof(header) || (ret=check_header(header))==-2 )
774 {
775 fp->errcode |= BGZF_ERR_HEADER;
776 return -1;
777 }
778 if ( ret==-1 )
779 {
780 // GZIP, not BGZF
781 uint8_t *cblock = (uint8_t*)fp->compressed_block;
782 memcpy(cblock, header, sizeof(header));
783 count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
784 int nskip = 10;
785
786 // Check optional fields to skip: FLG.FNAME,FLG.FCOMMENT,FLG.FHCRC,FLG.FEXTRA
787 // Note: Some of these fields are untested, I did not have appropriate data available
788 if ( header[3] & 0x4 ) // FLG.FEXTRA
789 {
790 nskip += unpackInt16(&cblock[nskip]) + 2;
791 }
792 if ( header[3] & 0x8 ) // FLG.FNAME
793 {
794 while ( nskip<count && cblock[nskip] ) nskip++;
795 nskip++;
796 }
797 if ( header[3] & 0x10 ) // FLG.FCOMMENT
798 {
799 while ( nskip<count && cblock[nskip] ) nskip++;
800 nskip++;
801 }
802 if ( header[3] & 0x2 ) nskip += 2; // FLG.FHCRC
803
804 /* FIXME: Should handle this better. There's no reason why
805 someone shouldn't include a massively long comment in their
806 gzip stream. */
807 if ( nskip >= count )
808 {
809 fp->errcode |= BGZF_ERR_HEADER;
810 return -1;
811 }
812
813 fp->is_gzip = 1;
814 fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
815 int ret = inflateInit2(fp->gz_stream, -15);
816 if (ret != Z_OK)
817 {
818 if (hts_verbose >= 1) {
819 fprintf(stderr, "[E::%s] inflateInit2 failed: %s",
820 __func__, bgzf_zerr(ret, fp->gz_stream));
821 }
822 fp->errcode |= BGZF_ERR_ZLIB;
823 return -1;
824 }
825 fp->gz_stream->avail_in = count - nskip;
826 fp->gz_stream->next_in = cblock + nskip;
827 count = inflate_gzip_block(fp, 1);
828 if ( count<0 )
829 {
830 fp->errcode |= BGZF_ERR_ZLIB;
831 return -1;
832 }
833 fp->block_length = count;
834 fp->block_address = block_address;
835 if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
836 return 0;
837 }
838 size = count;
839 block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
840 if (block_length < BLOCK_HEADER_LENGTH)
841 {
842 fp->errcode |= BGZF_ERR_HEADER;
843 return -1;
844 }
845 compressed_block = (uint8_t*)fp->compressed_block;
846 memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
847 remaining = block_length - BLOCK_HEADER_LENGTH;
848 count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
849 if (count != remaining) {
850 fp->errcode |= BGZF_ERR_IO;
851 return -1;
852 }
853 size += count;
854 if ((count = inflate_block(fp, block_length)) < 0) {
855 if (hts_verbose >= 2) fprintf(stderr, "[E::%s] inflate_block error %d\n", __func__, count);
856 fp->errcode |= BGZF_ERR_ZLIB;
857 return -1;
858 }
859 fp->last_block_eof = (count == 0);
860 if ( count ) break; // otherwise an empty bgzf block
861 }
862 if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
863 fp->block_address = block_address;
864 fp->block_length = count;
865 if ( fp->idx_build_otf )
866 {
867 bgzf_index_add_block(fp);
868 fp->idx->ublock_addr += count;
869 }
870 cache_block(fp, size);
871 return 0;
872 }
873
bgzf_read(BGZF * fp,void * data,size_t length)874 ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
875 {
876 ssize_t bytes_read = 0;
877 uint8_t *output = (uint8_t*)data;
878 if (length <= 0) return 0;
879 assert(fp->is_write == 0);
880 while (bytes_read < length) {
881 int copy_length, available = fp->block_length - fp->block_offset;
882 uint8_t *buffer;
883 if (available <= 0) {
884 int ret = bgzf_read_block(fp);
885 if (ret != 0) {
886 if (hts_verbose >= 2) {
887 fprintf(stderr, "[E::%s] bgzf_read_block error %d after %zd of %zu bytes\n", __func__, ret, bytes_read, length);
888 }
889 fp->errcode |= BGZF_ERR_ZLIB;
890 return -1;
891 }
892 available = fp->block_length - fp->block_offset;
893 if (available <= 0) break;
894 }
895 copy_length = length - bytes_read < available? length - bytes_read : available;
896 buffer = (uint8_t*)fp->uncompressed_block;
897 memcpy(output, buffer + fp->block_offset, copy_length);
898 fp->block_offset += copy_length;
899 output += copy_length;
900 bytes_read += copy_length;
901
902 // For raw gzip streams this avoids short reads.
903 if (fp->block_offset == fp->block_length) {
904 fp->block_address = bgzf_htell(fp);
905 fp->block_offset = fp->block_length = 0;
906 }
907 }
908
909 fp->uncompressed_address += bytes_read;
910
911 return bytes_read;
912 }
913
bgzf_raw_read(BGZF * fp,void * data,size_t length)914 ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
915 {
916 ssize_t ret = hread(fp->fp, data, length);
917 if (ret < 0) fp->errcode |= BGZF_ERR_IO;
918 return ret;
919 }
920
921 #ifdef BGZF_MT
922
bgzf_encode_func(void * arg)923 void *bgzf_encode_func(void *arg) {
924 bgzf_job *j = (bgzf_job *)arg;
925
926 j->comp_len = BGZF_MAX_BLOCK_SIZE;
927 int ret = bgzf_compress(j->comp_data, &j->comp_len,
928 j->uncomp_data, j->uncomp_len,
929 j->fp->compress_level);
930 if (ret != 0)
931 j->errcode |= BGZF_ERR_ZLIB;
932
933 return arg;
934 }
935
936 // Our input block has already been decoded by bgzf_mt_read_block().
937 // We need to split that into a fetch block (compressed) and make this
938 // do the actual decompression step.
bgzf_decode_func(void * arg)939 void *bgzf_decode_func(void *arg) {
940 bgzf_job *j = (bgzf_job *)arg;
941
942 j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
943 int ret = bgzf_uncompress(j->uncomp_data, &j->uncomp_len,
944 j->comp_data+18, j->comp_len-18);
945 if (ret != 0)
946 j->errcode |= BGZF_ERR_ZLIB;
947
948 return arg;
949 }
950
951 /*
952 * Nul function so we can dispatch a job with the correct serial
953 * to mark failure or to indicate an empty read (EOF).
954 */
bgzf_nul_func(void * arg)955 void *bgzf_nul_func(void *arg) { return arg; }
956
957 /*
958 * Takes compressed blocks off the results queue and calls hwrite to
959 * punt them to the output stream.
960 *
961 * Returns NULL when no more are left, or -1 on error
962 */
bgzf_mt_writer(void * vp)963 static void *bgzf_mt_writer(void *vp) {
964 BGZF *fp = (BGZF *)vp;
965 mtaux_t *mt = fp->mt;
966 hts_tpool_result *r;
967
968 // Iterates until result queue is shutdown, where it returns NULL.
969 while ((r = hts_tpool_next_result_wait(mt->out_queue))) {
970 bgzf_job *j = (bgzf_job *)hts_tpool_result_data(r);
971 assert(j);
972
973 if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len) {
974 fp->errcode |= BGZF_ERR_IO;
975 goto err;
976 }
977
978 /*
979 * Periodically call hflush (which calls fsync when on a file).
980 * This avoids the fsync being done at the bgzf_close stage,
981 * which can sometimes cause signficant delays. As this is in
982 * a separate thread, spreading the sync delays throughout the
983 * program execution seems better.
984 * Frequency of 1/512 has been chosen by experimentation
985 * across local XFS, NFS and Lustre tests.
986 */
987 if (++mt->flush_pending % 512 == 0)
988 if (hflush(fp->fp) != 0)
989 goto err;
990
991
992 hts_tpool_delete_result(r, 0);
993
994 // Also updated by main thread
995 pthread_mutex_lock(&mt->job_pool_m);
996 pool_free(mt->job_pool, j);
997 mt->jobs_pending--;
998 pthread_mutex_unlock(&mt->job_pool_m);
999 }
1000
1001 if (hflush(fp->fp) != 0)
1002 goto err;
1003
1004 hts_tpool_process_destroy(mt->out_queue);
1005
1006 return NULL;
1007
1008 err:
1009 hts_tpool_process_destroy(mt->out_queue);
1010 return (void *)-1;
1011 }
1012
1013
1014 /*
1015 * Reads a compressed block of data using hread and dispatches it to
1016 * the thread pool for decompression. This is the analogue of the old
1017 * non-threaded bgzf_read_block() function, but without modifying fp
1018 * in any way (except for the read offset). All output goes via the
1019 * supplied bgzf_job struct.
1020 *
1021 * Returns NULL when no more are left, or -1 on error
1022 */
bgzf_mt_read_block(BGZF * fp,bgzf_job * j)1023 int bgzf_mt_read_block(BGZF *fp, bgzf_job *j)
1024 {
1025 uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
1026 int count, size = 0, block_length, remaining;
1027
1028 // NOTE: Guaranteed to be compressed as we block multi-threading in
1029 // uncompressed mode. However it may be gzip compression instead
1030 // of bgzf.
1031
1032 // Reading compressed file
1033 int64_t block_address;
1034 block_address = htell(fp->fp);
1035
1036 if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
1037 count = hpeek(fp->fp, header, sizeof(header));
1038 if (count == 0) // no data read
1039 return -1;
1040 int ret;
1041 if ( count != sizeof(header) || (ret=check_header(header))==-2 )
1042 {
1043 j->errcode |= BGZF_ERR_HEADER;
1044 return -1;
1045 }
1046 if (ret == -1) {
1047 j->errcode |= BGZF_ERR_MT;
1048 return -1;
1049 }
1050
1051 count = hread(fp->fp, header, sizeof(header));
1052 if (count != sizeof(header)) // no data read
1053 return -1;
1054
1055 size = count;
1056 block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
1057 if (block_length < BLOCK_HEADER_LENGTH) {
1058 j->errcode |= BGZF_ERR_HEADER;
1059 return -1;
1060 }
1061 compressed_block = (uint8_t*)j->comp_data;
1062 memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
1063 remaining = block_length - BLOCK_HEADER_LENGTH;
1064 count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
1065 if (count != remaining) {
1066 j->errcode |= BGZF_ERR_IO;
1067 return -1;
1068 }
1069 size += count;
1070 j->comp_len = block_length;
1071 j->uncomp_len = BGZF_MAX_BLOCK_SIZE;
1072 j->block_address = block_address;
1073 j->fp = fp;
1074 j->errcode = 0;
1075
1076 return 0;
1077 }
1078
1079
bgzf_check_EOF_common(BGZF * fp)1080 static int bgzf_check_EOF_common(BGZF *fp)
1081 {
1082 uint8_t buf[28];
1083 off_t offset = htell(fp->fp);
1084 if (hseek(fp->fp, -28, SEEK_END) < 0) {
1085 if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
1086 else return -1;
1087 }
1088 if ( hread(fp->fp, buf, 28) != 28 ) return -1;
1089 if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
1090 return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
1091 }
1092
1093 /*
1094 * Checks EOF from the reader thread.
1095 */
bgzf_mt_eof(BGZF * fp)1096 static void bgzf_mt_eof(BGZF *fp) {
1097 mtaux_t *mt = fp->mt;
1098
1099 pthread_mutex_lock(&mt->job_pool_m);
1100 mt->eof = bgzf_check_EOF_common(fp);
1101 pthread_mutex_unlock(&mt->job_pool_m);
1102 pthread_cond_signal(&mt->command_c);
1103 }
1104
1105
1106 /*
1107 * Performs the seek (called by reader thread).
1108 *
1109 * This simply drains the entire queue, throwing away blocks, seeks,
1110 * and starts it up again. Brute force, but maybe sufficient.
1111 */
bgzf_mt_seek(BGZF * fp)1112 static void bgzf_mt_seek(BGZF *fp) {
1113 mtaux_t *mt = fp->mt;
1114
1115 hts_tpool_process_reset(mt->out_queue, 0);
1116 pthread_mutex_lock(&mt->job_pool_m);
1117 mt->command = NONE;
1118 mt->errcode = 0;
1119
1120 if (hseek(fp->fp, mt->block_address, SEEK_SET) < 0)
1121 mt->errcode = BGZF_ERR_IO;
1122
1123 pthread_mutex_unlock(&mt->job_pool_m);
1124 pthread_cond_signal(&mt->command_c);
1125 }
1126
bgzf_mt_reader(void * vp)1127 static void *bgzf_mt_reader(void *vp) {
1128 BGZF *fp = (BGZF *)vp;
1129 mtaux_t *mt = fp->mt;
1130
1131 restart:
1132 pthread_mutex_lock(&mt->job_pool_m);
1133 bgzf_job *j = pool_alloc(mt->job_pool);
1134 pthread_mutex_unlock(&mt->job_pool_m);
1135 j->errcode = 0;
1136 j->uncomp_len = 0;
1137 j->hit_eof = 0;
1138
1139 while (bgzf_mt_read_block(fp, j) == 0) {
1140 // Dispatch
1141 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_decode_func, j);
1142
1143 // Check for command
1144 pthread_mutex_lock(&mt->command_m);
1145 switch (mt->command) {
1146 case SEEK:
1147 bgzf_mt_seek(fp);
1148 pthread_mutex_unlock(&mt->command_m);
1149 goto restart;
1150
1151 case HAS_EOF:
1152 bgzf_mt_eof(fp);
1153 break;
1154
1155 case CLOSE:
1156 pthread_cond_signal(&mt->command_c);
1157 pthread_mutex_unlock(&mt->command_m);
1158 hts_tpool_process_destroy(mt->out_queue);
1159 pthread_exit(NULL);
1160
1161 default:
1162 break;
1163 }
1164 pthread_mutex_unlock(&mt->command_m);
1165
1166 // Allocate buffer for next block
1167 pthread_mutex_lock(&mt->job_pool_m);
1168 j = pool_alloc(mt->job_pool);
1169 pthread_mutex_unlock(&mt->job_pool_m);
1170 j->errcode = 0;
1171 j->uncomp_len = 0;
1172 j->hit_eof = 0;
1173 }
1174
1175 if (j->errcode == BGZF_ERR_MT) {
1176 // Attempt to multi-thread decode a raw gzip stream cannot be done.
1177 // We tear down the multi-threaded decoder and revert to the old code.
1178 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_nul_func, j);
1179 hts_tpool_process_ref_decr(mt->out_queue);
1180 pthread_exit(&j->errcode);
1181 }
1182
1183 // Dispatch an empty block so EOF is spotted.
1184 // We also use this mechanism for returning errors, in which case
1185 // j->errcode is set already.
1186
1187 j->hit_eof = 1;
1188 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_nul_func, j);
1189 if (j->errcode != 0) {
1190 hts_tpool_process_destroy(mt->out_queue);
1191 pthread_exit(&j->errcode);
1192 }
1193
1194 // We hit EOF so can stop reading, but we may get a subsequent
1195 // seek request. In this case we need to restart the reader.
1196 //
1197 // To handle this we wait on a condition variable and then
1198 // monitor the command. (This could be either seek or close.)
1199 for (;;) {
1200 pthread_mutex_lock(&mt->command_m);
1201 if (mt->command == NONE)
1202 pthread_cond_wait(&mt->command_c, &mt->command_m);
1203 switch(mt->command) {
1204 default:
1205 pthread_mutex_unlock(&mt->command_m);
1206 break;
1207
1208 case SEEK:
1209 bgzf_mt_seek(fp);
1210 pthread_mutex_unlock(&mt->command_m);
1211 goto restart;
1212
1213 case HAS_EOF:
1214 bgzf_mt_eof(fp);
1215 pthread_mutex_unlock(&mt->command_m);
1216 continue;
1217
1218 case CLOSE:
1219 pthread_cond_signal(&mt->command_c);
1220 pthread_mutex_unlock(&mt->command_m);
1221 hts_tpool_process_destroy(mt->out_queue);
1222 pthread_exit(NULL);
1223 }
1224 }
1225 }
1226
bgzf_thread_pool(BGZF * fp,hts_tpool * pool,int qsize)1227 int bgzf_thread_pool(BGZF *fp, hts_tpool *pool, int qsize) {
1228 // No gain from multi-threading when not compressed
1229 if (!fp->is_compressed)
1230 return 0;
1231
1232 mtaux_t *mt;
1233 mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
1234 if (!mt) return -1;
1235 fp->mt = mt;
1236
1237 mt->pool = pool;
1238 mt->n_threads = hts_tpool_size(pool);
1239 if (!qsize)
1240 qsize = mt->n_threads*2;
1241 if (!(mt->out_queue = hts_tpool_process_init(mt->pool, qsize, 0))) {
1242 free(mt);
1243 return -1;
1244 }
1245 hts_tpool_process_ref_incr(mt->out_queue);
1246
1247 mt->job_pool = pool_create(sizeof(bgzf_job));
1248
1249 pthread_mutex_init(&mt->job_pool_m, NULL);
1250 pthread_mutex_init(&mt->command_m, NULL);
1251 pthread_cond_init(&mt->command_c, NULL);
1252 mt->flush_pending = 0;
1253 mt->jobs_pending = 0;
1254 mt->free_block = fp->uncompressed_block; // currently in-use block
1255 pthread_create(&mt->io_task, NULL,
1256 fp->is_write ? bgzf_mt_writer : bgzf_mt_reader, fp);
1257
1258 return 0;
1259 }
1260
bgzf_mt(BGZF * fp,int n_threads,int n_sub_blks)1261 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1262 {
1263 // No gain from multi-threading when not compressed
1264 if (!fp->is_compressed || fp->is_gzip)
1265 return 0;
1266
1267 if (n_threads < 1) return -1;
1268 hts_tpool *p = hts_tpool_init(n_threads);
1269 if (!p)
1270 return -1;
1271
1272 if (bgzf_thread_pool(fp, p, 0) != 0) {
1273 hts_tpool_destroy(p);
1274 return -1;
1275 }
1276
1277 fp->mt->own_pool = 1;
1278
1279 return 0;
1280 }
1281
mt_destroy(mtaux_t * mt)1282 static void mt_destroy(mtaux_t *mt)
1283 {
1284 pthread_mutex_lock(&mt->command_m);
1285 mt->command = CLOSE;
1286 pthread_cond_signal(&mt->command_c);
1287 hts_tpool_wake_dispatch(mt->out_queue); // unstick the reader
1288 pthread_mutex_unlock(&mt->command_m);
1289
1290 // Destroying the queue first forces the writer to exit.
1291 hts_tpool_process_destroy(mt->out_queue);
1292 pthread_join(mt->io_task, NULL);
1293
1294 pthread_mutex_destroy(&mt->job_pool_m);
1295 pthread_mutex_destroy(&mt->command_m);
1296 pthread_cond_destroy(&mt->command_c);
1297 if (mt->curr_job)
1298 pool_free(mt->job_pool, mt->curr_job);
1299
1300 if (mt->own_pool)
1301 hts_tpool_destroy(mt->pool);
1302
1303 pool_destroy(mt->job_pool);
1304
1305 free(mt);
1306 fflush(stderr);
1307 }
1308
mt_queue(BGZF * fp)1309 static int mt_queue(BGZF *fp)
1310 {
1311 mtaux_t *mt = fp->mt;
1312
1313 // Also updated by writer thread
1314 pthread_mutex_lock(&mt->job_pool_m);
1315 bgzf_job *j = pool_alloc(mt->job_pool);
1316 mt->jobs_pending++;
1317 pthread_mutex_unlock(&mt->job_pool_m);
1318
1319 j->fp = fp;
1320 j->errcode = 0;
1321 j->uncomp_len = fp->block_offset;
1322 memcpy(j->uncomp_data, fp->uncompressed_block, j->uncomp_len);
1323
1324 // Need non-block vers & job_pending?
1325 hts_tpool_dispatch(mt->pool, mt->out_queue, bgzf_encode_func, j);
1326
1327 fp->block_offset = 0;
1328 return 0;
1329 }
1330
mt_flush_queue(BGZF * fp)1331 static int mt_flush_queue(BGZF *fp)
1332 {
1333 mtaux_t *mt = fp->mt;
1334
1335 // Drain the encoder jobs.
1336 // We cannot use hts_tpool_flush here as it can cause deadlock if
1337 // the queue is full up of decoder tasks. The best solution would
1338 // be to have one input queue per type of job, but we don't right now.
1339 //hts_tpool_flush(mt->pool);
1340 pthread_mutex_lock(&mt->job_pool_m);
1341 while (mt->jobs_pending != 0) {
1342 pthread_mutex_unlock(&mt->job_pool_m);
1343 usleep(10000); // FIXME: replace by condition variable
1344 pthread_mutex_lock(&mt->job_pool_m);
1345 }
1346 pthread_mutex_unlock(&mt->job_pool_m);
1347
1348 // Wait on bgzf_mt_writer to drain the queue
1349 if (hts_tpool_process_flush(mt->out_queue) != 0)
1350 return -1;
1351
1352 return (fp->errcode == 0)? 0 : -1;
1353 }
1354
lazy_flush(BGZF * fp)1355 static int lazy_flush(BGZF *fp)
1356 {
1357 if (fp->mt)
1358 return fp->block_offset ? mt_queue(fp) : 0;
1359 else
1360 return bgzf_flush(fp);
1361 }
1362
1363 #else // ~ #ifdef BGZF_MT
1364
bgzf_mt(BGZF * fp,int n_threads,int n_sub_blks)1365 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
1366 {
1367 return 0;
1368 }
1369
lazy_flush(BGZF * fp)1370 static inline int lazy_flush(BGZF *fp)
1371 {
1372 return bgzf_flush(fp);
1373 }
1374
1375 #endif // ~ #ifdef BGZF_MT
1376
bgzf_flush(BGZF * fp)1377 int bgzf_flush(BGZF *fp)
1378 {
1379 if (!fp->is_write) return 0;
1380 #ifdef BGZF_MT
1381 if (fp->mt) {
1382 int ret = 0;
1383 if (fp->block_offset) ret = mt_queue(fp);
1384 return ret ? ret : mt_flush_queue(fp);
1385 }
1386 #endif
1387 while (fp->block_offset > 0) {
1388 int block_length;
1389 if ( fp->idx_build_otf )
1390 {
1391 bgzf_index_add_block(fp);
1392 fp->idx->ublock_addr += fp->block_offset;
1393 }
1394 block_length = deflate_block(fp, fp->block_offset);
1395 if (block_length < 0) {
1396 if (hts_verbose >= 3) fprintf(stderr, "[E::%s] deflate_block error %d\n", __func__, block_length);
1397 return -1;
1398 }
1399 if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
1400 if (hts_verbose >= 1) fprintf(stderr, "[E::%s] hwrite error (wrong size)\n", __func__);
1401 fp->errcode |= BGZF_ERR_IO; // possibly truncated file
1402 return -1;
1403 }
1404 fp->block_address += block_length;
1405 }
1406 return 0;
1407 }
1408
bgzf_flush_try(BGZF * fp,ssize_t size)1409 int bgzf_flush_try(BGZF *fp, ssize_t size)
1410 {
1411 if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
1412 return 0;
1413 }
1414
bgzf_write(BGZF * fp,const void * data,size_t length)1415 ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
1416 {
1417 if ( !fp->is_compressed )
1418 return hwrite(fp->fp, data, length);
1419
1420 const uint8_t *input = (const uint8_t*)data;
1421 ssize_t remaining = length;
1422 assert(fp->is_write);
1423 while (remaining > 0) {
1424 uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1425 int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
1426 if (copy_length > remaining) copy_length = remaining;
1427 memcpy(buffer + fp->block_offset, input, copy_length);
1428 fp->block_offset += copy_length;
1429 input += copy_length;
1430 remaining -= copy_length;
1431 if (fp->block_offset == BGZF_BLOCK_SIZE) {
1432 if (lazy_flush(fp) != 0) return -1;
1433 }
1434 }
1435 return length - remaining;
1436 }
1437
bgzf_block_write(BGZF * fp,const void * data,size_t length)1438 ssize_t bgzf_block_write(BGZF *fp, const void *data, size_t length)
1439 {
1440 if ( !fp->is_compressed )
1441 return hwrite(fp->fp, data, length);
1442 const uint8_t *input = (const uint8_t*)data;
1443 ssize_t remaining = length;
1444 assert(fp->is_write);
1445 uint64_t current_block; //keep track of current block
1446 uint64_t ublock_size; // amount of uncompressed data to be fed into next block
1447 while (remaining > 0) {
1448 current_block = fp->idx->moffs - fp->idx->noffs;
1449 ublock_size = fp->idx->offs[current_block+1].uaddr-fp->idx->offs[current_block].uaddr;
1450 uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
1451 int copy_length = ublock_size - fp->block_offset;
1452 if (copy_length > remaining) copy_length = remaining;
1453 memcpy(buffer + fp->block_offset, input, copy_length);
1454 fp->block_offset += copy_length;
1455 input += copy_length;
1456 remaining -= copy_length;
1457 if (fp->block_offset == ublock_size) {
1458 if (lazy_flush(fp) != 0) return -1;
1459 fp->idx->noffs--; // decrement noffs to track the blocks
1460 }
1461 }
1462 return length - remaining;
1463 }
1464
1465
bgzf_raw_write(BGZF * fp,const void * data,size_t length)1466 ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
1467 {
1468 ssize_t ret = hwrite(fp->fp, data, length);
1469 if (ret < 0) fp->errcode |= BGZF_ERR_IO;
1470 return ret;
1471 }
1472
bgzf_close(BGZF * fp)1473 int bgzf_close(BGZF* fp)
1474 {
1475 int ret, block_length;
1476 if (fp == 0) return -1;
1477 if (fp->is_write && fp->is_compressed) {
1478 if (bgzf_flush(fp) != 0) return -1;
1479 fp->compress_level = -1;
1480 block_length = deflate_block(fp, 0); // write an empty block
1481 if (block_length < 0) {
1482 if (hts_verbose >= 3) fprintf(stderr, "[E::%s] deflate_block error %d\n", __func__, block_length);
1483 return -1;
1484 }
1485 if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
1486 || hflush(fp->fp) != 0) {
1487 if (hts_verbose >= 1) fprintf(stderr, "[E::%s] file write error\n", __func__);
1488 fp->errcode |= BGZF_ERR_IO;
1489 return -1;
1490 }
1491 }
1492 #ifdef BGZF_MT
1493 if (fp->mt) {
1494 if (!fp->mt->free_block)
1495 fp->uncompressed_block = NULL;
1496 mt_destroy(fp->mt);
1497 }
1498 #endif
1499 if ( fp->is_gzip )
1500 {
1501 if (fp->gz_stream == NULL) ret = Z_OK;
1502 else if (!fp->is_write) ret = inflateEnd(fp->gz_stream);
1503 else ret = deflateEnd(fp->gz_stream);
1504 if (ret != Z_OK && hts_verbose >= 1)
1505 fprintf(stderr, "[E::%s] inflateEnd/deflateEnd failed: %s\n",
1506 __func__, bgzf_zerr(ret, NULL));
1507 free(fp->gz_stream);
1508 }
1509 ret = hclose(fp->fp);
1510 if (ret != 0) return -1;
1511 bgzf_index_destroy(fp);
1512 free(fp->uncompressed_block);
1513 free_cache(fp);
1514 free(fp);
1515 return 0;
1516 }
1517
bgzf_set_cache_size(BGZF * fp,int cache_size)1518 void bgzf_set_cache_size(BGZF *fp, int cache_size)
1519 {
1520 if (fp) fp->cache_size = cache_size;
1521 }
1522
bgzf_check_EOF(BGZF * fp)1523 int bgzf_check_EOF(BGZF *fp) {
1524 int has_eof;
1525
1526 if (fp->mt) {
1527 pthread_mutex_lock(&fp->mt->command_m);
1528 fp->mt->command = HAS_EOF;
1529 pthread_cond_signal(&fp->mt->command_c);
1530 hts_tpool_wake_dispatch(fp->mt->out_queue);
1531 pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
1532 has_eof = fp->mt->eof;
1533 pthread_mutex_unlock(&fp->mt->command_m);
1534 } else {
1535 has_eof = bgzf_check_EOF_common(fp);
1536 }
1537
1538 fp->no_eof_block = (has_eof == 0);
1539
1540 return has_eof;
1541 }
1542
bgzf_seek(BGZF * fp,int64_t pos,int where)1543 int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
1544 {
1545 int block_offset;
1546 int64_t block_address;
1547
1548 if (fp->is_write || where != SEEK_SET || fp->is_gzip) {
1549 fp->errcode |= BGZF_ERR_MISUSE;
1550 return -1;
1551 }
1552 block_offset = pos & 0xFFFF;
1553 block_address = pos >> 16;
1554
1555 if (fp->mt) {
1556 // The reader runs asynchronous and does loops of:
1557 // Read block
1558 // Check & process command
1559 // Dispatch decode job
1560 //
1561 // Once at EOF it then switches to loops of
1562 // Wait for command
1563 // Process command (possibly switching back to above loop).
1564 //
1565 // To seek we therefore send the reader thread a SEEK command,
1566 // waking it up if blocked in dispatch and signalling if
1567 // waiting for a command. We then wait for the response so we
1568 // know the seek succeeded.
1569 pthread_mutex_lock(&fp->mt->command_m);
1570 fp->mt->hit_eof = 0;
1571 fp->mt->command = SEEK;
1572 fp->mt->block_address = block_address;
1573 pthread_cond_signal(&fp->mt->command_c);
1574 hts_tpool_wake_dispatch(fp->mt->out_queue);
1575 pthread_cond_wait(&fp->mt->command_c, &fp->mt->command_m);
1576
1577 fp->block_length = 0; // indicates current block has not been loaded
1578 fp->block_address = block_address;
1579 fp->block_offset = block_offset;
1580
1581 pthread_mutex_unlock(&fp->mt->command_m);
1582 } else {
1583 if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
1584 fp->errcode |= BGZF_ERR_IO;
1585 return -1;
1586 }
1587 fp->block_length = 0; // indicates current block has not been loaded
1588 fp->block_address = block_address;
1589 fp->block_offset = block_offset;
1590 }
1591
1592 return 0;
1593 }
1594
bgzf_is_bgzf(const char * fn)1595 int bgzf_is_bgzf(const char *fn)
1596 {
1597 uint8_t buf[16];
1598 int n;
1599 hFILE *fp;
1600 if ((fp = hopen(fn, "r")) == 0) return 0;
1601 n = hread(fp, buf, 16);
1602 if (hclose(fp) < 0) return 0;
1603 if (n != 16) return 0;
1604 return check_header(buf) == 0? 1 : 0;
1605 }
1606
bgzf_compression(BGZF * fp)1607 int bgzf_compression(BGZF *fp)
1608 {
1609 return (!fp->is_compressed)? no_compression : (fp->is_gzip)? gzip : bgzf;
1610 }
1611
bgzf_getc(BGZF * fp)1612 int bgzf_getc(BGZF *fp)
1613 {
1614 if (fp->block_offset+1 < fp->block_length) {
1615 fp->uncompressed_address++;
1616 return ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
1617 }
1618
1619 int c;
1620 if (fp->block_offset >= fp->block_length) {
1621 if (bgzf_read_block(fp) != 0) return -2; /* error */
1622 if (fp->block_length == 0) return -1; /* end-of-file */
1623 }
1624 c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
1625 if (fp->block_offset == fp->block_length) {
1626 fp->block_address = bgzf_htell(fp);
1627 fp->block_offset = 0;
1628 fp->block_length = 0;
1629 }
1630 fp->uncompressed_address++;
1631 return c;
1632 }
1633
1634 #ifndef kroundup32
1635 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
1636 #endif
1637
bgzf_getline(BGZF * fp,int delim,kstring_t * str)1638 int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
1639 {
1640 int l, state = 0;
1641 str->l = 0;
1642 do {
1643 if (fp->block_offset >= fp->block_length) {
1644 if (bgzf_read_block(fp) != 0) { state = -2; break; }
1645 if (fp->block_length == 0) { state = -1; break; }
1646 }
1647 unsigned char *buf = fp->uncompressed_block;
1648 for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
1649 if (l < fp->block_length) state = 1;
1650 l -= fp->block_offset;
1651 if (str->l + l + 1 >= str->m) {
1652 str->m = str->l + l + 2;
1653 kroundup32(str->m);
1654 str->s = (char*)realloc(str->s, str->m);
1655 }
1656 memcpy(str->s + str->l, buf + fp->block_offset, l);
1657 str->l += l;
1658 fp->block_offset += l + 1;
1659 if (fp->block_offset >= fp->block_length) {
1660 fp->block_address = bgzf_htell(fp);
1661 fp->block_offset = 0;
1662 fp->block_length = 0;
1663 }
1664 } while (state == 0);
1665 if (str->l == 0 && state < 0) return state;
1666 fp->uncompressed_address += str->l + 1;
1667 if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
1668 str->s[str->l] = 0;
1669 return str->l;
1670 }
1671
bgzf_index_destroy(BGZF * fp)1672 void bgzf_index_destroy(BGZF *fp)
1673 {
1674 if ( !fp->idx ) return;
1675 free(fp->idx->offs);
1676 free(fp->idx);
1677 fp->idx = NULL;
1678 fp->idx_build_otf = 0;
1679 }
1680
bgzf_index_build_init(BGZF * fp)1681 int bgzf_index_build_init(BGZF *fp)
1682 {
1683 bgzf_index_destroy(fp);
1684 fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1685 if ( !fp->idx ) return -1;
1686 fp->idx_build_otf = 1; // build index on the fly
1687 return 0;
1688 }
1689
bgzf_index_add_block(BGZF * fp)1690 int bgzf_index_add_block(BGZF *fp)
1691 {
1692 fp->idx->noffs++;
1693 if ( fp->idx->noffs > fp->idx->moffs )
1694 {
1695 fp->idx->moffs = fp->idx->noffs;
1696 kroundup32(fp->idx->moffs);
1697 fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
1698 if ( !fp->idx->offs ) return -1;
1699 }
1700 fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
1701 fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
1702 return 0;
1703 }
1704
hwrite_uint64(uint64_t x,hFILE * f)1705 static inline int hwrite_uint64(uint64_t x, hFILE *f)
1706 {
1707 if (ed_is_big()) x = ed_swap_8(x);
1708 if (hwrite(f, &x, sizeof(x)) != sizeof(x)) return -1;
1709 return 0;
1710 }
1711
get_name_suffix(const char * bname,const char * suffix)1712 static char * get_name_suffix(const char *bname, const char *suffix)
1713 {
1714 size_t len = strlen(bname) + strlen(suffix) + 1;
1715 char *buff = malloc(len);
1716 if (!buff) return NULL;
1717 snprintf(buff, len, "%s%s", bname, suffix);
1718 return buff;
1719 }
1720
bgzf_index_dump_hfile(BGZF * fp,struct hFILE * idx,const char * name)1721 int bgzf_index_dump_hfile(BGZF *fp, struct hFILE *idx, const char *name)
1722 {
1723 // Note that the index contains one extra record when indexing files opened
1724 // for reading. The terminating record is not present when opened for writing.
1725 // This is not a bug.
1726
1727 int i, save_errno;
1728
1729 if (!fp->idx) {
1730 if (hts_verbose > 1) {
1731 fprintf(stderr, "[E::%s] Called for BGZF handle with no index",
1732 __func__);
1733 errno = EINVAL;
1734 return -1;
1735 }
1736 }
1737
1738 if (bgzf_flush(fp) != 0) return -1;
1739
1740 if (hwrite_uint64(fp->idx->noffs - 1, idx) < 0) goto fail;
1741 for (i=1; i<fp->idx->noffs; i++)
1742 {
1743 if (hwrite_uint64(fp->idx->offs[i].caddr, idx) < 0) goto fail;
1744 if (hwrite_uint64(fp->idx->offs[i].uaddr, idx) < 0) goto fail;
1745 }
1746 return 0;
1747
1748 fail:
1749 save_errno = errno;
1750 if (hts_verbose > 1) {
1751 fprintf(stderr, "[E::%s] Error writing to %s : %s\n",
1752 __func__, name ? name : "index", strerror(errno));
1753 }
1754 errno = save_errno;
1755 return -1;
1756 }
1757
bgzf_index_dump(BGZF * fp,const char * bname,const char * suffix)1758 int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
1759 {
1760 const char *name = bname, *msg = NULL;
1761 char *tmp = NULL;
1762 hFILE *idx = NULL;
1763 int save_errno;
1764
1765 if (!fp->idx) {
1766 if (hts_verbose > 1) {
1767 fprintf(stderr, "[E::%s] Called for BGZF handle with no index",
1768 __func__);
1769 errno = EINVAL;
1770 return -1;
1771 }
1772 }
1773
1774 if ( suffix )
1775 {
1776 tmp = get_name_suffix(bname, suffix);
1777 if ( !tmp ) return -1;
1778 name = tmp;
1779 }
1780
1781 idx = hopen(name, "wb");
1782 if ( !idx ) {
1783 msg = "Error opening";
1784 goto fail;
1785 }
1786
1787 if (bgzf_index_dump_hfile(fp, idx, name) != 0) goto fail;
1788
1789 if (hclose(idx) < 0)
1790 {
1791 idx = NULL;
1792 msg = "Error on closing";
1793 goto fail;
1794 }
1795
1796 free(tmp);
1797 return 0;
1798
1799 fail:
1800 save_errno = errno;
1801 if (hts_verbose > 1 && msg != NULL)
1802 {
1803 fprintf(stderr, "[E::%s] %s %s : %s\n",
1804 __func__, msg, name, strerror(errno));
1805 }
1806 if (idx) hclose_abruptly(idx);
1807 free(tmp);
1808 errno = save_errno;
1809 return -1;
1810 }
1811
hread_uint64(uint64_t * xptr,hFILE * f)1812 static inline int hread_uint64(uint64_t *xptr, hFILE *f)
1813 {
1814 if (hread(f, xptr, sizeof(*xptr)) != sizeof(*xptr)) return -1;
1815 if (ed_is_big()) ed_swap_8p(xptr);
1816 return 0;
1817 }
1818
bgzf_index_load_hfile(BGZF * fp,struct hFILE * idx,const char * name)1819 int bgzf_index_load_hfile(BGZF *fp, struct hFILE *idx, const char *name)
1820 {
1821 int save_errno;
1822 fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1823 if (fp->idx == NULL) goto fail;
1824 uint64_t x;
1825 if (hread_uint64(&x, idx) < 0) goto fail;
1826
1827 fp->idx->noffs = fp->idx->moffs = x + 1;
1828 fp->idx->offs = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
1829 if (fp->idx->offs == NULL) goto fail;
1830 fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
1831
1832 int i;
1833 for (i=1; i<fp->idx->noffs; i++)
1834 {
1835 if (hread_uint64(&fp->idx->offs[i].caddr, idx) < 0) goto fail;
1836 if (hread_uint64(&fp->idx->offs[i].uaddr, idx) < 0) goto fail;
1837 }
1838
1839 return 0;
1840
1841 fail:
1842 save_errno = errno;
1843 if (hts_verbose > 1)
1844 {
1845 fprintf(stderr, "[E::%s] Error reading %s : %s\n",
1846 __func__, name ? name : "index", strerror(errno));
1847 }
1848 if (fp->idx) {
1849 free(fp->idx->offs);
1850 free(fp->idx);
1851 fp->idx = NULL;
1852 }
1853 errno = save_errno;
1854 return -1;
1855 }
1856
bgzf_index_load(BGZF * fp,const char * bname,const char * suffix)1857 int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
1858 {
1859 int save_errno;
1860 const char *name = bname, *msg = NULL;
1861 char *tmp = NULL;
1862 hFILE *idx = NULL;
1863 if ( suffix )
1864 {
1865 tmp = get_name_suffix(bname, suffix);
1866 if ( !tmp ) return -1;
1867 name = tmp;
1868 }
1869
1870 idx = hopen(name, "rb");
1871 if ( !idx ) {
1872 msg = "Error opening";
1873 goto fail;
1874 }
1875
1876 if (bgzf_index_load_hfile(fp, idx, name) != 0) goto fail;
1877
1878 if (hclose(idx) != 0) {
1879 idx = NULL;
1880 msg = "Error closing";
1881 goto fail;
1882 }
1883
1884 free(tmp);
1885 return 0;
1886
1887 fail:
1888 save_errno = errno;
1889 if (hts_verbose > 1 && msg != NULL) {
1890 fprintf(stderr, "[E::%s] %s %s : %s\n",
1891 __func__, msg, name, strerror(errno));
1892 }
1893 if (idx) hclose_abruptly(idx);
1894 free(tmp);
1895 errno = save_errno;
1896 return -1;
1897 }
1898
bgzf_useek(BGZF * fp,long uoffset,int where)1899 int bgzf_useek(BGZF *fp, long uoffset, int where)
1900 {
1901 if ( !fp->is_compressed )
1902 {
1903 if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
1904 {
1905 fp->errcode |= BGZF_ERR_IO;
1906 return -1;
1907 }
1908 fp->block_length = 0; // indicates current block has not been loaded
1909 fp->block_address = uoffset;
1910 fp->block_offset = 0;
1911 if (bgzf_read_block(fp) < 0) {
1912 fp->errcode |= BGZF_ERR_IO;
1913 return -1;
1914 }
1915 fp->uncompressed_address = uoffset;
1916 return 0;
1917 }
1918
1919 if ( !fp->idx )
1920 {
1921 fp->errcode |= BGZF_ERR_IO;
1922 return -1;
1923 }
1924
1925 // binary search
1926 int ilo = 0, ihi = fp->idx->noffs - 1;
1927 while ( ilo<=ihi )
1928 {
1929 int i = (ilo+ihi)*0.5;
1930 if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
1931 else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
1932 else break;
1933 }
1934 int i = ilo-1;
1935 if (hseek(fp->fp, fp->idx->offs[i].caddr, SEEK_SET) < 0)
1936 {
1937 fp->errcode |= BGZF_ERR_IO;
1938 return -1;
1939 }
1940 fp->block_length = 0; // indicates current block has not been loaded
1941 fp->block_address = fp->idx->offs[i].caddr;
1942 fp->block_offset = 0;
1943 if ( bgzf_read_block(fp) < 0 ) {
1944 fp->errcode |= BGZF_ERR_IO;
1945 return -1;
1946 }
1947 if ( uoffset - fp->idx->offs[i].uaddr > 0 )
1948 {
1949 fp->block_offset = uoffset - fp->idx->offs[i].uaddr;
1950 assert( fp->block_offset <= fp->block_length ); // todo: skipped, unindexed, blocks
1951 }
1952 fp->uncompressed_address = uoffset;
1953 return 0;
1954 }
1955
bgzf_utell(BGZF * fp)1956 long bgzf_utell(BGZF *fp)
1957 {
1958 return fp->uncompressed_address; // currently maintained only when reading
1959 }
1960