1 /*
2  * Copyright (c) 2013 Genome Research Ltd.
3  * Author(s): James Bonfield
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *    1. Redistributions of source code must retain the above copyright notice,
9  *       this list of conditions and the following disclaimer.
10  *
11  *    2. Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *
16  *    3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
17  *    Institute nor the names of its contributors may be used to endorse
18  *    or promote products derived from this software without specific
19  *    prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS
22  * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
23  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
24  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH
25  * LTD OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
35  * Multi-threading.
36 
37 Trying to multi-thread BAM input is problematic as the BGZF boundaries
38 share no common ground with the sequence boundaries. Header and
39 sequences may be larger than a BGZF block, and the header may also be
40 in the same block as the sequence data.
41 
42 Therefore the granularity of multi-threading needs to be the
43 compression and uncompression code.
44 
45 Threadable sections:
46 
47 1) zlib portion bgzf_more_output?
48 2) decode (may merge with 1)
49 3) bgzf_write
50 
51 
52 Similary for cram writing:
53 1) Building crecs
54 2) Encode + basic huff
55 3) Block compression (may merge with 2)
56 
57 
58 Ie input is hard to multi-thread
59 Output can be distributed to multiple threads and aggregated together
60 before sending on to FILE *fp.
61  */
62 
63 
64 /*
65  * Author: James Bonfield, Wellcome Trust Sanger Institute. 2013
66  *
67  * A wrapper around SAM, BAM and CRAM I/O to give a unified interface.
68  */
69 
70 #ifdef HAVE_CONFIG_H
71 #include "io_lib_config.h"
72 #endif
73 
74 #include <string.h>
75 #include <assert.h>
76 
77 #include "io_lib/scram.h"
78 
79 #define SCRAM_BUF_SIZE (1024*1024)
80 
81 /*
82  * Expands the input buffer.
83  * Returns 0 on sucess
84  *        -1 on failure
85  */
scram_more_input(scram_fd * fd)86 static int scram_more_input(scram_fd *fd) {
87     size_t avail = fd->alloc - fd->used;
88     size_t l;
89 
90     l = fread(&fd->buf[fd->used], 1, avail, fd->fp);
91     if (l <= 0)
92 	return -1;
93 
94     fd->used += l;
95     return 0;
96 }
97 
98 /*
99  * Consumes a block of data from the input stream and returns a malloced
100  * copy of it. The input buffer is then copied down (FIXME: inefficient).
101  */
scram_input_next_block(scram_fd * fd,size_t max_size,size_t * out_size)102 static unsigned char *scram_input_next_block(scram_fd *fd, size_t max_size,
103 					     size_t *out_size) {
104     ssize_t l = MIN(max_size, fd->used);
105     ssize_t i;
106     unsigned char *r = NULL;
107 
108     if (max_size > fd->used) {
109 	scram_more_input(fd);
110 	if (fd->used == 0)
111 	    return NULL;
112     }
113 
114     if (fd->b->binary) {
115 	uint32_t bsize;
116 
117 	if (l < 19)
118 	    return NULL;
119 	bsize = fd->buf[16] + 256*fd->buf[17] + 1;
120 	fprintf(stderr, "block_size=%d\n", bsize);
121 
122 	l = MIN(bsize, l);
123     } else {
124 	for (i = l-1; i >= 0; i--) {
125 	    while (fd->buf[i] != '\n')
126 		i--;
127 	}
128 	assert(i >= 0);
129 
130 	l = i;
131     }
132 
133     if (!(r = malloc(l)))
134 	return NULL;
135     memcpy(r, fd->buf, l);
136     memcpy(fd->buf, &fd->buf[l], fd->used - l);
137     fd->used -= l;
138 
139     if (out_size)
140 	*out_size = l;
141 
142     return r;
143 }
144 
scram_input_bam_block(scram_fd * fd)145 int scram_input_bam_block(scram_fd *fd) {
146     size_t sz;
147     unsigned char *r;
148 
149     if (!fd->is_bam)
150 	return -1;
151     r = scram_input_next_block(fd, Z_BUFF_SIZE, &sz);
152     if (!r)
153 	return -1;
154 
155 //    if (fd->b->comp_p && fd->b->comp_p != fd->b->comp)
156 //	free(fd->b->comp_p);
157     fd->b->comp_p = r;
158     fd->b->comp_sz = sz;
159 
160     return 0;
161 }
162 
163 /*
164  * Opens filename.
165  * If reading we initially try cram first and then bam/sam if that fails.
166  * The exception is when reading from stdin, where bam/sam is first.
167  *
168  * If writing we look at the mode parameter:
169  *     w  => SAM
170  *     ws => SAM
171  *     wb => BAM
172  *     wc => CRAM
173  *
174  * Returns scram pointer on success
175  *         NULL on failure
176  */
scram_open(const char * filename,const char * mode)177 scram_fd *scram_open(const char *filename, const char *mode) {
178     char mode2[10];
179     scram_fd *fd = calloc(1, sizeof(*fd));
180     if (!fd)
181 	return NULL;
182 
183     fd->eof = 0;
184 
185     /* I/O buffer */
186     fd->fp = NULL;
187     fd->buf = NULL;
188     fd->alloc = fd->used = 0;
189     fd->pool = NULL;
190 
191     if (strcmp(filename, "-") == 0 && mode[0] == 'r'
192 	&& mode[1] != 'b' && mode[1] != 'c' && mode[1] != 's') {
193 	int c;
194 	/*
195 	 * Crude auto-detection.
196 	 * First char @ = sam, 0x1f = bam (gzip), C = cram
197 	 * Headerless SAM will need explicit mode setting.
198 	 */
199 	c = fgetc(stdin);
200 	ungetc(c, stdin);
201 
202 	if (c == '@')
203 	    sprintf(mode2, "rs%.7s", mode+1), mode = mode2;
204 	else if (c == 0x1f)
205 	    sprintf(mode2, "rb%.7s", mode+1), mode = mode2;
206 	else if (c == 'C')
207 	    sprintf(mode2, "rc%.7s", mode+1), mode = mode2;
208     }
209 
210     if (*mode == 'r') {
211 	if (mode[1] != 'b' && mode[1] != 's') {
212 	    if ((fd->c = cram_open(filename, mode))) {
213 		cram_load_reference(fd->c, NULL);
214 		fd->is_bam = 0;
215 		return fd;
216 	    }
217 	}
218 
219 	if ((fd->b = bam_open(filename, mode))) {
220 	    fd->is_bam = 1;
221 	    return fd;
222 	}
223 
224 	free(fd);
225 	return NULL;
226     }
227 
228     /* For writing we cannot auto detect, so create the file type based
229      * on the format in the mode string.
230      */
231     if (strncmp(mode, "wc", 2) == 0) {
232 	if (!(fd->c = cram_open(filename, mode))) {
233 	    free(fd);
234 	    return NULL;
235 	}
236 	fd->is_bam = 0;
237 	return fd;
238     }
239 
240     /* Otherwise assume bam/sam */
241     if (!(fd->b = bam_open(filename, mode))) {
242 	free(fd);
243 	return NULL;
244     }
245     fd->is_bam = 1;
246     return fd;
247 }
248 
249 #if defined(CRAM_IO_CUSTOM_BUFFERING)
250 /*
251  * Open CRAM file for reading via callbacks
252  *
253  * Returns scram pointer on success
254  *         NULL on failure
255  */
scram_open_cram_via_callbacks(char const * filename,cram_io_allocate_read_input_t callback_allocate_function,cram_io_deallocate_read_input_t callback_deallocate_function,size_t const bufsize)256 scram_fd *scram_open_cram_via_callbacks(
257     char const * filename,
258     cram_io_allocate_read_input_t   callback_allocate_function,
259     cram_io_deallocate_read_input_t callback_deallocate_function,
260     size_t const bufsize
261 )
262 {
263     scram_fd *fd = calloc(1, sizeof(*fd));
264     if (!fd)
265 	return NULL;
266 
267     fd->eof = 0;
268 
269     /* I/O buffer */
270     fd->fp = NULL;
271     fd->buf = NULL;
272     fd->alloc = fd->used = 0;
273     fd->pool = NULL;
274 
275     if ((fd->c = cram_open_by_callbacks(filename,
276 					callback_allocate_function,
277 					callback_deallocate_function,
278 					bufsize)))
279     {
280 	cram_load_reference(fd->c, NULL);
281 	fd->is_bam = 0;
282 	return fd;
283     }
284 
285     return NULL;
286 }
287 #endif
288 
scram_close(scram_fd * fd)289 int scram_close(scram_fd *fd) {
290     int r;
291 
292     if (fd->is_bam) {
293 	r = bam_close(fd->b);
294     } else {
295 	r = cram_close(fd->c);
296     }
297 
298     if (fd->pool)
299 	t_pool_destroy(fd->pool, 0);
300 
301 
302     free(fd);
303     return r;
304 }
305 
scram_get_header(scram_fd * fd)306 SAM_hdr *scram_get_header(scram_fd *fd) {
307 #ifdef __INTEL_COMPILER
308     // avoids cmovne generation from icc 2015 (bug)
309     return fd->is_bam && fd->b ? fd->b->header : fd->c->header;
310 #else
311     return fd->is_bam ? fd->b->header : fd->c->header;
312 #endif
313 }
314 
scram_get_refs(scram_fd * fd)315 refs_t *scram_get_refs(scram_fd *fd) {
316     return fd->is_bam ? NULL : fd->c->refs;
317 }
318 
scram_set_refs(scram_fd * fd,refs_t * refs)319 void scram_set_refs(scram_fd *fd, refs_t *refs) {
320     if (fd->is_bam)
321 	return;
322     if (fd->c->refs)
323 	refs_free(fd->c->refs);
324     fd->c->refs = refs;
325     if (refs)
326 	refs->count++;
327 }
328 
scram_set_header(scram_fd * fd,SAM_hdr * sh)329 void scram_set_header(scram_fd *fd, SAM_hdr *sh) {
330     if (fd->is_bam) {
331 	fd->b->header = sh;
332     } else {
333 	fd->c->header = sh;
334     }
335 
336     sam_hdr_incr_ref(sh);
337 }
338 
scram_write_header(scram_fd * fd)339 int scram_write_header(scram_fd *fd) {
340     return fd->is_bam
341 	? bam_write_header(fd->b)
342 	: cram_write_SAM_hdr(fd->c, fd->c->header);
343 }
344 
scram_get_seq(scram_fd * fd,bam_seq_t ** bsp)345 int scram_get_seq(scram_fd *fd, bam_seq_t **bsp) {
346     if (fd->is_bam) {
347 	switch (bam_get_seq(fd->b, bsp)) {
348 	case 1:
349 	    return 0;
350 
351 	case 0:
352 	    // FIXME: if we ever implement range queries for BAM this will
353 	    // need amendments to not claim a sub-range is invalid EOF.
354 	    fd->eof = fd->b->eof_block ? 1 : 2;
355 	    return -1;
356 
357 	default:
358 	    fd->eof = -1; // err
359 	    return -1;
360 	}
361     }
362 
363     if (-1 == cram_get_bam_seq(fd->c, bsp)) {
364 	fd->eof = cram_eof(fd->c);
365 	return -1;
366     }
367     return 0;
368 }
369 
scram_next_seq(scram_fd * fd,bam_seq_t ** bsp)370 int scram_next_seq(scram_fd *fd, bam_seq_t **bsp) {
371     return scram_get_seq(fd, bsp);
372 }
373 
scram_put_seq(scram_fd * fd,bam_seq_t * s)374 int scram_put_seq(scram_fd *fd, bam_seq_t *s) {
375     return fd->is_bam
376 	? bam_put_seq(fd->b, s)
377 	: cram_put_bam_seq(fd->c, s);
378 }
379 
scram_set_option(scram_fd * fd,enum cram_option opt,...)380 int scram_set_option(scram_fd *fd, enum cram_option opt, ...) {
381     int r = 0;
382     va_list args;
383 
384     va_start(args, opt);
385 
386     if (opt == CRAM_OPT_THREAD_POOL) {
387 	t_pool *p = va_arg(args, t_pool *);
388 	if (fd->is_bam)
389 	    return bam_set_option(fd->b, BAM_OPT_THREAD_POOL, p);
390 	else
391 	    return cram_set_option(fd->c, CRAM_OPT_THREAD_POOL, p);
392     } else if (opt == CRAM_OPT_NTHREADS) {
393 	int nthreads = va_arg(args, int);
394 	if (nthreads > 1) {
395 	    if (!(fd->pool = t_pool_init(nthreads*2, nthreads)))
396 		return -1;
397 
398 	    if (fd->is_bam)
399 		return bam_set_option(fd->b, BAM_OPT_THREAD_POOL, fd->pool);
400 	    else
401 		return cram_set_option(fd->c, CRAM_OPT_THREAD_POOL, fd->pool);
402 	} else {
403 	    fd->pool = NULL;
404 	    return 0;
405 	}
406     } else if (opt == CRAM_OPT_BINNING) {
407 	int bin = va_arg(args, int);
408 
409 	return fd->is_bam
410 	    ? bam_set_option (fd->b,  BAM_OPT_BINNING, bin)
411 	    : cram_set_option(fd->c, CRAM_OPT_BINNING, bin);
412     } else if (opt == CRAM_OPT_IGNORE_CHKSUM) {
413 	int chk = va_arg(args, int);
414 
415 	return fd->is_bam
416 	    ? bam_set_option (fd->b,  BAM_OPT_IGNORE_CHKSUM, chk)
417 	    : cram_set_option(fd->c, CRAM_OPT_IGNORE_CHKSUM, chk);
418     } else if (opt == CRAM_OPT_WITH_BGZIP_INDEX) {
419         gzi *idx = va_arg(args, gzi *);
420         if (fd->is_bam)
421 	    return bam_set_option (fd->b,  BAM_OPT_WITH_BGZIP_IDX, idx);
422     } else if (opt == CRAM_OPT_OUTPUT_BGZIP_IDX) {
423         char *idx_fn = va_arg(args, char *);
424         if (fd->is_bam)
425 	    return bam_set_option (fd->b,  BAM_OPT_OUTPUT_BGZIP_IDX, idx_fn);
426     }
427 
428     if (!fd->is_bam) {
429 	r = cram_set_voption(fd->c, opt, args);
430     }
431 
432     va_end(args);
433 
434     return r;
435 }
436 
437 /*! Returns the line number when processing a SAM file
438  *
439  * @return
440  * Returns line number if input is SAM;
441  *         0 for CRAM / BAM input.
442  */
scram_line(scram_fd * fd)443 int scram_line(scram_fd *fd) {
444     if (fd->is_bam)
445 	return fd->b->line;
446     else
447 	return 0;
448 }
449 
450 
451 #ifdef HAVE_MALLOC_H
452 #include <malloc.h>
453 #endif
454 
455 /*! Advises the memory allocator of CRAM usage patterns
456  *
457  * CRAM decoding will typically allocate & deallocate blocks for each
458  * slice.  Under certain conditions this can cause a large number of
459  * page faults where malloc gives a page back to the OS (free) and
460  * then requests it again (the next malloc).  We could write our own
461  * memory cache layer on top of malloc to keep track of previously
462  * freed blocks, but it is complex in a multi-threaded environment and
463  * arguably this is what malloc does anyway.
464  *
465  * Under GNU malloc we can simply tune it to avoid too many page faults.
466  */
scram_init(void)467 void scram_init(void) {
468 #if defined(HAVE_MALLOPT) && defined(M_MMAP_MAX)
469     mallopt(M_MMAP_MAX, 0);
470 #endif
471 }
472