1 /*
2 * Copyright (c) 2013 Genome Research Ltd.
3 * Author(s): James Bonfield
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials provided
14 * with the distribution.
15 *
16 * 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger
17 * Institute nor the names of its contributors may be used to endorse
18 * or promote products derived from this software without specific
19 * prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS
22 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
23 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
24 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH
25 * LTD OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 /*
35 * Multi-threading.
36
37 Trying to multi-thread BAM input is problematic as the BGZF boundaries
38 share no common ground with the sequence boundaries. Header and
39 sequences may be larger than a BGZF block, and the header may also be
40 in the same block as the sequence data.
41
42 Therefore the granularity of multi-threading needs to be the
43 compression and uncompression code.
44
45 Threadable sections:
46
47 1) zlib portion bgzf_more_output?
48 2) decode (may merge with 1)
49 3) bgzf_write
50
51
52 Similary for cram writing:
53 1) Building crecs
54 2) Encode + basic huff
55 3) Block compression (may merge with 2)
56
57
58 Ie input is hard to multi-thread
59 Output can be distributed to multiple threads and aggregated together
60 before sending on to FILE *fp.
61 */
62
63
64 /*
65 * Author: James Bonfield, Wellcome Trust Sanger Institute. 2013
66 *
67 * A wrapper around SAM, BAM and CRAM I/O to give a unified interface.
68 */
69
70 #ifdef HAVE_CONFIG_H
71 #include "io_lib_config.h"
72 #endif
73
74 #include <string.h>
75 #include <assert.h>
76
77 #include "io_lib/scram.h"
78
79 #define SCRAM_BUF_SIZE (1024*1024)
80
81 /*
82 * Expands the input buffer.
83 * Returns 0 on sucess
84 * -1 on failure
85 */
scram_more_input(scram_fd * fd)86 static int scram_more_input(scram_fd *fd) {
87 size_t avail = fd->alloc - fd->used;
88 size_t l;
89
90 l = fread(&fd->buf[fd->used], 1, avail, fd->fp);
91 if (l <= 0)
92 return -1;
93
94 fd->used += l;
95 return 0;
96 }
97
98 /*
99 * Consumes a block of data from the input stream and returns a malloced
100 * copy of it. The input buffer is then copied down (FIXME: inefficient).
101 */
scram_input_next_block(scram_fd * fd,size_t max_size,size_t * out_size)102 static unsigned char *scram_input_next_block(scram_fd *fd, size_t max_size,
103 size_t *out_size) {
104 ssize_t l = MIN(max_size, fd->used);
105 ssize_t i;
106 unsigned char *r = NULL;
107
108 if (max_size > fd->used) {
109 scram_more_input(fd);
110 if (fd->used == 0)
111 return NULL;
112 }
113
114 if (fd->b->binary) {
115 uint32_t bsize;
116
117 if (l < 19)
118 return NULL;
119 bsize = fd->buf[16] + 256*fd->buf[17] + 1;
120 fprintf(stderr, "block_size=%d\n", bsize);
121
122 l = MIN(bsize, l);
123 } else {
124 for (i = l-1; i >= 0; i--) {
125 while (fd->buf[i] != '\n')
126 i--;
127 }
128 assert(i >= 0);
129
130 l = i;
131 }
132
133 if (!(r = malloc(l)))
134 return NULL;
135 memcpy(r, fd->buf, l);
136 memcpy(fd->buf, &fd->buf[l], fd->used - l);
137 fd->used -= l;
138
139 if (out_size)
140 *out_size = l;
141
142 return r;
143 }
144
scram_input_bam_block(scram_fd * fd)145 int scram_input_bam_block(scram_fd *fd) {
146 size_t sz;
147 unsigned char *r;
148
149 if (!fd->is_bam)
150 return -1;
151 r = scram_input_next_block(fd, Z_BUFF_SIZE, &sz);
152 if (!r)
153 return -1;
154
155 // if (fd->b->comp_p && fd->b->comp_p != fd->b->comp)
156 // free(fd->b->comp_p);
157 fd->b->comp_p = r;
158 fd->b->comp_sz = sz;
159
160 return 0;
161 }
162
163 /*
164 * Opens filename.
165 * If reading we initially try cram first and then bam/sam if that fails.
166 * The exception is when reading from stdin, where bam/sam is first.
167 *
168 * If writing we look at the mode parameter:
169 * w => SAM
170 * ws => SAM
171 * wb => BAM
172 * wc => CRAM
173 *
174 * Returns scram pointer on success
175 * NULL on failure
176 */
scram_open(const char * filename,const char * mode)177 scram_fd *scram_open(const char *filename, const char *mode) {
178 char mode2[10];
179 scram_fd *fd = calloc(1, sizeof(*fd));
180 if (!fd)
181 return NULL;
182
183 fd->eof = 0;
184
185 /* I/O buffer */
186 fd->fp = NULL;
187 fd->buf = NULL;
188 fd->alloc = fd->used = 0;
189 fd->pool = NULL;
190
191 if (strcmp(filename, "-") == 0 && mode[0] == 'r'
192 && mode[1] != 'b' && mode[1] != 'c' && mode[1] != 's') {
193 int c;
194 /*
195 * Crude auto-detection.
196 * First char @ = sam, 0x1f = bam (gzip), C = cram
197 * Headerless SAM will need explicit mode setting.
198 */
199 c = fgetc(stdin);
200 ungetc(c, stdin);
201
202 if (c == '@')
203 sprintf(mode2, "rs%.7s", mode+1), mode = mode2;
204 else if (c == 0x1f)
205 sprintf(mode2, "rb%.7s", mode+1), mode = mode2;
206 else if (c == 'C')
207 sprintf(mode2, "rc%.7s", mode+1), mode = mode2;
208 }
209
210 if (*mode == 'r') {
211 if (mode[1] != 'b' && mode[1] != 's') {
212 if ((fd->c = cram_open(filename, mode))) {
213 cram_load_reference(fd->c, NULL);
214 fd->is_bam = 0;
215 return fd;
216 }
217 }
218
219 if ((fd->b = bam_open(filename, mode))) {
220 fd->is_bam = 1;
221 return fd;
222 }
223
224 free(fd);
225 return NULL;
226 }
227
228 /* For writing we cannot auto detect, so create the file type based
229 * on the format in the mode string.
230 */
231 if (strncmp(mode, "wc", 2) == 0) {
232 if (!(fd->c = cram_open(filename, mode))) {
233 free(fd);
234 return NULL;
235 }
236 fd->is_bam = 0;
237 return fd;
238 }
239
240 /* Otherwise assume bam/sam */
241 if (!(fd->b = bam_open(filename, mode))) {
242 free(fd);
243 return NULL;
244 }
245 fd->is_bam = 1;
246 return fd;
247 }
248
249 #if defined(CRAM_IO_CUSTOM_BUFFERING)
250 /*
251 * Open CRAM file for reading via callbacks
252 *
253 * Returns scram pointer on success
254 * NULL on failure
255 */
scram_open_cram_via_callbacks(char const * filename,cram_io_allocate_read_input_t callback_allocate_function,cram_io_deallocate_read_input_t callback_deallocate_function,size_t const bufsize)256 scram_fd *scram_open_cram_via_callbacks(
257 char const * filename,
258 cram_io_allocate_read_input_t callback_allocate_function,
259 cram_io_deallocate_read_input_t callback_deallocate_function,
260 size_t const bufsize
261 )
262 {
263 scram_fd *fd = calloc(1, sizeof(*fd));
264 if (!fd)
265 return NULL;
266
267 fd->eof = 0;
268
269 /* I/O buffer */
270 fd->fp = NULL;
271 fd->buf = NULL;
272 fd->alloc = fd->used = 0;
273 fd->pool = NULL;
274
275 if ((fd->c = cram_open_by_callbacks(filename,
276 callback_allocate_function,
277 callback_deallocate_function,
278 bufsize)))
279 {
280 cram_load_reference(fd->c, NULL);
281 fd->is_bam = 0;
282 return fd;
283 }
284
285 return NULL;
286 }
287 #endif
288
scram_close(scram_fd * fd)289 int scram_close(scram_fd *fd) {
290 int r;
291
292 if (fd->is_bam) {
293 r = bam_close(fd->b);
294 } else {
295 r = cram_close(fd->c);
296 }
297
298 if (fd->pool)
299 t_pool_destroy(fd->pool, 0);
300
301
302 free(fd);
303 return r;
304 }
305
scram_get_header(scram_fd * fd)306 SAM_hdr *scram_get_header(scram_fd *fd) {
307 #ifdef __INTEL_COMPILER
308 // avoids cmovne generation from icc 2015 (bug)
309 return fd->is_bam && fd->b ? fd->b->header : fd->c->header;
310 #else
311 return fd->is_bam ? fd->b->header : fd->c->header;
312 #endif
313 }
314
scram_get_refs(scram_fd * fd)315 refs_t *scram_get_refs(scram_fd *fd) {
316 return fd->is_bam ? NULL : fd->c->refs;
317 }
318
scram_set_refs(scram_fd * fd,refs_t * refs)319 void scram_set_refs(scram_fd *fd, refs_t *refs) {
320 if (fd->is_bam)
321 return;
322 if (fd->c->refs)
323 refs_free(fd->c->refs);
324 fd->c->refs = refs;
325 if (refs)
326 refs->count++;
327 }
328
scram_set_header(scram_fd * fd,SAM_hdr * sh)329 void scram_set_header(scram_fd *fd, SAM_hdr *sh) {
330 if (fd->is_bam) {
331 fd->b->header = sh;
332 } else {
333 fd->c->header = sh;
334 }
335
336 sam_hdr_incr_ref(sh);
337 }
338
scram_write_header(scram_fd * fd)339 int scram_write_header(scram_fd *fd) {
340 return fd->is_bam
341 ? bam_write_header(fd->b)
342 : cram_write_SAM_hdr(fd->c, fd->c->header);
343 }
344
scram_get_seq(scram_fd * fd,bam_seq_t ** bsp)345 int scram_get_seq(scram_fd *fd, bam_seq_t **bsp) {
346 if (fd->is_bam) {
347 switch (bam_get_seq(fd->b, bsp)) {
348 case 1:
349 return 0;
350
351 case 0:
352 // FIXME: if we ever implement range queries for BAM this will
353 // need amendments to not claim a sub-range is invalid EOF.
354 fd->eof = fd->b->eof_block ? 1 : 2;
355 return -1;
356
357 default:
358 fd->eof = -1; // err
359 return -1;
360 }
361 }
362
363 if (-1 == cram_get_bam_seq(fd->c, bsp)) {
364 fd->eof = cram_eof(fd->c);
365 return -1;
366 }
367 return 0;
368 }
369
scram_next_seq(scram_fd * fd,bam_seq_t ** bsp)370 int scram_next_seq(scram_fd *fd, bam_seq_t **bsp) {
371 return scram_get_seq(fd, bsp);
372 }
373
scram_put_seq(scram_fd * fd,bam_seq_t * s)374 int scram_put_seq(scram_fd *fd, bam_seq_t *s) {
375 return fd->is_bam
376 ? bam_put_seq(fd->b, s)
377 : cram_put_bam_seq(fd->c, s);
378 }
379
scram_set_option(scram_fd * fd,enum cram_option opt,...)380 int scram_set_option(scram_fd *fd, enum cram_option opt, ...) {
381 int r = 0;
382 va_list args;
383
384 va_start(args, opt);
385
386 if (opt == CRAM_OPT_THREAD_POOL) {
387 t_pool *p = va_arg(args, t_pool *);
388 if (fd->is_bam)
389 return bam_set_option(fd->b, BAM_OPT_THREAD_POOL, p);
390 else
391 return cram_set_option(fd->c, CRAM_OPT_THREAD_POOL, p);
392 } else if (opt == CRAM_OPT_NTHREADS) {
393 int nthreads = va_arg(args, int);
394 if (nthreads > 1) {
395 if (!(fd->pool = t_pool_init(nthreads*2, nthreads)))
396 return -1;
397
398 if (fd->is_bam)
399 return bam_set_option(fd->b, BAM_OPT_THREAD_POOL, fd->pool);
400 else
401 return cram_set_option(fd->c, CRAM_OPT_THREAD_POOL, fd->pool);
402 } else {
403 fd->pool = NULL;
404 return 0;
405 }
406 } else if (opt == CRAM_OPT_BINNING) {
407 int bin = va_arg(args, int);
408
409 return fd->is_bam
410 ? bam_set_option (fd->b, BAM_OPT_BINNING, bin)
411 : cram_set_option(fd->c, CRAM_OPT_BINNING, bin);
412 } else if (opt == CRAM_OPT_IGNORE_CHKSUM) {
413 int chk = va_arg(args, int);
414
415 return fd->is_bam
416 ? bam_set_option (fd->b, BAM_OPT_IGNORE_CHKSUM, chk)
417 : cram_set_option(fd->c, CRAM_OPT_IGNORE_CHKSUM, chk);
418 } else if (opt == CRAM_OPT_WITH_BGZIP_INDEX) {
419 gzi *idx = va_arg(args, gzi *);
420 if (fd->is_bam)
421 return bam_set_option (fd->b, BAM_OPT_WITH_BGZIP_IDX, idx);
422 } else if (opt == CRAM_OPT_OUTPUT_BGZIP_IDX) {
423 char *idx_fn = va_arg(args, char *);
424 if (fd->is_bam)
425 return bam_set_option (fd->b, BAM_OPT_OUTPUT_BGZIP_IDX, idx_fn);
426 }
427
428 if (!fd->is_bam) {
429 r = cram_set_voption(fd->c, opt, args);
430 }
431
432 va_end(args);
433
434 return r;
435 }
436
437 /*! Returns the line number when processing a SAM file
438 *
439 * @return
440 * Returns line number if input is SAM;
441 * 0 for CRAM / BAM input.
442 */
scram_line(scram_fd * fd)443 int scram_line(scram_fd *fd) {
444 if (fd->is_bam)
445 return fd->b->line;
446 else
447 return 0;
448 }
449
450
451 #ifdef HAVE_MALLOC_H
452 #include <malloc.h>
453 #endif
454
455 /*! Advises the memory allocator of CRAM usage patterns
456 *
457 * CRAM decoding will typically allocate & deallocate blocks for each
458 * slice. Under certain conditions this can cause a large number of
459 * page faults where malloc gives a page back to the OS (free) and
460 * then requests it again (the next malloc). We could write our own
461 * memory cache layer on top of malloc to keep track of previously
462 * freed blocks, but it is complex in a multi-threaded environment and
463 * arguably this is what malloc does anyway.
464 *
465 * Under GNU malloc we can simply tune it to avoid too many page faults.
466 */
scram_init(void)467 void scram_init(void) {
468 #if defined(HAVE_MALLOPT) && defined(M_MMAP_MAX)
469 mallopt(M_MMAP_MAX, 0);
470 #endif
471 }
472