1 /***************************************************************
2
3 The Subread software package is free software package:
4 you can redistribute it and/or modify it under the terms
5 of the GNU General Public License as published by the
6 Free Software Foundation, either version 3 of the License,
7 or (at your option) any later version.
8
9 Subread is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty
11 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12
13 See the GNU General Public License for more details.
14
15 Authors: Drs Yang Liao and Wei Shi
16
17 ***************************************************************/
18
19 /***************************************************************
20
21 The SamBam_reg2bin function was derived from the BAM
22 specification. (The SAM Format Specication Working
23 Group, September 7, 2011)
24
25 ***************************************************************/
26
27
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <assert.h>
32 #include <ctype.h>
33 #include <unistd.h>
34 #include "subread.h"
35 #include "core.h"
36 #include "HelperFunctions.h"
37 #include "gene-algorithms.h"
38 #include "sambam-file.h"
39 #include "input-files.h"
40 #define Z_DEFAULT_MEM_LEVEL 8
41
SamBam_fetch_next_chunk(SamBam_FILE * fp)42 int SamBam_fetch_next_chunk(SamBam_FILE *fp)
43 {
44 int x1, room = SAMBAM_INPUT_STREAM_SIZE - ( fp -> input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr);
45
46 if(room < 65536)
47 return -1;
48
49
50 for(x1=0; x1 < fp->input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr; x1 ++)
51 {
52 fp -> input_binary_stream_buffer [x1] = fp -> input_binary_stream_buffer [x1 + fp->input_binary_stream_read_ptr - fp -> input_binary_stream_buffer_start_ptr];
53 }
54 fp -> input_binary_stream_buffer_start_ptr = fp->input_binary_stream_read_ptr;
55
56 char * in_buff = malloc(65537);
57 unsigned int real_len = 0;
58 int ret, have = 0;
59
60 while (1){
61 int nchunk = 0;
62 ret = PBam_get_next_zchunk(fp -> os_file, in_buff, 65536, & real_len);
63 if(ret > 0)
64 nchunk = SamBam_unzip(fp -> input_binary_stream_buffer + fp->input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr + have, 65536 , in_buff , ret, 0);
65 else if(ret == -2){
66 SUBREADputs("ERROR: BAM format is broken.");
67 return -2;
68 }
69
70 //printf("RET=%d; CHK=%d\n", ret, nchunk);
71
72 if(nchunk>0)
73 have += nchunk;
74 if(have > 3000) break;
75 if(feof(fp->os_file)){
76 fp->is_eof=1;
77 break;
78 }
79 }
80 free(in_buff);
81
82 fp -> input_binary_stream_write_ptr += have;
83
84 return have;
85
86 }
87
is_paired_end_BAM(char * fn)88 int is_paired_end_BAM(char * fn){
89 FILE * fp = fopen(fn, "rb");
90 if(!fp) return 0;
91 unsigned char h2[2];
92 int retvv = fread(h2, 1,2,fp);
93 if(retvv <2) return 0;
94 int is_bam = (h2[0]==31) && (h2[1]==139);
95 fclose(fp);
96
97 SamBam_FILE * sambam_reader;
98 char fline[3000];
99 sambam_reader = SamBam_fopen(fn, is_bam?SAMBAM_FILE_BAM:SAMBAM_FILE_SAM);
100 while(1){
101 char * is_ret = SamBam_fgets(sambam_reader, fline, 2999, 1);
102 if(!is_ret) return 0;
103 if(fline[0]!='@') break;
104 }
105 char * tok = NULL;
106 strtok_r(fline, "\t", &tok);
107 int flags = atoi(strtok_r(NULL, "\t", &tok));
108 SamBam_fclose(sambam_reader);
109 return flags & 1;
110 }
SamBam_fopen(char * fname,int file_type)111 SamBam_FILE * SamBam_fopen(char * fname , int file_type)
112 {
113 SamBam_FILE * ret = (SamBam_FILE *)malloc(sizeof(SamBam_FILE));
114 memset(ret, 0, sizeof(SamBam_FILE));
115 ret -> file_type = file_type;
116
117 if(file_type ==SAMBAM_FILE_SAM)
118 {
119 ret -> os_file = f_subr_open(fname, "rb");
120 if(!ret -> os_file)
121 {
122 free(ret);
123 return NULL;
124 }
125
126 ret -> header_length = 0;
127 char nch0=-1, nchold=-1;
128
129 while(!feof(ret -> os_file)){
130 nchold = nch0;
131 nch0=fgetc(ret -> os_file);
132
133 if(nchold=='\n' && nch0!='@') break;
134
135 if(nch0!='@' && ret -> header_length == 0){
136 ret -> header_length = 0;
137 break;
138 }
139 ret -> header_length++;
140 }
141 //SUBREADprintf("The SAM header has %d bytes\n", ret -> header_length);
142 fseek(ret -> os_file,0,SEEK_SET);
143 }
144 else
145 {
146 ret -> os_file = f_subr_open(fname, "rb");
147 if(ret -> os_file == NULL)
148 {
149 free(ret);
150 return NULL;
151 }
152 unsigned char first_ch = fgetc(ret->os_file);
153 unsigned char second_ch = fgetc(ret->os_file);
154
155 if(first_ch!=31 || second_ch!=139)
156 {
157 free(ret);
158 SUBREADprintf("Not a BAM file! %d %d\n", first_ch, second_ch);
159 return NULL;
160 }
161
162 fseek(ret->os_file, 0, SEEK_SET);
163
164 ret -> input_binary_stream_buffer = (char *)malloc(SAMBAM_INPUT_STREAM_SIZE);
165 ret -> input_binary_stream_read_ptr = 0;
166 ret -> input_binary_stream_write_ptr = 0;
167 ret -> input_binary_stream_buffer_start_ptr = 0;
168
169 ret -> bam_file_stage = BAM_FILE_STAGE_HEADER;
170 ret -> is_eof = 0;
171
172 SB_FETCH(ret);
173
174 if(SB_EOF(ret))
175 {
176 free(ret->input_binary_stream_buffer);
177 free(ret);
178 SUBREADprintf("FEOF 0.\n");
179 return NULL;
180
181 }
182
183 int magic_4 = 0;
184 memcpy(&magic_4 , SB_READ(ret), 4);
185 SB_RINC(ret, 4);
186
187 if(magic_4 != 21840194) // this number is the four bytes of "BAM1"
188 {
189 free(ret->input_binary_stream_buffer);
190 free(ret);
191 SUBREADprintf("FEOF 4 == %d.\n", magic_4);
192 return NULL;
193 }
194
195
196 int l_text = 0;
197 memcpy(&l_text, SB_READ(ret), 4);
198 SB_RINC(ret, 4);
199
200 ret -> bam_file_next_section_start = ret -> input_binary_stream_read_ptr + l_text;
201 ret -> header_length = ret -> bam_file_next_section_start;
202 }
203 return ret;
204 }
205
cigar_op_char(int ch)206 char cigar_op_char(int ch)
207 {
208 if(ch<9)
209 return "MIDNSHP=X"[ch];
210 else
211 {
212 SUBREADprintf("Unknwon opt was found in the CIGAR string: '%c'.\n", ch);
213 return 'M';
214 }
215 }
216
read_int_char(int ch)217 char read_int_char(int ch)
218 {
219 assert(ch<16);
220 return "=ACMGRSVTWYHKDBN"[ch];
221 }
222
SamBam_fclose(SamBam_FILE * fp)223 void SamBam_fclose(SamBam_FILE * fp)
224 {
225 if(fp->file_type==SAMBAM_FILE_SAM)
226 {
227 fclose(fp->os_file);
228 free(fp);
229 }
230 else
231 {
232 fclose(fp->os_file);
233 free(fp -> input_binary_stream_buffer);
234 free(fp -> bam_chro_table);
235 free(fp);
236 }
237 }
238
SamBam_feof(SamBam_FILE * fp)239 int SamBam_feof(SamBam_FILE * fp)
240 {
241 if(fp->file_type ==SAMBAM_FILE_SAM)
242 return feof(fp->os_file);
243 else return SB_EOF(fp);
244 }
245
SamBam_read_ref_info(SamBam_FILE * ret)246 void SamBam_read_ref_info(SamBam_FILE * ret)
247 {
248 unsigned int ref_info_size = 0;
249 ret ->bam_chro_table_size = 0;
250 //printf("CKK0\n");
251
252 SB_FETCH(ret);
253 if(SB_EOF(ret))
254 return;
255
256 //printf("CKK1\n");
257
258 memcpy(&ref_info_size, SB_READ(ret),4);
259 SB_RINC(ret, 4);
260
261 int xk1;
262 ret -> bam_chro_table = malloc(sizeof(SamBam_Reference_Info) * ref_info_size);
263 for(xk1=0;xk1<ref_info_size;xk1++)
264 {
265 SB_FETCH(ret);
266
267 if(SB_EOF(ret))
268 break;
269
270 int ref_name_len = 0;
271 memcpy(&ref_name_len, SB_READ(ret),4);
272 SB_RINC(ret, 4);
273
274 int ref_readin_len = min(ref_name_len, BAM_MAX_CHROMOSOME_NAME_LEN-1);
275 int ref_skip_len = ref_name_len - ref_readin_len;
276
277 memcpy(ret -> bam_chro_table[xk1].chro_name, SB_READ(ret), ref_readin_len);
278 ret -> bam_chro_table[xk1].chro_name[ref_readin_len] = 0;
279 SB_RINC(ret, ref_readin_len + ref_skip_len);
280
281 memcpy(&(ret -> bam_chro_table[xk1].chro_length), SB_READ(ret),4);
282 SB_RINC(ret, 4);
283
284 //SUBREADprintf("CHRO[%d] : %s [%d]\n", xk1+1, ret -> bam_chro_table[xk1].chro_name , ret -> bam_chro_table[xk1].chro_length);
285 }
286 ret ->bam_chro_table_size = ref_info_size;
287 }
288
SamBam_fgets(SamBam_FILE * fp,char * buff,int buff_len,int seq_needed)289 char * SamBam_fgets(SamBam_FILE * fp, char * buff , int buff_len, int seq_needed)
290 {
291 if(fp->file_type==SAMBAM_FILE_SAM){
292 char * ret = fgets(buff, buff_len, fp->os_file);
293 int strlenbuff = 0;
294 if(ret){
295 strlenbuff = strlen(buff);
296 ///if(buff[0]!='@')
297 // if(strlenbuff < 100)SUBREADprintf("WRONG LOAD: '%s'\n", buff);
298 }
299 if(strlenbuff < 1 || ret == NULL) return NULL;
300 else{
301 if(ret[strlenbuff-1]!='\n')
302 {
303 while(1)
304 {
305 int ch = getc(fp->os_file);
306 if(ch == EOF || ch == '\n')break;
307 }
308 ret[strlenbuff-1] = '\n';
309 }
310 if(fp -> is_paired_end < 10){
311 if(buff[0]!='@'){
312 int tabs = 0,x1=0, tmpi=0;
313 for(x1 = 0; x1 < strlenbuff; x1++){
314 if(buff[x1] == '\t'){
315 if(tabs == 1){
316 // SUBREADprintf("TMPI_SAM = %d\n", tmpi);
317 fp -> is_paired_end = 10 + (tmpi & 1);
318 break;
319 } else tabs ++;
320 }else{
321 if(tabs == 1)tmpi = tmpi * 10 + buff[x1]-'0';
322 }
323 }
324 }
325 }
326
327 if(buff[0] == '@'){
328 fp -> header_length = ftello(fp->os_file) + strlenbuff+1;
329 }
330 return ret;
331 }
332 }
333 else
334 {
335 int xk1;
336 // decrypt the BAM mess.
337 if(fp-> bam_file_stage == BAM_FILE_STAGE_HEADER)
338 {
339 char nch;
340 xk1=0;
341 SB_FETCH(fp);
342 if(SB_EOF(fp))
343 return NULL;
344
345 while(1)
346 {
347 if(fp -> input_binary_stream_read_ptr >= fp -> bam_file_next_section_start)
348 break;
349
350 SB_FETCH(fp);
351 nch = *(SB_READ(fp));
352 SB_RINC(fp,1);
353
354 //printf("%c", nch);
355 if(nch == '\r')continue;
356 if(SB_EOF(fp)||nch == '\n' || nch <0) break;
357 if(xk1 < buff_len-2)
358 {
359 buff[xk1]=nch;
360 xk1++;
361 }
362 }
363
364 buff[xk1]='\n';
365 buff[xk1+1]=0;
366
367 //SUBREADprintf("BUFF=%s\n========================================================================\n\n", buff);
368 //printf("RL=%d , PTR %d >? RECORD_START %d\n\n\n", xk1, fp -> input_binary_stream_read_ptr , fp -> bam_file_next_section_start);
369
370 if(fp -> input_binary_stream_read_ptr >= fp -> bam_file_next_section_start)
371 {
372 SamBam_read_ref_info(fp);
373 fp -> bam_file_stage = BAM_FILE_STAGE_ALIGNMENT;
374 fp -> header_length = fp-> input_binary_stream_read_ptr;
375 }
376 return buff;
377 }
378 else
379 {
380 SamBam_Alignment *aln = &fp->aln_buff;
381 int chunk_ptr = 0;
382 SB_FETCH(fp);
383 if(SB_EOF(fp)) return NULL;
384
385 fp -> is_paired_end = 10 + ((*(fp -> input_binary_stream_buffer + fp -> input_binary_stream_read_ptr - fp -> input_binary_stream_buffer_start_ptr + 18)) & 1);
386 //SUBREADprintf("FLAG=%d\n", *(fp -> input_binary_stream_buffer + fp -> input_binary_stream_read_ptr - fp -> input_binary_stream_buffer_start_ptr + 18) );
387 int text_len = PBam_chunk_gets(SB_READ(fp) , &chunk_ptr, fp -> input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr , fp -> bam_chro_table, buff , buff_len, aln, seq_needed);
388 SB_RINC(fp, chunk_ptr);
389
390 if(text_len>0) return buff;
391 return NULL;
392 }
393 }
394 }
395
396
397
PBam_get_next_zchunk(FILE * bam_fp,char * buffer,int buffer_length,unsigned int * real_len)398 int PBam_get_next_zchunk(FILE * bam_fp, char * buffer, int buffer_length, unsigned int * real_len)
399 {
400 unsigned char ID1, ID2, CM, FLG;
401 unsigned short XLEN;
402 int BSIZE=-1, rlen, is_file_broken = 0, rrtv;
403
404 if(feof(bam_fp)) return -1;
405
406 rrtv = fread(&ID1, 1, 1, bam_fp);
407 if(rrtv <1) return -1;
408 rrtv = fread(&ID2, 1, 1, bam_fp);
409 if(rrtv <1) return -1;
410 rrtv = fread(&CM, 1, 1, bam_fp);
411 if(rrtv <1) return -1;
412 rrtv = fread(&FLG, 1, 1, bam_fp);
413 if(rrtv <1) return -1;
414 if(feof(bam_fp)) return -1;
415
416 if(ID1!=31 || ID2!=139 || CM!=8 || FLG!=4)
417 {
418 //SUBREADprintf("4CHR = %d, %d, %d, %d\n", ID1, ID2, CM, FLG);
419 return -1;
420 }
421 fseeko(bam_fp, 6, SEEK_CUR);
422 rrtv = fread(&XLEN,1, 2, bam_fp );
423 if(rrtv < 2) return -1;
424
425 int XLEN_READ = 0;
426 while(1)
427 {
428 unsigned char SI1, SI2;
429 unsigned short SLEN, BSIZE_MID;
430
431 rrtv = fread(&SI1, 1, 1, bam_fp);
432 if(rrtv <1) return -1;
433 rrtv = fread(&SI2, 1, 1, bam_fp);
434 if(rrtv <1) return -1;
435 rlen = fread(&SLEN, 2, 1, bam_fp);
436 if(rlen < 1) is_file_broken = 1;
437
438 if(SI1==66 && SI2== 67 && SLEN == 2)
439 {
440 rrtv = fread(&BSIZE_MID, 1,2 , bam_fp);
441 if(rrtv <2) return -1;
442 BSIZE = BSIZE_MID;
443 }
444 else fseeko(bam_fp, SLEN, SEEK_CUR);
445 XLEN_READ += SLEN + 4;
446 if(XLEN_READ>=XLEN) break;
447 }
448
449 if(BSIZE>19)
450 {
451 int CDATA_LEN = BSIZE - XLEN - 19;
452 int CDATA_READING = min(CDATA_LEN, buffer_length);
453 rrtv = fread(buffer, 1, CDATA_READING, bam_fp);
454 if(rrtv < CDATA_READING) return -1;
455 if(CDATA_READING<CDATA_LEN){
456 SUBREADputs("ERROR: buffer insufficient");
457 return -1;
458 }
459 fseeko(bam_fp, 4, SEEK_CUR);
460 rlen = fread(&real_len, 4, 1, bam_fp);//Input SIZE (length of uncompressed data);uint32_t
461 if(rlen < 1) is_file_broken = 1;
462
463 // SUBREADprintf("read_data=%u\n", CDATA_LEN);
464 if(is_file_broken){
465 SUBREADputs("ERROR: the input BAM file is broken.");
466 }
467 return is_file_broken?-2:CDATA_READING;
468 }
469 else
470 return -1;
471 }
472
473
474 // returns 0 if the header finished.
475 // returns 1 if the header is going on.
476 // returns -1 if error.
PBam_chunk_headers(char * chunk,int * chunk_ptr,int chunk_len,SamBam_Reference_Info ** bam_chro_table,int * table_size,int * table_items,int * state,int * header_txt_remainder,int * reminder_byte_len)477 int PBam_chunk_headers(char * chunk, int *chunk_ptr, int chunk_len, SamBam_Reference_Info ** bam_chro_table, int * table_size, int * table_items, int * state, int * header_txt_remainder, int * reminder_byte_len)
478 {
479
480 if((*state) == 0)
481 {
482 unsigned int header_txt_len ;
483 if(0!=memcmp("BAM\x1",chunk + (*chunk_ptr),4))
484 return -1;
485 (*chunk_ptr)+=4; // MAGIC
486 (*state) = 1;
487
488 memcpy(&header_txt_len, chunk + (*chunk_ptr),4);
489 (*chunk_ptr)+=4;
490 if(header_txt_len + 8 < chunk_len)
491 {
492 (* state) = 2;
493 (*chunk_ptr) += header_txt_len;
494 }
495 else
496 {
497 (* state) = 1;
498 (* header_txt_remainder) = header_txt_len - (chunk_len - 8);
499 return 1;
500 }
501 }
502
503 if((*state) == 1)
504 {
505 if((*header_txt_remainder)<chunk_len)
506 {
507 (*state) = 2;
508 (*chunk_ptr) += (*header_txt_remainder);
509 }
510 else if((*header_txt_remainder)==chunk_len)
511 {
512 (*state) = 2;
513 return 1;
514 }
515 else
516 {
517 (* header_txt_remainder) -= (chunk_len);
518 return 1;
519 }
520 }
521
522 if((*state) == 2 || (*state == 3))
523 {
524 int chrs, remainder_chrs;
525 if((*state)==2)
526 {
527 memcpy(&chrs, chunk + (*chunk_ptr),4);
528 (*chunk_ptr)+=4;
529
530 remainder_chrs = chrs;
531 }
532 else remainder_chrs = (*header_txt_remainder);
533
534 while((*chunk_ptr) < chunk_len && remainder_chrs>0)
535 {
536 int chro_name_len;
537 unsigned int chro_len;
538 (*reminder_byte_len) = chunk_len - (*chunk_ptr);
539
540 if( (*chunk_ptr) < chunk_len-4)
541 {
542 memcpy(&chro_name_len, chunk + (*chunk_ptr),4);
543 (*chunk_ptr)+=4;
544 if( (*chunk_ptr) <= chunk_len-chro_name_len-4)
545 {
546 char * chro_name = chunk + (*chunk_ptr);
547 (*chunk_ptr)+=chro_name_len;
548 memcpy(&chro_len, chunk + (*chunk_ptr),4);
549 (*chunk_ptr)+=4;
550
551 (*reminder_byte_len) =0;
552
553 //todo: insert item
554 if(0==(* table_items))
555 {
556 (*table_size) = 50;
557 (*bam_chro_table) = malloc(sizeof(SamBam_Reference_Info)*50);
558 }
559 else if((*table_size) <= (* table_items))
560 {
561 (*table_size) *= 2;
562 (*bam_chro_table) = realloc((*bam_chro_table),sizeof(SamBam_Reference_Info)*(*table_size));
563 }
564
565 SamBam_Reference_Info * new_event = (*bam_chro_table) + (* table_items);
566 if(strlen(chro_name)>=BAM_MAX_CHROMOSOME_NAME_LEN) chro_name[BAM_MAX_CHROMOSOME_NAME_LEN-1]=0;
567 strcpy(new_event->chro_name, chro_name);
568 new_event -> chro_length = chro_len;
569
570 (* table_items)++;
571 //SUBREADprintf("CHRO %d/%d added\n", (* table_items),(remainder_chrs));
572 remainder_chrs --;
573 }
574 else break;
575 }
576 else break;
577
578 }
579
580 if(remainder_chrs)
581 {
582 (*state) = 3;
583 (*header_txt_remainder) = remainder_chrs;
584 return 1;
585 }
586 else{
587 (*state) = 4;
588 return 0;
589 }
590 }
591 return -1;
592 }
593
convert_BAM_binary_to_SAM(SamBam_Reference_Info * chro_table,char * bam_bin,char * sam_txt)594 int convert_BAM_binary_to_SAM( SamBam_Reference_Info * chro_table, char * bam_bin, char * sam_txt ){
595 int bin_len = 0;
596 memcpy(&bin_len, bam_bin, 4);
597 bin_len += 4;
598
599 int sam_ptr = 0, tmpint = 0;
600 sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", bam_bin+36);
601
602 memcpy(&tmpint, bam_bin + 16 ,4);
603 sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", (tmpint >> 16) & 0xffff);
604 int cigar_opts = tmpint & 0xffff;
605
606 memcpy(&tmpint, bam_bin + 4 ,4);
607 int r1chro = tmpint;
608 sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", tmpint<0?"*":chro_table[tmpint].chro_name);
609 memcpy(&tmpint, bam_bin + 8 ,4);
610 sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", tmpint+1);
611 memcpy(&tmpint, bam_bin + 12 ,4);
612 sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", (tmpint >> 8) & 0xff);
613 int name_len = tmpint & 0xff;
614 int cigar_i;
615 for(cigar_i = 0; cigar_i < cigar_opts; cigar_i ++){
616 unsigned int cigarint=0;
617 memcpy(&cigarint, bam_bin + name_len + 36 + cigar_i * 4,4);
618 sam_ptr += sprintf(sam_txt + sam_ptr, "%u%c", cigarint >> 4, "MIDNSHP=X"[cigarint&0xf]);
619 }
620 sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", cigar_i<1?"*":"");
621
622 memcpy(&tmpint, bam_bin + 24, 4);
623 //SUBREADprintf("CHRO_IDX=%d\n", tmpint);
624 sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", tmpint<0?"*":((tmpint == r1chro)?"=":chro_table[tmpint].chro_name));
625
626 memcpy(&tmpint, bam_bin + 28, 4);
627 sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", tmpint+1);
628
629 memcpy(&tmpint, bam_bin + 32, 4);
630 sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", tmpint);
631
632 int seq_len;
633 memcpy(&seq_len, bam_bin + 20,4);
634 int seqi, flex_ptr=name_len + 36 + cigar_opts * 4;
635 for(seqi=0; seqi<seq_len; seqi++){
636 sam_txt[sam_ptr++]="=ACMGRSVTWYHKDBN"[ (bam_bin[flex_ptr] >> ( 4*!(seqi %2) )) & 15 ];
637 if(seqi %2) flex_ptr++;
638 }
639 sam_txt[sam_ptr++]='\t';
640 if(seqi %2) flex_ptr++;
641 for(seqi=0; seqi<seq_len; seqi++){
642 unsigned char nch = (unsigned char) bam_bin[flex_ptr++];
643 if(nch!=0xff||seqi == 0)
644 sam_txt[sam_ptr++]=nch==0xff?'*':(nch+33);
645 }
646 sam_txt[sam_ptr++]='\t';
647
648 while(flex_ptr < bin_len){
649 sam_txt[sam_ptr++]=bam_bin[flex_ptr++];
650 sam_txt[sam_ptr++]=bam_bin[flex_ptr++];
651 sam_txt[sam_ptr++]=':';
652 char tagtype = bam_bin[flex_ptr++];
653
654 if(tagtype == 'B'){
655 char elemtype = bam_bin[flex_ptr++];
656 int elem_no=0, is_int_type = 0, type_bytes = 0, is_signed = 0;
657 memcpy(&elem_no, bam_bin + flex_ptr, 4);
658 flex_ptr += 4;
659 sam_txt[sam_ptr++]='B';
660 sam_txt[sam_ptr++]=':';
661 sam_txt[sam_ptr++]=elemtype;
662 sam_txt[sam_ptr++]=',';
663
664 if(elemtype == 'i' || elemtype == 'I'){
665 is_int_type = 1;
666 type_bytes = 4;
667 is_signed = elemtype == 'i' ;
668 }else if(elemtype == 's' || elemtype == 'S'){
669 is_int_type = 1;
670 type_bytes = 2;
671 is_signed = elemtype == 's' ;
672 }else if(elemtype == 'c' || elemtype == 'C'){
673 is_int_type = 1;
674 type_bytes = 1;
675 is_signed = elemtype == 's' ;
676 }else if(elemtype == 'f'){
677 type_bytes = 4;
678 }
679
680 int elemi;
681 for(elemi =0; elemi < elem_no; elemi++){
682 if(is_int_type){
683 int tagval = 0;
684 memcpy(&tagval, bam_bin + flex_ptr, type_bytes);
685 long long printv = is_signed?tagval:( (unsigned int) tagval );
686 if(elemtype == 'i') sam_ptr += sprintf(sam_txt + sam_ptr, "%d,", (int)printv);
687 if(elemtype == 'I') sam_ptr += sprintf(sam_txt + sam_ptr, "%u,", (unsigned int)printv);
688
689 if(elemtype == 's') sam_ptr += sprintf(sam_txt + sam_ptr, "%d,", (short)printv);
690 if(elemtype == 'S') sam_ptr += sprintf(sam_txt + sam_ptr, "%u,", (unsigned short)printv);
691
692 if(elemtype == 'c') sam_ptr += sprintf(sam_txt + sam_ptr, "%d,", (char)printv);
693 if(elemtype == 'C') sam_ptr += sprintf(sam_txt + sam_ptr, "%u,", (unsigned char)printv);
694 }else{
695 float tagval = 0;
696 memcpy(&tagval, bam_bin + flex_ptr, type_bytes);
697 sam_ptr += sprintf(sam_txt + sam_ptr, "%f,", tagval);
698 }
699 flex_ptr += type_bytes;
700 }
701
702 sam_txt[sam_ptr-1] = '\t';
703 sam_txt[sam_ptr] = 0;
704 }else{
705 int is_int_type = 0, is_float_type = 0, type_bytes = 0, is_string_type = 0, is_char_type = 0, is_signed = 0;
706 if(tagtype == 'i' || tagtype == 'I'){
707 is_int_type = 1;
708 type_bytes = 4;
709 is_signed = tagtype == 'i' ;
710 }else if(tagtype == 's' || tagtype == 'S'){
711 is_int_type = 1;
712 type_bytes = 2;
713 is_signed = tagtype == 's' ;
714 }else if(tagtype == 'c' || tagtype == 'C'){
715 is_int_type = 1;
716 type_bytes = 1;
717 is_signed = tagtype == 's' ;
718 }else if(tagtype == 'f'){
719 is_float_type = 1;
720 type_bytes = 4;
721 }else if(tagtype == 'Z' || tagtype == 'H'){
722 is_string_type = 1;
723 while(bam_bin[flex_ptr+(type_bytes ++)]);
724 }else if(tagtype == 'A'){
725 is_char_type = 1;
726 type_bytes = 1;
727 }
728
729
730 sam_txt[sam_ptr++]=is_int_type?'i':tagtype;
731 sam_txt[sam_ptr++]=':';
732
733 if(is_int_type){
734 int tagval = 0;
735 memcpy(&tagval, bam_bin + flex_ptr, type_bytes);
736 long long printv = is_signed?tagval:( (unsigned int) tagval );
737 #ifdef __MINGW32__
738 sam_ptr += sprintf(sam_txt + sam_ptr, "%I64d\t", printv);
739 #else
740 sam_ptr += sprintf(sam_txt + sam_ptr, "%lld\t", printv);
741 #endif
742 }else if(is_string_type){
743 // type_bytes includes \0
744 memcpy(sam_txt + sam_ptr, bam_bin + flex_ptr, type_bytes -1);
745 sam_txt[ sam_ptr + type_bytes -1 ] = '\t';
746
747 //sam_txt[ sam_ptr + type_bytes +1]=0;
748 //SUBREADprintf("STR_LEN=%d\tSTR=%s\n", type_bytes-1, sam_txt + sam_ptr);
749 sam_ptr += type_bytes;
750 }else if(is_float_type){
751 float tagval = 0;
752 memcpy(&tagval, bam_bin + flex_ptr, type_bytes);
753 sam_ptr += sprintf(sam_txt + sam_ptr, "%f\t", tagval);
754 }else if(is_char_type){
755 sam_txt[ sam_ptr++ ] = bam_bin[flex_ptr];
756 sam_txt[ sam_ptr++ ] = '\t';
757 }
758 flex_ptr += type_bytes;
759 }
760 }
761
762 sam_txt[sam_ptr-1]=0; //last '\t'
763 return sam_ptr-1;
764 }
765
PBam_chunk_gets(char * chunk,int * chunk_ptr,int chunk_limit,SamBam_Reference_Info * bam_chro_table,char * buff,int buff_len,SamBam_Alignment * aln,int seq_needed)766 int PBam_chunk_gets(char * chunk, int *chunk_ptr, int chunk_limit, SamBam_Reference_Info * bam_chro_table, char * buff , int buff_len, SamBam_Alignment*aln, int seq_needed)
767 {
768 int xk1;
769 // decrypt the BAM mess.
770 unsigned int block_size;
771 if((*chunk_ptr) +4> chunk_limit) return -1;
772
773 memcpy(&block_size, chunk+(*chunk_ptr), 4);
774 //SUBREADprintf("PBSIZE=%u\n", block_size);
775 (*chunk_ptr)+=4;
776 unsigned int next_start = block_size+(*chunk_ptr);
777
778 int ref_id;
779 memcpy(&ref_id, chunk+(*chunk_ptr), 4);
780 (*chunk_ptr)+=4;
781
782 if(ref_id == -1) aln -> chro_name = NULL;
783 else aln -> chro_name = bam_chro_table[ref_id].chro_name;
784
785 memcpy(&(aln -> chro_offset), chunk+(*chunk_ptr), 4);
786 (*chunk_ptr)+=4;
787
788 unsigned int comb1;
789 memcpy(&comb1, chunk+(*chunk_ptr), 4);
790 (*chunk_ptr)+=4;
791
792 aln -> mapping_quality = 0xff & (comb1 >> 8);
793
794 unsigned int comb2;
795 memcpy(&comb2, chunk+(*chunk_ptr), 4);
796 (*chunk_ptr)+=4;
797
798 aln -> flags = 0xffff&(comb2 >> 16);
799
800 unsigned int read_len;
801 memcpy(&read_len, chunk+(*chunk_ptr), 4);
802
803 (*chunk_ptr)+=4;
804
805 unsigned int mate_ref_id;
806 memcpy(&mate_ref_id, chunk+(*chunk_ptr), 4);
807 (*chunk_ptr)+=4;
808
809 if(mate_ref_id == -1) aln -> mate_chro_name = NULL;
810 else aln -> mate_chro_name = bam_chro_table[mate_ref_id].chro_name;
811
812 memcpy(&(aln -> mate_chro_offset), chunk+(*chunk_ptr), 4);
813 (*chunk_ptr)+=4;
814
815 memcpy(&(aln -> templete_length), chunk+(*chunk_ptr), 4);
816 (*chunk_ptr)+=4;
817
818 int read_name_len = comb1 & 0xff;
819 assert(read_name_len < BAM_MAX_READ_NAME_LEN);
820
821 memcpy(aln -> read_name, chunk+(*chunk_ptr), read_name_len);
822 aln -> read_name[read_name_len] = 0;
823 (*chunk_ptr)+=read_name_len;
824
825 int cigar_ops = comb2 & 0xffff;
826 aln -> cigar[0]=0;
827 for(xk1=0; xk1<cigar_ops;xk1++)
828 {
829 char cigar_piece_buf[BAM_MAX_CIGAR_LEN];
830 unsigned int cigar_piece;
831
832 if((*chunk_ptr) +4 > chunk_limit) return -1;
833 memcpy(&cigar_piece, chunk+(*chunk_ptr),4);
834 (*chunk_ptr)+=4;
835
836 sprintf(cigar_piece_buf, "%u%c", cigar_piece>>4, cigar_op_char(cigar_piece&0xf));
837 if(strlen(cigar_piece_buf)+strlen(aln->cigar)<BAM_MAX_CIGAR_LEN-1)
838 strcat(aln->cigar, cigar_piece_buf);
839 else
840 {
841 SUBREADprintf("WARNING: cigar string is too long to the buffer.\n");
842 SUBREADprintf("Please only use the compressed BAM format.\n");
843 assert(0);
844 return -1;
845 }
846 }
847
848 char read_2_seq = 0;
849 int seq_qual_bytes = read_len + (read_len /2)+(read_len%2);
850
851 if(seq_needed)
852 memcpy( aln-> buff_for_seq, chunk+(*chunk_ptr), seq_qual_bytes);
853 (*chunk_ptr) += seq_qual_bytes;
854
855 char extra_tags [CORE_ADDITIONAL_INFO_LENGTH];
856 extra_tags[0]=0;
857 int extra_len = 0;
858 while( (*chunk_ptr) < next_start)
859 {
860 char extag[2];
861 char extype;
862 int delta, need_tag = 1;
863 memcpy(extag, chunk+(*chunk_ptr), 2);
864 extype = chunk[2+(*chunk_ptr)];
865 (*chunk_ptr)+=3;
866 //fprintf(stderr, "COL_EXTYPE: %c\n", extype);
867 if(extype == 'Z' || extype == 'H')
868 {
869 delta = 0;
870 // 'Z' columns are NULL-terminated.
871 while(chunk[delta + (*chunk_ptr)]) delta++;
872 delta += 1;
873 }
874 else if(extype == 'A' || extype == 'c' || extype=='C') delta=1;
875 else if(extype == 'i' || extype=='I' || extype == 'f') delta=4;
876 else if(extype == 's' || extype=='S') delta=2;
877 else if(extype == 'B')
878 {
879 extype = chunk[(*chunk_ptr)];
880 // fprintf(stderr, "B_EXTYPE: %c\n", extype);
881
882 (*chunk_ptr)++;
883 if(extype == 'A' || extype=='Z') delta=1;
884 else if(extype == 'c' || extype=='C') delta=1;
885 else if(extype == 'i' || extype=='I' || extype == 'f') delta=4;
886 else if(extype == 's' || extype=='S') delta=2;
887 else break;
888
889 int array_len;
890 need_tag = 0;
891 memcpy(&array_len, chunk+(*chunk_ptr), 4);
892 (*chunk_ptr)+=4;
893 delta *= array_len;
894 }
895 else{
896 // fprintf(stderr, "NO_EXTYPE: %c\n", extype);
897 break;
898 }
899
900 if(need_tag){
901 if(extype == 'c' || extype=='C' || extype == 'i' || extype=='I' || extype == 's' || extype=='S'){
902 int tmpi = 0;
903 memcpy(&tmpi, chunk+(*chunk_ptr),delta);
904 if(tmpi >= 0 && extra_len < CORE_ADDITIONAL_INFO_LENGTH - 18){
905 int sret = sprintf(extra_tags + strlen(extra_tags), "\t%c%c:i:%d", extag[0], extag[1], tmpi);
906 extra_len += sret;
907 }
908 }else if(extype == 'Z'){
909 if(extra_len < CORE_ADDITIONAL_INFO_LENGTH - 7 - delta){
910 sprintf(extra_tags + strlen(extra_tags), "\t%c%c:Z:", extag[0], extag[1]);
911 extra_len += 6;
912 *(extra_tags + strlen(extra_tags)+delta-1) = 0;
913 memcpy(extra_tags + strlen(extra_tags), chunk + (*chunk_ptr), delta - 1);
914 extra_len += delta - 1;
915 }
916 }else if(extype == 'A'){
917 if(extra_len < CORE_ADDITIONAL_INFO_LENGTH - 8){
918 int sret = sprintf(extra_tags + strlen(extra_tags), "\t%c%c:A:%c", extag[0], extag[1], *(chunk + *chunk_ptr) );
919 extra_len += sret;
920 }
921 }
922 }
923
924 if((*chunk_ptr) + delta > chunk_limit) return -1;
925 (*chunk_ptr)+=delta;
926
927 }
928
929 if(next_start > chunk_limit) return -1;
930 (*chunk_ptr) = next_start;
931
932 if(seq_needed)
933 {
934 for(xk1=0;xk1<read_len;xk1++)
935 {
936 if(xk1 %2 == 0){
937 read_2_seq = aln-> buff_for_seq[xk1/2];
938 }
939 if(xk1 < BAM_MAX_READ_LEN)
940 aln -> sequence[xk1] = read_int_char(0xf&(read_2_seq >> (xk1%2?0:4)));
941 }
942 aln -> sequence[min(BAM_MAX_READ_LEN-1,read_len)] = 0;
943 if(read_len >= BAM_MAX_READ_LEN-1)
944 SUBREADprintf("WARNING: read is too long to the buffer\n");
945
946
947 for(xk1=0;xk1<read_len;xk1++)
948 {
949 read_2_seq = aln -> buff_for_seq[(read_len /2)+(read_len%2) + xk1] ;
950 if(xk1 < BAM_MAX_READ_LEN)
951 aln -> seq_quality[xk1] = 33+read_2_seq;
952 }
953 aln -> seq_quality[min(BAM_MAX_READ_LEN-1,read_len)] = 0;
954 if(aln -> seq_quality[0]==' ')
955 strcpy(aln -> seq_quality, "*");
956 }
957 else
958 {
959 aln -> sequence[0]='N';
960 aln -> sequence[1]=0;
961 aln -> seq_quality[0]='#';
962 aln -> seq_quality[1]=0;
963 }
964
965 char * chro_name = "*";
966 char * cigar = "*";
967 unsigned int chro_offset = 0;
968
969 if(aln -> chro_name){
970 chro_name = aln -> chro_name;
971 chro_offset = aln -> chro_offset+1;
972 if(aln -> cigar[0])
973 cigar = aln -> cigar;
974 }
975
976 char * mate_chro_name = "*";
977 unsigned int mate_chro_offset = 0;
978 if(aln -> mate_chro_name)
979 {
980 if(aln -> mate_chro_name == chro_name) mate_chro_name = "=";
981 else
982 mate_chro_name = aln -> mate_chro_name;
983 mate_chro_offset = aln -> mate_chro_offset+1;
984 }
985
986 long long int templete_length = aln -> templete_length;
987
988
989 //fprintf(stderr, "HN_TAG=%d\n", nh_val );
990
991 #ifdef __MINGW32__
992 int plen = snprintf(buff, buff_len-1, "%s\t%u\t%s\t%u\t%d\t%s\t%s\t%u\t%I64d\t%s\t%s%s\n%c", aln -> read_name, aln -> flags , chro_name, chro_offset, aln -> mapping_quality, cigar, mate_chro_name, mate_chro_offset, templete_length, aln -> sequence , aln -> seq_quality, extra_tags, 0);
993 #else
994 int plen = snprintf(buff, buff_len-1, "%s\t%u\t%s\t%u\t%d\t%s\t%s\t%u\t%lld\t%s\t%s%s\n%c", aln -> read_name, aln -> flags , chro_name, chro_offset, aln -> mapping_quality, cigar, mate_chro_name, mate_chro_offset, templete_length, aln -> sequence , aln -> seq_quality, extra_tags, 0);
995 #endif
996 //fprintf(stderr,"%s", buff);
997
998 return plen;
999 }
1000
1001
PBum_load_header(FILE * bam_fp,SamBam_Reference_Info ** chro_tab,char * remainder_reads_data,int * remainder_reads_data_len)1002 int PBum_load_header(FILE * bam_fp, SamBam_Reference_Info** chro_tab, char * remainder_reads_data , int * remainder_reads_data_len)
1003 {
1004 char * CDATA = malloc(80010);
1005 char * PDATA = malloc(1000000);
1006
1007 int chro_tab_size = 0, chro_tab_items = 0, chro_tab_state = 0, header_remainder = 0, remainder_byte_len = 0, bam_is_broken = 0;
1008 z_stream strm;
1009 while(1)
1010 {
1011 unsigned int real_len = 0;
1012 int rlen = PBam_get_next_zchunk(bam_fp,CDATA,80000, & real_len);
1013 if(rlen<0){
1014 bam_is_broken = (rlen == -2);
1015 if(bam_is_broken){
1016 SUBREADprintf("BAM file format error.\n");
1017 free(CDATA);
1018 free(PDATA);
1019 return -1;
1020 }
1021 break;
1022 }
1023
1024 strm.zalloc = Z_NULL;
1025 strm.zfree = Z_NULL;
1026 strm.opaque = Z_NULL;
1027 strm.avail_in = 0;
1028 strm.next_in = Z_NULL;
1029 int ret = inflateInit2(&strm, SAMBAM_GZIP_WINDOW_BITS);
1030 if (ret != Z_OK)
1031 {
1032 free(CDATA);
1033 free(PDATA);
1034 return -1;
1035 }
1036 strm.avail_in = (unsigned int)rlen;
1037 strm.next_in = (unsigned char *)CDATA;
1038
1039
1040 strm.avail_out = 1000000 - remainder_byte_len;
1041 strm.next_out = (unsigned char *)(PDATA + remainder_byte_len);
1042 ret = inflate(&strm, Z_FINISH);
1043 int have = 1000000 - strm.avail_out;
1044 int PDATA_ptr=0;
1045
1046 inflateEnd(&strm);
1047
1048 ret = PBam_chunk_headers(PDATA, &PDATA_ptr, have, chro_tab, &chro_tab_size, &chro_tab_items, &chro_tab_state, &header_remainder,&remainder_byte_len);
1049 memcpy(PDATA , PDATA + have - remainder_byte_len, remainder_byte_len);
1050 if(ret<0)
1051 {
1052 SUBREADprintf("Header error.\n");
1053 free(CDATA);
1054 free(PDATA);
1055 return -1;
1056 }
1057 else if(ret == 0)
1058 {
1059 //SUBREADprintf("Header loaded = %d\n", (chro_tab_items));
1060 remainder_byte_len=0;
1061 }
1062 if(chro_tab_state>3){
1063 if(remainder_reads_data && PDATA_ptr < have)
1064 {
1065 memcpy(remainder_reads_data , PDATA + PDATA_ptr, have - PDATA_ptr);
1066 (*remainder_reads_data_len) = have - PDATA_ptr ;
1067 }
1068 break;
1069 }
1070 }
1071 free(CDATA);
1072 free(PDATA);
1073 return 0;
1074 }
1075
test_bamview(int argc,char ** argv)1076 int test_bamview(int argc, char ** argv)
1077 {
1078 if(argc>1)
1079 {
1080 SamBam_FILE * fp = SamBam_fopen(argv[1], SAMBAM_FILE_BAM);
1081 assert(fp);
1082 /*
1083 while(1)
1084 {
1085 char buf[3000];
1086 char * buf2 = SamBam_fgets(fp,buf, 3000);
1087 //SUBREADprintf(">>%s<<\n",buf);
1088 //if(buf2)
1089 // fwrite(buf,strlen(buf), 1, stdout);
1090 //else break;
1091 }
1092 */
1093 SamBam_fclose(fp);
1094 }
1095 return 0;
1096 }
1097
SamBam_writer_create(SamBam_Writer * writer,char * BAM_fname,int threads,int sort_reads_by_coord,int is_temp_BAM_file,char * tmpfname)1098 int SamBam_writer_create(SamBam_Writer * writer, char * BAM_fname, int threads, int sort_reads_by_coord, int is_temp_BAM_file, char * tmpfname)
1099 {
1100 memset(writer, 0, sizeof(SamBam_Writer));
1101
1102 if(BAM_fname)
1103 {
1104 writer -> bam_fp = f_subr_open(BAM_fname, "wb");
1105 if(sort_reads_by_coord){
1106 char tname[MAX_FILE_NAME_LENGTH];
1107 sprintf(tname, "%s.bai", BAM_fname);
1108 writer -> BAI_fp = f_subr_open(tname, "wb");
1109 worker_master_mutex_init(&writer->sorted_notifier, threads);
1110 }
1111 if(!writer -> bam_fp) return -1;
1112 }
1113 #ifdef MAKE_STANDALONE
1114 else
1115 writer -> bam_fp = stdout;
1116 #endif
1117
1118 writer -> threads = threads;
1119 writer -> sort_reads_by_coord = sort_reads_by_coord;
1120 writer -> fastest_compression = is_temp_BAM_file;
1121 writer -> compressed_chunk_buffer = malloc(70000);
1122 writer -> chunk_buffer = malloc(70000);
1123 writer -> chunk_buffer_max_size = 70000;
1124 strcpy(writer -> tmpf_prefix, tmpfname);
1125
1126 if(threads>= 2){
1127 int x1;
1128 writer -> threads_chunk_buffer = malloc(sizeof(char *) * threads) ;
1129 writer -> threads_chunk_buffer_compressed = malloc(sizeof(char *) * threads) ;
1130 writer -> threads_chunk_buffer_used = malloc(sizeof(long long) * threads);
1131 writer -> threads_output_stream = malloc(sizeof(z_stream) * threads);
1132 writer -> threads_chunk_buffer_max_size = malloc(sizeof(long long) * threads);
1133 memset(writer -> threads_chunk_buffer_used, 0, sizeof(long long) * threads);
1134 for(x1 = 0; x1 < threads ; x1++){
1135 writer -> threads_chunk_buffer [x1] = malloc(70000);
1136 writer -> threads_chunk_buffer_compressed [x1] = malloc(70000);
1137 writer -> threads_chunk_buffer_max_size[x1] = 70000;
1138 }
1139 subread_init_lock(&writer -> thread_bam_lock);
1140 }
1141 writer -> chromosome_name_table = HashTableCreate(1603);
1142 writer -> chromosome_id_table = HashTableCreate(1603);
1143 writer -> chromosome_len_table = HashTableCreate(1603);
1144 writer -> header_plain_text_buffer = malloc(100000000);
1145 writer -> header_plain_text_buffer_max = 100000000;
1146 writer -> header_plain_text_buffer_used = 0;
1147
1148 //memset(writer -> header_plain_text_buffer , 0 , 100000000);
1149 HashTableSetHashFunction(writer -> chromosome_name_table , fc_chro_hash);
1150 HashTableSetKeyComparisonFunction(writer -> chromosome_name_table , fc_strcmp_chro);
1151 HashTableSetDeallocationFunctions(writer -> chromosome_name_table , free, NULL);
1152
1153 return 0;
1154 }
1155
SamBam_writer_chunk_header(SamBam_Writer * writer,int compressed_size)1156 void SamBam_writer_chunk_header(SamBam_Writer * writer, int compressed_size)
1157 {
1158
1159 // the four magic characters
1160 fputc(31, writer -> bam_fp);
1161 fputc(139, writer -> bam_fp);
1162 fputc(8, writer -> bam_fp);
1163 fputc(4, writer -> bam_fp);
1164
1165 time_t time_now = 0;
1166 fwrite(&time_now,4,1, writer -> bam_fp);
1167
1168 int tmp_i;
1169 // Extra flags and OS
1170 fputc(0, writer -> bam_fp);
1171 fputc(0xff, writer -> bam_fp);
1172
1173 // Extra length
1174 tmp_i = 6;
1175 fwrite(&tmp_i,2,1, writer -> bam_fp);
1176
1177
1178 // SI1 and SI2 magic numbers, and SLEN
1179 fputc(66, writer -> bam_fp);
1180 fputc(67, writer -> bam_fp);
1181 tmp_i = 2;
1182 fwrite(&tmp_i,2,1, writer -> bam_fp);
1183 tmp_i = compressed_size + 19 + 6;
1184 fwrite(&tmp_i,2,1, writer -> bam_fp);
1185 }
1186
SamBam_CRC32(char * dat,int len)1187 unsigned int SamBam_CRC32(char * dat, int len)
1188 {
1189 unsigned int crc0 = crc32(0, NULL, 0);
1190 unsigned int ret = crc32(crc0, (unsigned char *)dat, len);
1191 return ret;
1192 }
1193
SamBam_writer_add_chunk(SamBam_Writer * writer,int thread_no)1194 void SamBam_writer_add_chunk(SamBam_Writer * writer, int thread_no)
1195 {
1196 int compressed_size ;
1197 unsigned int CRC32;
1198 z_stream * this_stream = thread_no < 0 ? &writer ->output_stream:writer -> threads_output_stream + thread_no;
1199 long long * this_buffer_used = thread_no < 0 ? &writer ->chunk_buffer_used : writer -> threads_chunk_buffer_used + thread_no;
1200 char * this_buffer = thread_no < 0 ? writer ->chunk_buffer: writer -> threads_chunk_buffer[thread_no];
1201 char * this_compressed_chunk_buffer = thread_no < 0 ? writer ->compressed_chunk_buffer : writer -> threads_chunk_buffer_compressed[thread_no];
1202
1203 if(*this_buffer_used < 1){
1204 // magic block with 0 byte data(EOF)
1205 subread_lock_occupy(&writer -> thread_bam_lock);
1206 fwrite( "\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00",1,28,writer -> bam_fp);
1207 writer -> current_BAM_pos = ftello(writer -> bam_fp);
1208 subread_lock_release(&writer -> thread_bam_lock);
1209 return;
1210 }
1211
1212 this_stream -> avail_out = 70000;
1213 this_stream -> avail_in = (* this_buffer_used);
1214 CRC32 = SamBam_CRC32(this_buffer , * this_buffer_used);
1215
1216 this_stream -> zalloc = Z_NULL;
1217 this_stream -> zfree = Z_NULL;
1218 this_stream -> opaque = Z_NULL;
1219
1220 deflateInit2(this_stream, writer -> fastest_compression? SAMBAM_COMPRESS_LEVEL_FASTEST : SAMBAM_COMPRESS_LEVEL_NORMAL, Z_DEFLATED,
1221 SAMBAM_GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
1222
1223 this_stream -> next_in = (unsigned char *)this_buffer;
1224 this_stream -> next_out = (unsigned char *)this_compressed_chunk_buffer;
1225
1226 deflate(this_stream, Z_FINISH);
1227 deflateEnd(this_stream);
1228
1229 compressed_size = 70000 - this_stream -> avail_out;
1230
1231 subread_lock_occupy(&writer -> thread_bam_lock);
1232 SamBam_writer_chunk_header(writer, compressed_size);
1233 int chunk_write_size = fwrite(this_compressed_chunk_buffer, 1, compressed_size, writer -> bam_fp);
1234 fwrite(&CRC32 , 4, 1, writer -> bam_fp);
1235 fwrite(this_buffer_used , 4, 1, writer -> bam_fp);
1236 writer -> current_BAM_pos = ftello(writer -> bam_fp);
1237 subread_lock_release(&writer -> thread_bam_lock);
1238
1239 if(chunk_write_size < compressed_size){
1240 if(!writer -> is_internal_error)SUBREADputs("ERROR: no space left in the output directory.");
1241 writer -> is_internal_error = 1;
1242 }
1243
1244 (* this_buffer_used) = 0;
1245 }
1246
1247 double sambam_t1 = 0;
1248
SamBam_writer_write_header(SamBam_Writer * writer)1249 void SamBam_writer_write_header(SamBam_Writer * writer)
1250 {
1251 int header_ptr=0, header_block_start = 0;
1252 while(header_ptr < writer->header_plain_text_buffer_used)
1253 {
1254 if(( header_ptr - header_block_start > 55000 || header_ptr >= writer->header_plain_text_buffer_used-1) && writer -> header_plain_text_buffer[header_ptr] == '\n')
1255 {
1256 writer -> chunk_buffer_used = 0;
1257 if(header_block_start == 0) // the very first block
1258 {
1259 memcpy(writer -> chunk_buffer, "BAM\1",4);
1260 writer -> chunk_buffer_used = 4;
1261 memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used, &writer -> header_plain_text_buffer_used, 4);
1262 writer -> chunk_buffer_used += 4;
1263
1264 }
1265
1266 memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used , writer -> header_plain_text_buffer + header_block_start, header_ptr - header_block_start+1);
1267 writer -> chunk_buffer_used += header_ptr - header_block_start + 1;
1268 SamBam_writer_add_chunk(writer, -1);
1269 header_block_start = header_ptr + 1;
1270 }
1271 header_ptr ++;
1272 }
1273
1274 free(writer -> header_plain_text_buffer);
1275 writer -> header_plain_text_buffer = NULL;
1276
1277 // reference sequences
1278 writer -> chunk_buffer_used = 0;
1279 memcpy(writer -> chunk_buffer, & writer -> chromosome_name_table -> numOfElements, 4);
1280 writer -> chunk_buffer_used = 4;
1281
1282 for( header_ptr=0 ; header_ptr < writer -> chromosome_name_table -> numOfElements ; header_ptr ++)
1283 {
1284 //printf("D=%d\n", writer -> chromosome_id_table -> numOfElements);
1285 char * chro_name = HashTableGet(writer -> chromosome_id_table, NULL + 1 + header_ptr);
1286 unsigned int chro_len = HashTableGet(writer -> chromosome_len_table, NULL + 1 + header_ptr) - NULL - 1;
1287 assert(chro_name);
1288 int chro_name_len = strlen(chro_name)+1;
1289
1290 memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used , &chro_name_len, 4);
1291 writer -> chunk_buffer_used += 4;
1292
1293 strcpy(writer -> chunk_buffer + writer -> chunk_buffer_used , chro_name);
1294 writer -> chunk_buffer_used += chro_name_len;
1295
1296 memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used , &chro_len, 4);
1297 writer -> chunk_buffer_used += 4;
1298
1299 if(header_ptr == writer -> chromosome_name_table -> numOfElements - 1 || writer -> chunk_buffer_used > 55000)
1300 {
1301 SamBam_writer_add_chunk(writer, -1);
1302 writer -> chunk_buffer_used = 0;
1303 }
1304 }
1305
1306 }
1307
1308 void SamBam_writer_finalise_one_thread(SamBam_Writer * writer);
1309 void SamBam_writer_sort_bins_to_BAM(SamBam_Writer * writer);
1310
SamBam_writer_close(SamBam_Writer * writer)1311 int SamBam_writer_close(SamBam_Writer * writer)
1312 {
1313 if(writer -> writer_state == 0) // no reads were added
1314 {
1315 if(writer -> header_plain_text_buffer)
1316 SamBam_writer_write_header(writer);
1317 } else SamBam_writer_finalise_one_thread(writer);
1318
1319 if(writer -> sort_reads_by_coord){
1320 SamBam_writer_sort_bins_to_BAM(writer);
1321 worker_master_mutex_destroy(&writer->sorted_notifier);
1322 }
1323
1324 writer -> chunk_buffer_used = 0;
1325 SamBam_writer_add_chunk(writer, -1);
1326 // fputc(0, writer -> bam_fp);
1327
1328 writer -> output_stream.next_in= NULL;
1329 writer -> output_stream.avail_in= 0;
1330 writer -> output_stream.next_out= NULL;
1331 writer -> output_stream.avail_out= 0;
1332
1333 free(writer -> chunk_buffer);
1334 free(writer -> compressed_chunk_buffer);
1335 if(writer -> threads >=2){
1336 int x1;
1337 for(x1 = 0 ; x1 < writer -> threads; x1 ++){
1338 free(writer -> threads_chunk_buffer[x1]);
1339 free(writer -> threads_chunk_buffer_compressed[x1]);
1340 }
1341 free(writer -> threads_output_stream);
1342 free(writer -> threads_chunk_buffer);
1343 free(writer -> threads_chunk_buffer_compressed);
1344 free(writer -> threads_chunk_buffer_used);
1345 }
1346 HashTableDestroy(writer -> chromosome_name_table);
1347 HashTableDestroy(writer -> chromosome_id_table);
1348 HashTableDestroy(writer -> chromosome_len_table);
1349 #ifdef MAKE_STANDALONE
1350 if(stdout != writer -> bam_fp)
1351 #endif
1352 fclose(writer -> bam_fp);
1353 if(writer -> BAI_fp!=NULL) fclose(writer -> BAI_fp);
1354
1355 return 0;
1356 }
1357
SamBam_writer_add_header(SamBam_Writer * writer,char * header_text,int add_chro)1358 int SamBam_writer_add_header(SamBam_Writer * writer, char * header_text, int add_chro)
1359 {
1360 int new_text_len = strlen(header_text);
1361
1362 if(writer -> header_plain_text_buffer_max <= writer -> header_plain_text_buffer_used + new_text_len + 1)
1363 {
1364 //return 0;
1365 writer -> header_plain_text_buffer_max *=2;
1366 writer -> header_plain_text_buffer = realloc(writer -> header_plain_text_buffer , writer -> header_plain_text_buffer_max);
1367 //printf("REAL: %d : %llX\n",writer -> header_plain_text_buffer_max, (long long ) writer -> header_plain_text_buffer);
1368 }
1369
1370 strcpy(writer -> header_plain_text_buffer + writer -> header_plain_text_buffer_used, header_text);
1371 writer -> header_plain_text_buffer_used += new_text_len;
1372 strcpy(writer -> header_plain_text_buffer + writer -> header_plain_text_buffer_used, "\n");
1373 writer -> header_plain_text_buffer_used ++;
1374
1375 // SUBREADprintf("ADJHEADERXCD BIN=%s\n", header_text);
1376 if(add_chro && memcmp(header_text, "@SQ",3)==0)
1377 {
1378 char * chro = NULL;
1379 int chro_len = -1;
1380 char * toktmp = NULL;
1381 char * ret_tmp = strtok_r(header_text, "\t", &toktmp);
1382
1383 while(1){
1384 if(!ret_tmp) break;
1385
1386 if(memcmp(ret_tmp,"SN:", 3)==0) chro = ret_tmp + 3;
1387 else if(memcmp(ret_tmp,"LN:", 3)==0) chro_len = atoi(ret_tmp + 3);
1388
1389 ret_tmp = strtok_r(NULL, "\t", &toktmp);
1390 }
1391
1392 if(chro && (chro_len>0))
1393 SamBam_writer_add_chromosome(writer, chro, chro_len, 0);
1394
1395 }
1396
1397 //if(writer -> header_plain_text_buffer_used %97==0) printf("MV=%d\n",writer -> header_plain_text_buffer_used);
1398
1399 return 0;
1400 }
1401
SamBam_writer_add_chromosome(SamBam_Writer * writer,char * chro_name,unsigned int chro_length,int add_header)1402 int SamBam_writer_add_chromosome(SamBam_Writer * writer, char * chro_name, unsigned int chro_length, int add_header)
1403 {
1404 unsigned int chro_id = writer -> chromosome_name_table -> numOfElements;
1405
1406 //SUBREADprintf("ADJHEADER_CHRO %s of %u\n", chro_name, chro_length);
1407 //assert(strlen(chro_name) < 30);
1408
1409 char * chro_name_space = malloc(strlen(chro_name)+1);
1410 strcpy(chro_name_space , chro_name);
1411 HashTablePut(writer -> chromosome_name_table, chro_name_space, NULL+1+chro_id);
1412 HashTablePut(writer -> chromosome_id_table, NULL+1+chro_id, chro_name_space);
1413 HashTablePut(writer -> chromosome_len_table, NULL+1+chro_id, NULL + 1 + chro_length);
1414
1415 if(add_header)
1416 {
1417 char * line_buf = malloc(1000);
1418 snprintf(line_buf,999, "@SQ\tSN:%s\tLN:%u", chro_name , chro_length);
1419 SamBam_writer_add_header(writer, line_buf, 0);
1420 free(line_buf);
1421 }
1422
1423 return 0;
1424 }
1425
1426
SamBam_compress_cigar(char * cigar,int * cigar_int,int * ret_coverage,int max_secs)1427 int SamBam_compress_cigar(char * cigar, int * cigar_int, int * ret_coverage, int max_secs)
1428 {
1429 int tmp_int=0;
1430 int cigar_cursor = 0, num_opt = 0;
1431 int coverage_len = 0;
1432 (* ret_coverage) = 0;
1433
1434 if(cigar[0]=='*') return 0;
1435
1436 while(1)
1437 {
1438 char nch = cigar[cigar_cursor++];
1439 if(!nch)break;
1440 if(isdigit(nch))
1441 {
1442 tmp_int = tmp_int*10+(nch-'0');
1443 }
1444 else
1445 {
1446 int int_opt=0;
1447 if(nch == 'M' || nch == 'N' || nch == 'D') coverage_len += tmp_int;
1448 //if(nch == 'M' ||nch == 'D' || nch == '=' || nch == 'X') coverage_len += tmp_int;
1449 for(; int_opt<8; int_opt++) if("MIDNSHP=X"[int_opt] == nch)break;
1450 cigar_int[num_opt ++] = (tmp_int << 4) | int_opt;
1451 tmp_int = 0;
1452 //SUBREADprintf("CIGARCOM: %d-th is %c\n", num_opt, nch);
1453 if(num_opt>=max_secs)break;
1454 }
1455 }
1456
1457 (*ret_coverage) = coverage_len;
1458 return num_opt;
1459 }
1460
SamBam_read2bin(char * read_txt,char * read_bin)1461 void SamBam_read2bin(char * read_txt, char * read_bin)
1462 {
1463 int bin_cursor = 0, txt_cursor = 0;
1464
1465 while(1)
1466 {
1467 char nch = read_txt[txt_cursor++];
1468 if(!nch)break;
1469 int fourbit;
1470 for(fourbit=0;fourbit<15;fourbit++) if("=ACMGRSVTWYHKDBN"[fourbit] == nch)break;
1471
1472 if(bin_cursor %2 == 0) read_bin[bin_cursor/2] = fourbit<<4;
1473 else read_bin[bin_cursor/2] |= fourbit;
1474
1475 bin_cursor++;
1476 }
1477 }
1478
SamBam_compress_additional(char * additional_columns,char * bin)1479 int SamBam_compress_additional(char * additional_columns, char * bin)
1480 {
1481 int col_cursor = 0 , col_len = strlen(additional_columns);
1482 int bin_cursor = 0;
1483
1484 while(col_cursor<col_len)
1485 {
1486 if(col_cursor==0 || additional_columns[col_cursor]=='\t')
1487 {
1488 if(additional_columns[col_cursor]=='\t') col_cursor++;
1489
1490 bin[bin_cursor] = additional_columns[col_cursor];
1491 bin[bin_cursor+1] = additional_columns[col_cursor+1];
1492
1493 char datatype = additional_columns[col_cursor+3];
1494 if(datatype=='i' || datatype == 'f')
1495 {
1496 int dig_len =0;
1497 while(additional_columns[dig_len+col_cursor+5] != '\t' && additional_columns[dig_len+col_cursor+5]) dig_len++;
1498 int val = 0;
1499 float fval = 0;
1500 if(datatype=='i') val = atoi(additional_columns+col_cursor+5);
1501 else val = atof(additional_columns+col_cursor+5);
1502
1503 bin[bin_cursor+2]=datatype;
1504 memcpy(bin+bin_cursor+3, (datatype=='i')? ((void *)&val):((void *)&fval),4);
1505 bin_cursor += 3 + 4;
1506 col_cursor += 5 + dig_len;
1507 }
1508 else if(datatype=='Z' || datatype == 'H')
1509 {
1510 bin[bin_cursor+2]=datatype;
1511 bin_cursor +=3;
1512 int str_len = 0;
1513 col_cursor +=5;
1514 while(additional_columns[str_len+col_cursor] != '\t' && additional_columns[str_len+col_cursor])
1515 {
1516 bin[bin_cursor + str_len] = additional_columns[str_len+col_cursor];
1517 str_len++;
1518 if(bin_cursor + str_len > 780) break;
1519 }
1520
1521 bin[bin_cursor + str_len] =0;
1522
1523 bin_cursor += str_len + 1;
1524 col_cursor += str_len;
1525 }
1526 else if(datatype=='A')
1527 {
1528 bin[bin_cursor+2]='A';
1529 bin[bin_cursor+3]=additional_columns[col_cursor+5];
1530 col_cursor += 6;
1531 bin_cursor += 4;
1532 }
1533 else if(datatype=='B')
1534 //array
1535 {
1536 char celltype = additional_columns[col_cursor+5];
1537 int * items = (int *)(&bin[bin_cursor+4]);
1538
1539 bin[bin_cursor+2]='B';
1540 bin[bin_cursor+3]=celltype;
1541 bin_cursor += 4 + 4;
1542 col_cursor += 7;
1543
1544 (*items) = 0;
1545
1546 int last_cursor = col_cursor;
1547 while(1){
1548 if(additional_columns[col_cursor] == ',' || additional_columns[col_cursor] == '\t' || additional_columns[col_cursor] == 0)
1549 { // add new item
1550
1551 char cell_buff [30];
1552 if((col_cursor - last_cursor) < 29)
1553 {
1554 memcpy(cell_buff, additional_columns + last_cursor, (col_cursor - last_cursor));
1555 cell_buff[(col_cursor - last_cursor)] = 0;
1556 int intv = 0; float fltv = 0;
1557 if(celltype == 'i')intv = atoi(cell_buff);
1558 else fltv = atof(cell_buff);
1559 if(bin_cursor < 780){
1560 memcpy(bin + bin_cursor, (celltype == 'i')?(void *)&intv:(void *)&fltv, 4);
1561 bin_cursor += 4;
1562 (*items) ++;
1563 }
1564 }
1565 last_cursor = col_cursor+1;
1566 }
1567 if(additional_columns[col_cursor] == '\t' || additional_columns[col_cursor] == 0)
1568 break;
1569
1570 col_cursor++;
1571
1572 }
1573
1574 }
1575
1576 if(bin_cursor>750) break;
1577 continue;
1578 }
1579
1580 col_cursor++;
1581 }
1582 return bin_cursor;
1583 }
1584
SamBam_reg2bin(int beg,int end)1585 int SamBam_reg2bin(int beg, int end)
1586 {
1587 --end;
1588 if (beg>>14 == end>>14) return ((1<<15)-1)/7 + (beg>>14);
1589 if (beg>>17 == end>>17) return ((1<<12)-1)/7 + (beg>>17);
1590 if (beg>>20 == end>>20) return ((1<<9)-1)/7 + (beg>>20);
1591 if (beg>>23 == end>>23) return ((1<<6)-1)/7 + (beg>>23);
1592 if (beg>>26 == end>>26) return ((1<<3)-1)/7 + (beg>>26);
1593 return 0;
1594 }
1595
SamBam_writer_finish_header(SamBam_Writer * writer)1596 void SamBam_writer_finish_header( SamBam_Writer * writer ){
1597 if(writer -> writer_state == 0){
1598 if(writer -> header_plain_text_buffer)
1599 SamBam_writer_write_header(writer);
1600 writer -> writer_state = 10;
1601 }
1602 }
1603
1604 #define FC_MAX_CIGAR_SECTIONS 96
1605
1606 // The caller has to free the returned pointer.
1607 // It returns NULL if no such field.
duplicate_TAB_record_field(char * rline,int fno,int to_end)1608 char * duplicate_TAB_record_field(char * rline, int fno, int to_end){
1609 int i, fldi = 0, start_pos = -1;
1610 if(fno >=1)
1611 for(i=0;rline[i] && rline[i] != '\n'; i++){
1612 int nch = rline[i];
1613 if(nch == '\t'){
1614 fldi++;
1615 if(fldi == fno) start_pos = i+1;
1616 if(fldi == fno+1) break;
1617 }
1618 }
1619 else{
1620 start_pos = rline[0]>0?0:-1;
1621 if(start_pos == 0) for(i=0;rline[i] && rline[i] != '\n' && rline[i] != '\t'; i++);
1622 else i=-1;
1623 }
1624 int end_pos =i;
1625 if(to_end){
1626 end_pos = strlen(rline);
1627 if(end_pos<1) return NULL;
1628
1629 if(rline[end_pos-1]=='\n') end_pos--;
1630 }
1631
1632 if(start_pos<0 || end_pos <= start_pos) return NULL;
1633 char * ret = malloc(end_pos - start_pos +1);
1634 memcpy(ret, rline+start_pos, end_pos -start_pos);
1635 ret[end_pos -start_pos] = 0;
1636 return ret;
1637 }
1638
SamBam_writer_add_read_bin(SamBam_Writer * writer,int thread_no,char * rbin,int committable)1639 int SamBam_writer_add_read_bin(SamBam_Writer * writer, int thread_no, char * rbin, int committable){
1640 char * this_chunk_buffer=NULL;
1641 srInt_64 *this_chunk_buffer_used=NULL;
1642 if(thread_no >= 0){
1643 if(writer -> sort_reads_by_coord && writer -> threads_chunk_buffer_max_size[thread_no] < writer -> threads_chunk_buffer_used[thread_no] + 12000){
1644 writer -> threads_chunk_buffer_max_size[thread_no] = writer -> threads_chunk_buffer_max_size[thread_no] * 7/4;
1645 writer -> threads_chunk_buffer[ thread_no ] = realloc(writer -> threads_chunk_buffer[ thread_no ], writer -> threads_chunk_buffer_max_size[thread_no]);
1646 }
1647 this_chunk_buffer = writer -> threads_chunk_buffer[ thread_no ];
1648 this_chunk_buffer_used = writer -> threads_chunk_buffer_used + thread_no;
1649 }else{
1650 if(writer -> sort_reads_by_coord && writer -> chunk_buffer_max_size < writer -> chunk_buffer_used + 12000){
1651 //SUBREADprintf("0THR_REALLOCATE MEM : %d -> %d\n", writer -> chunk_buffer_max_size, writer -> chunk_buffer_max_size * 7/4);
1652 writer -> chunk_buffer_max_size = writer -> chunk_buffer_max_size * 7/4;
1653 writer -> chunk_buffer = realloc(writer -> chunk_buffer, writer -> chunk_buffer_max_size);
1654 }
1655 this_chunk_buffer = writer -> chunk_buffer;
1656 this_chunk_buffer_used = &writer -> chunk_buffer_used;
1657 }
1658 //SUBREADprintf("WRTXBIN: PTR %p of USED %lld at read %p\n", this_chunk_buffer, *this_chunk_buffer_used, rbin);
1659 int reclen=0;
1660 memcpy(&reclen, rbin,4);
1661 memcpy(this_chunk_buffer+(*this_chunk_buffer_used), rbin, reclen+4);
1662 (*this_chunk_buffer_used)+= reclen+4;
1663
1664 if((*this_chunk_buffer_used)>55000 && committable && !writer -> sort_reads_by_coord)
1665 SamBam_writer_add_chunk(writer, thread_no);
1666 //SUBREADprintf("WRTXBIN: FIN WTR %p of USED %lld\n", this_chunk_buffer, *this_chunk_buffer_used);
1667 return 0;
1668 }
SamBam_writer_add_read_line(SamBam_Writer * writer,int thread_no,char * rline,int committable)1669 int SamBam_writer_add_read_line(SamBam_Writer * writer, int thread_no, char * rline, int committable){
1670 char * read_name, * flag_str, *chro_name, *chro_position_str, * mapping_quality_str, * cigar, * next_chro_name, *next_chro_position_str, *temp_len_str, *read_text, *qual_text, *additional_columns;
1671 read_name = duplicate_TAB_record_field(rline, 0,0);
1672 flag_str = duplicate_TAB_record_field(rline, 1,0);
1673 chro_name = duplicate_TAB_record_field(rline, 2,0);
1674 chro_position_str = duplicate_TAB_record_field(rline, 3,0);
1675 mapping_quality_str = duplicate_TAB_record_field(rline, 4,0);
1676 cigar = duplicate_TAB_record_field(rline, 5,0);
1677 next_chro_name = duplicate_TAB_record_field(rline, 6,0);
1678 next_chro_position_str = duplicate_TAB_record_field(rline, 7,0);
1679 temp_len_str = duplicate_TAB_record_field(rline, 8,0);
1680 read_text = duplicate_TAB_record_field(rline, 9,0);
1681 qual_text = duplicate_TAB_record_field(rline, 10,0);
1682 additional_columns = duplicate_TAB_record_field(rline, 11,1);
1683 if(qual_text==NULL){
1684 SUBREADprintf("FATAL ERROR : bad read format: %s, %s, %s, %s\n", read_name, flag_str, chro_name, rline);
1685 return -1;
1686 }
1687
1688 SamBam_writer_add_read(writer, thread_no, read_name, atoi(flag_str), chro_name, atoi(chro_position_str), atoi(mapping_quality_str), cigar, next_chro_name, atoi(next_chro_position_str), atoi(temp_len_str), strlen(read_text), read_text, qual_text, additional_columns, committable);
1689
1690 if(additional_columns) free(additional_columns);
1691 free(qual_text);
1692 free(read_text);
1693 free(temp_len_str);
1694 free(next_chro_position_str);
1695 free(next_chro_name);
1696 free(cigar);
1697 free(mapping_quality_str);
1698 free(chro_position_str);
1699 free(chro_name);
1700 free(flag_str);
1701 free(read_name);
1702 return 0;
1703 }
1704
SamBam_writer_add_read(SamBam_Writer * writer,int thread_no,char * read_name,unsigned int flags,char * chro_name,unsigned int chro_position,int mapping_quality,char * cigar,char * next_chro_name,unsigned int next_chro_position,int temp_len,int read_len,char * read_text,char * qual_text,char * additional_columns,int committable)1705 int SamBam_writer_add_read(SamBam_Writer * writer, int thread_no, char * read_name, unsigned int flags, char * chro_name, unsigned int chro_position, int mapping_quality, char * cigar, char * next_chro_name, unsigned int next_chro_position, int temp_len, int read_len, char * read_text, char * qual_text, char * additional_columns, int committable)
1706 {
1707 assert(writer -> writer_state!=0);
1708 if(!qual_text || !read_text)
1709 {
1710 SUBREADprintf("ERROR: sam file is incomplete.\n");
1711 return 1;
1712 }
1713
1714 char additional_bin[1000];
1715 int cigar_opts[FC_MAX_CIGAR_SECTIONS], xk1, cover_length = 0;
1716 int cigar_opt_len = SamBam_compress_cigar(cigar, cigar_opts, & cover_length, FC_MAX_CIGAR_SECTIONS);
1717 int read_name_len = 1+strlen(read_name) ;
1718 int additional_bin_len = 0;
1719 if(additional_columns) additional_bin_len = SamBam_compress_additional(additional_columns, additional_bin);
1720 int record_length = 4 + 4 + 4 + 4 + /* l_seq: */ 4 + 4 + 4 + 4 + /* read_name:*/ read_name_len + cigar_opt_len * 4 + (read_len + 1) /2 + read_len + additional_bin_len;
1721
1722
1723 char * this_chunk_buffer;
1724 srInt_64 * this_chunk_buffer_used;
1725
1726 if(thread_no >= 0){
1727 if(writer -> sort_reads_by_coord && writer -> threads_chunk_buffer_max_size[thread_no] < writer -> threads_chunk_buffer_used[thread_no] + 12000){
1728 writer -> threads_chunk_buffer_max_size[thread_no] = writer -> threads_chunk_buffer_max_size[thread_no] * 7/4;
1729 writer -> threads_chunk_buffer[ thread_no ] = realloc(writer -> threads_chunk_buffer[ thread_no ], writer -> threads_chunk_buffer_max_size[thread_no]);
1730 }
1731 this_chunk_buffer = writer -> threads_chunk_buffer[ thread_no ];
1732 this_chunk_buffer_used = writer -> threads_chunk_buffer_used + thread_no;
1733 }else{
1734 if(writer -> sort_reads_by_coord && writer -> chunk_buffer_max_size < writer -> chunk_buffer_used + 12000){
1735 //SUBREADprintf("REALLOCATE MEM : %d -> %d\n", writer -> chunk_buffer_max_size, writer -> chunk_buffer_max_size * 7/4);
1736 writer -> chunk_buffer_max_size = writer -> chunk_buffer_max_size * 7/4;
1737 writer -> chunk_buffer = realloc(writer -> chunk_buffer, writer -> chunk_buffer_max_size);
1738 }
1739 this_chunk_buffer = writer -> chunk_buffer;
1740 this_chunk_buffer_used = &writer -> chunk_buffer_used;
1741 }
1742
1743 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & record_length , 4);
1744 (*this_chunk_buffer_used) += 4;
1745
1746 int bin = SamBam_reg2bin(chro_position -1, chro_position-1+cover_length);
1747 //if(chro_position>0)SUBREADprintf("CREATE_BIN=%d\n", bin);
1748
1749 int refID = HashTableGet(writer -> chromosome_name_table, chro_name) - NULL - 1;
1750 int bin_mq_nl = (bin<<16) | (mapping_quality << 8) | read_name_len ;
1751 int fag_nc = (flags<<16) | cigar_opt_len;
1752 int nextRefID = -1;
1753
1754 if(next_chro_name[0] != '*' && next_chro_name[0]!='=')
1755 nextRefID = HashTableGet(writer -> chromosome_name_table, next_chro_name) - NULL - 1;
1756 else if(next_chro_name[0] == '=')
1757 nextRefID = refID;
1758
1759
1760 chro_position--;
1761 next_chro_position--;
1762
1763 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & refID , 4);
1764 (*this_chunk_buffer_used) += 4;
1765 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & chro_position , 4);
1766 (*this_chunk_buffer_used) += 4;
1767 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & bin_mq_nl , 4);
1768 (*this_chunk_buffer_used) += 4;
1769 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & fag_nc , 4);
1770 (*this_chunk_buffer_used) += 4;
1771 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & read_len , 4);
1772 (*this_chunk_buffer_used) += 4;
1773 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & nextRefID , 4);
1774 (*this_chunk_buffer_used) += 4;
1775 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & next_chro_position , 4);
1776 (*this_chunk_buffer_used) += 4;
1777 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & temp_len , 4);
1778 (*this_chunk_buffer_used) += 4;
1779 strcpy(this_chunk_buffer + (*this_chunk_buffer_used) , read_name);
1780 (*this_chunk_buffer_used) += read_name_len;
1781 memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , cigar_opts, 4*cigar_opt_len);
1782 (*this_chunk_buffer_used) += 4*cigar_opt_len;
1783 SamBam_read2bin(read_text , this_chunk_buffer + (*this_chunk_buffer_used));
1784 (*this_chunk_buffer_used) += (read_len + 1) /2;
1785 memcpy(this_chunk_buffer + (*this_chunk_buffer_used), qual_text, read_len);
1786 for(xk1=0; xk1<read_len; xk1++)
1787 this_chunk_buffer[(*this_chunk_buffer_used)+xk1] -= 33;
1788
1789 (*this_chunk_buffer_used) += read_len;
1790 memcpy(this_chunk_buffer + (*this_chunk_buffer_used), additional_bin, additional_bin_len);
1791 (*this_chunk_buffer_used) += additional_bin_len;
1792
1793
1794 if((*this_chunk_buffer_used)>55000 && committable && !writer -> sort_reads_by_coord)
1795 SamBam_writer_add_chunk(writer, thread_no);
1796
1797 return 0;
1798 }
1799
SamBam_unzip(char * out,int outlen,char * in,int inlen,int sync_only)1800 int SamBam_unzip(char * out, int outlen , char * in , int inlen, int sync_only)
1801 {
1802 z_stream strm;
1803 strm.zalloc = Z_NULL;
1804 strm.zfree = Z_NULL;
1805 strm.opaque = Z_NULL;
1806 strm.avail_in = 0;
1807 strm.next_in = Z_NULL;
1808 int ret = inflateInit2(&strm, SAMBAM_GZIP_WINDOW_BITS);
1809 if (ret != Z_OK)
1810 return -1;
1811
1812 strm.avail_in = (unsigned int)inlen;
1813 strm.next_in = (unsigned char *)in;
1814
1815 strm.avail_out = outlen;
1816 strm.next_out = (unsigned char *)out;
1817 ret = inflate(&strm, sync_only?Z_SYNC_FLUSH:Z_FINISH);
1818 if(ret != Z_STREAM_END && ret!=0)
1819 {
1820 inflateEnd(&strm);
1821 SUBREADprintf("DATA ERROR! code=%d\n", ret);
1822 assert(0);
1823 return -1;
1824 }
1825 int have = outlen - strm.avail_out;
1826
1827 inflateEnd(&strm);
1828 //SUBREADprintf("DECOMPRESS GENERATED=%d\n", have);
1829
1830 return have;
1831 }
1832
1833
SamBam_writer_sort_buff_one_compare(void * Lbin,void * Rbin,ArrayList * me)1834 int SamBam_writer_sort_buff_one_compare(void * Lbin, void * Rbin, ArrayList * me){
1835 unsigned long long * Lv = Lbin, *Rv = Rbin;
1836 if(*Lv > *Rv) return 1;
1837 else if(*Lv < *Rv) return -1;
1838 return 0;
1839 }
1840 // return total reads in bin.
SamBam_writer_sort_buff_one_write(SamBam_Writer * writer,char * bin,int binlen,int thread_id)1841 int SamBam_writer_sort_buff_one_write(SamBam_Writer * writer, char * bin, int binlen, int thread_id){
1842 int bin_cursor = 0;
1843
1844 ArrayList* sort_linear_pos = ArrayListCreate(1000000);
1845 ArrayListSetDeallocationFunction(sort_linear_pos, free);
1846
1847 int ii_reads = 0;
1848 while(bin_cursor < binlen){
1849 int this_block_len = 0;
1850 memcpy(&this_block_len, bin + bin_cursor, 4);
1851
1852 char * key_binpos = malloc(12);
1853 memcpy(key_binpos, bin + bin_cursor+8, 4);
1854 memcpy(key_binpos+4, bin + bin_cursor+4, 4);
1855 memcpy(key_binpos+8, &bin_cursor, 4);
1856
1857 ArrayListPush(sort_linear_pos, key_binpos);
1858 bin_cursor +=4 + this_block_len;
1859
1860 ii_reads ++;
1861 }
1862 ArrayListSort(sort_linear_pos, SamBam_writer_sort_buff_one_compare);
1863
1864 char * nbin = NULL;
1865 if(binlen >0 && binlen < 0x7fffffff)nbin=malloc(binlen);
1866 int nb_cursor = 0, xx, wlen=0;
1867 for(xx=0; xx<ii_reads;xx++){
1868 int * binpos = ArrayListGet(sort_linear_pos, xx);
1869 int block_len = 0;
1870 memcpy(&block_len, bin + binpos[2] , 4);
1871 assert(block_len < 10000);
1872 memcpy(nbin + nb_cursor, bin + binpos[2], 4+block_len);
1873 nb_cursor += 4+block_len;
1874 }
1875 assert(binlen == nb_cursor);
1876 memcpy(bin, nbin, binlen);
1877 ArrayListDestroy(sort_linear_pos);
1878
1879 char tmpfname[MAX_FILE_NAME_LENGTH+40];
1880 if(writer -> threads>1) subread_lock_occupy(&writer -> thread_bam_lock);
1881 sprintf(tmpfname, "%s-%06d.sortedbin", writer -> tmpf_prefix, writer -> sorted_batch_id++);
1882 if(writer -> threads>1) subread_lock_release(&writer -> thread_bam_lock);
1883 FILE * tofp = fopen(tmpfname, "wb");
1884 if(tofp){
1885 if(binlen>0)wlen = fwrite(nbin, binlen,1, tofp);
1886 fclose(tofp);
1887 }
1888 free(nbin);
1889 if(wlen < 1 && binlen>0) {
1890 SUBREADprintf("ERROR: no space (%d bytes) in the temp directory (%s).\nThe program cannot run properly.\n", binlen, tmpfname);
1891 writer ->is_internal_error = 1;
1892 return -1;
1893 }
1894 return ii_reads;
1895 }
1896
SamBam_writer_sort_bins_to_BAM_FP_pos(FILE * fp)1897 unsigned long long SamBam_writer_sort_bins_to_BAM_FP_pos(FILE * fp){
1898 int intvals[3];
1899 unsigned long long ret = 0;
1900 int rlen = fread(intvals, sizeof(int),3,fp);
1901 if(rlen>0 && intvals[0]<10000){
1902 int chro_no, poss;
1903 chro_no=intvals[1];
1904 poss=intvals[2];
1905 ret =((1LLU*chro_no) << 32)| poss;
1906 if(ret == SUBREAD_MAX_ULONGLONG) ret = ret- 10 ;
1907 fseek(fp, -12, SEEK_CUR);
1908 return ret;
1909 }
1910 return SUBREAD_MAX_ULONGLONG;
1911 }
1912
1913
SamBam_writer_calc_cigar_span(char * bin)1914 int SamBam_writer_calc_cigar_span(char * bin){
1915 int cops = 0, rname_len = 0, ret = 0;
1916 memcpy(&cops, bin+12, 4);
1917 memcpy(&rname_len, bin+8, 4);
1918 rname_len = rname_len & 0xff;
1919 cops = cops & 0xffff;
1920 int ii;
1921 for(ii = 0; ii < cops ; ii++){
1922 unsigned int copt = 0;
1923 memcpy(&copt, bin+32+rname_len+4*ii, 4);
1924 int copt_char = copt & 0xf;
1925 unsigned int copt_len = copt >> 4;
1926 if(copt_char == 0 || copt_char == 2 || copt_char == 3 || copt_char == 7 || copt_char == 8) ret += copt_len;
1927 }
1928
1929 return ret;
1930 }
1931
1932 #define MAX_ALLOWED_GAP_IN_BAI_CHUNK 5 // the number of blocks, not the file positions.
1933
SamBam_writer_sort_bins_to_BAM_test_bins(SamBam_Writer * writer,HashTable * bin_tab,ArrayList * bin_list,ArrayList * win16k_list,int block_len,void *** last_chunk_ptr,int chro_no)1934 void SamBam_writer_sort_bins_to_BAM_test_bins(SamBam_Writer * writer, HashTable * bin_tab, ArrayList * bin_list, ArrayList * win16k_list, int block_len, void ***last_chunk_ptr, int chro_no){
1935 int inbin_pos = writer -> chunk_buffer_used - block_len ; // point to the byte AFTER "block_len" int.
1936 int pos=0, bin_mq_nl = 0, binno=0;
1937
1938 memcpy(&pos, writer -> chunk_buffer + inbin_pos + 4, 4);
1939 memcpy(&bin_mq_nl, writer -> chunk_buffer + inbin_pos + 8,4);
1940 binno = bin_mq_nl>>16;
1941
1942 int cigar_span = SamBam_writer_calc_cigar_span(writer -> chunk_buffer + inbin_pos);
1943
1944 int this_w16_no = (pos + cigar_span) >>14; // WIN is calculated on 0-based pos.
1945 unsigned long long this_Vpos = writer -> this_bam_block_no<<16 | (inbin_pos-4);
1946
1947 // if this read is after the maximum coordinate in the win16k list: all elements before last one and this one starts at this read.
1948 if(this_w16_no >=win16k_list->numOfElements){
1949 int bbi;
1950 for(bbi = win16k_list->numOfElements; bbi <=this_w16_no; bbi++)
1951 ArrayListPush(win16k_list, NULL+ this_Vpos);
1952 }
1953
1954 // a read only belongs to ONE bin which is the smallest bin that can FULLY cover the full length of this read.
1955 ArrayList * this_bin_chunks = HashTableGet(bin_tab, NULL+binno+1);
1956 if(NULL == this_bin_chunks){
1957 this_bin_chunks = ArrayListCreate(5);
1958 HashTablePut(bin_tab, NULL+binno+1, this_bin_chunks);
1959 ArrayListPush(bin_list, NULL+binno);
1960 }
1961
1962 int found = 0;
1963 // a bin is not necessarily continuous. Say, a top-level bin only contains a few reads (most reads a in low-level bins), but their locations are everywhere.
1964 if(this_bin_chunks -> numOfElements > 0){
1965 //SUBREADprintf("RESLOCS : %ld == %ld\n", this_Vpos, this_bin_chunks -> elementList [ this_bin_chunks -> numOfElements - 1 ] - NULL);
1966 long long diff = this_Vpos >>16;
1967 diff -=(this_bin_chunks -> elementList [ this_bin_chunks -> numOfElements - 1] - NULL)>>16;
1968 if(diff < MAX_ALLOWED_GAP_IN_BAI_CHUNK){
1969 this_bin_chunks -> elementList [ this_bin_chunks -> numOfElements -1] = NULL+this_Vpos + block_len+4;
1970 *last_chunk_ptr = this_bin_chunks -> elementList + this_bin_chunks -> numOfElements -1;
1971 found = 1;
1972 }
1973 }
1974
1975 // if the last chunk in this bin isn't good to be extended (too far from the file location of the new read), a new chunk is created.
1976 if(!found){
1977 ArrayListPush(this_bin_chunks, NULL + this_Vpos);
1978 ArrayListPush(this_bin_chunks, NULL + this_Vpos + block_len+4);
1979 *last_chunk_ptr = this_bin_chunks -> elementList + this_bin_chunks -> numOfElements -1;
1980 }
1981 }
1982
SamBam_writer_sorted_compress(void * vptr0)1983 void * SamBam_writer_sorted_compress(void * vptr0){
1984 void ** vptr = vptr0;
1985 SamBam_Writer * writer = vptr[0];
1986 int this_thread_no = vptr[1]-NULL;
1987 subread_lock_t * initlock = vptr[2];
1988
1989 worker_thread_start(&writer->sorted_notifier, this_thread_no);
1990 subread_lock_release(initlock);
1991 free(vptr0);
1992 struct SamBam_sorted_compressor_st * me = writer -> writer_threads + this_thread_no;
1993
1994 while(1){
1995 int termed = worker_wait_for_job(&writer->sorted_notifier, this_thread_no);
1996 if(termed)break;
1997
1998 me->CRC32_plain = SamBam_CRC32(me->plain_text, me->text_size);
1999 me->strm.next_in = (unsigned char*)me->plain_text;
2000 me->strm.avail_in = me->text_size;
2001 me->strm.next_out = (unsigned char*)me->zipped_bin;
2002 me->strm.avail_out = 70000;
2003 int deret = deflate(&me->strm,Z_FINISH);
2004 if(deret >=0){
2005 deflateReset(&me->strm);
2006 me->bin_size = 70000 - me->strm.avail_out;
2007 me->last_job_done = 1;
2008 }else{
2009 SUBREADprintf("Error: cannot compress BAM block #%d , which is %llu, had %d => 70000 [ %d ] bytes , return = %d\n", this_thread_no, me -> bam_block_no, me->text_size, me->bin_size, deret);
2010 }
2011 }
2012 return NULL;
2013 }
2014
SamBam_thread_wait_merge_write(SamBam_Writer * writer,int thread_no)2015 void SamBam_thread_wait_merge_write(SamBam_Writer * writer, int thread_no){
2016 master_wait_for_job_done(&writer -> sorted_notifier, thread_no);
2017
2018 if(writer -> writer_threads[thread_no].last_job_done){
2019 srInt_64 fpos = ftello(writer -> bam_fp);
2020 HashTablePut(writer -> block_no_p1_to_vpos_tab, NULL+1+writer -> writer_threads[thread_no].bam_block_no, NULL+fpos);
2021
2022 SamBam_writer_chunk_header(writer, writer -> writer_threads[thread_no].bin_size);
2023 int rlen = fwrite( writer -> writer_threads[thread_no].zipped_bin ,1, writer -> writer_threads[thread_no].bin_size, writer -> bam_fp);
2024 if(rlen != writer -> writer_threads[thread_no].bin_size){
2025 SUBREADprintf("ERROR: cannot write output files.\n");
2026 assert(0);
2027 }
2028 rlen = fwrite(&writer -> writer_threads[thread_no].CRC32_plain , 4, 1, writer -> bam_fp);
2029 rlen = fwrite(&writer -> writer_threads[thread_no].text_size , 4, 1, writer -> bam_fp);
2030
2031 writer -> writer_threads[thread_no].text_size = 0;
2032 writer -> writer_threads[thread_no].bin_size = 0;
2033 writer -> writer_threads[thread_no].bam_block_no = -1;
2034 writer -> writer_threads[thread_no].last_job_done = 0;
2035 }
2036 }
2037
SamBam_thread_set_new_job(SamBam_Writer * writer,int thread_no)2038 void SamBam_thread_set_new_job(SamBam_Writer * writer, int thread_no){
2039 memcpy(writer -> writer_threads[thread_no].plain_text, writer ->chunk_buffer, writer->chunk_buffer_used);
2040 writer -> writer_threads[thread_no].text_size = writer->chunk_buffer_used;
2041 writer -> writer_threads[thread_no].bam_block_no = writer -> this_bam_block_no;
2042 writer -> chunk_buffer_used = 0;
2043 master_notify_worker(&writer -> sorted_notifier, thread_no);
2044 }
2045
SamBam_writer_submit_sorted_compressing_task(SamBam_Writer * writer)2046 void SamBam_writer_submit_sorted_compressing_task(SamBam_Writer * writer){
2047 SamBam_thread_wait_merge_write(writer, writer -> sorted_compress_this_thread_no);
2048 SamBam_thread_set_new_job(writer, writer -> sorted_compress_this_thread_no);
2049
2050 writer -> sorted_compress_this_thread_no++;
2051 if(writer -> sorted_compress_this_thread_no == writer -> threads) writer -> sorted_compress_this_thread_no=0;
2052 writer -> this_bam_block_no ++;
2053 }
2054
SamBam_writer_sort_bins_to_BAM_write_1R(SamBam_Writer * writer,FILE * fp,HashTable * bin_tab,ArrayList * bin_list,ArrayList * win16k_list,int chro_no)2055 void SamBam_writer_sort_bins_to_BAM_write_1R(SamBam_Writer * writer, FILE * fp, HashTable * bin_tab, ArrayList * bin_list, ArrayList * win16k_list, int chro_no){
2056 int block_len=0;
2057 int rlen = fread(&block_len, 4,1,fp);
2058 //SUBREADprintf("WBIN=%d\n", block_len);
2059 if(rlen<1 || block_len >= 10000){
2060 SUBREADprintf("ERROR: sorted bin files are broken. RLEN=%d , BLKLEN=%d\n", rlen, block_len);
2061 assert(rlen >=1);
2062 }
2063
2064 memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used, &block_len, 4);
2065 writer ->chunk_buffer_used += 4;
2066 rlen = fread(writer -> chunk_buffer + writer -> chunk_buffer_used, 1, block_len ,fp);
2067 if(rlen < block_len){
2068 SUBREADprintf("ERROR: sorted bin files are broken.\n");
2069 assert(rlen >=block_len);
2070 }
2071 writer -> chunk_buffer_used+= rlen;
2072
2073 void ** last_chunk_ptr = NULL;
2074 SamBam_writer_sort_bins_to_BAM_test_bins(writer, bin_tab, bin_list, win16k_list, block_len, &last_chunk_ptr, chro_no);
2075
2076 if(writer -> chunk_buffer_used>55000){
2077 assert(writer -> chunk_buffer_used< 65000);
2078 SamBam_writer_submit_sorted_compressing_task(writer);
2079 }
2080 }
2081
2082 #define SAMBAM_MERGE_MAX_FPS (350)
2083
SamBam_writer_one_thread_merge_sortedbins(SamBam_Writer * writer)2084 void SamBam_writer_one_thread_merge_sortedbins(SamBam_Writer * writer){
2085 int new_bins = 0;
2086
2087 if(writer -> sorted_batch_id > SAMBAM_MERGE_MAX_FPS){
2088 int merge_i;
2089 for(merge_i = 0; merge_i < writer -> sorted_batch_id; merge_i+= SAMBAM_MERGE_MAX_FPS){
2090 char tfp[MAX_FILE_NAME_LENGTH+50];
2091 int this_size = min(SAMBAM_MERGE_MAX_FPS, writer -> sorted_batch_id - merge_i);
2092 if(this_size < 2) break;
2093
2094 //SUBREADprintf("CREATE_MERGE %d\n", new_bins + writer -> sorted_batch_id);
2095 sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, writer -> sorted_batch_id + (new_bins++) );
2096 FILE * outbinfp = fopen(tfp,"wb");
2097
2098 FILE ** sb_fps = malloc(sizeof(FILE *) * this_size);
2099 unsigned long long * current_min_fps = malloc(sizeof(unsigned long long) * this_size);
2100 unsigned long long current_min = SUBREAD_MAX_ULONGLONG;
2101 int current_min_batch = -1, bii;
2102
2103 for(bii = 0; bii < this_size; bii++){
2104 char tfpx[MAX_FILE_NAME_LENGTH+50];
2105 current_min_fps[bii] = SUBREAD_MAX_ULONGLONG;
2106
2107 sprintf(tfpx , "%s-%06d.sortedbin", writer -> tmpf_prefix, bii + merge_i);
2108 sb_fps[bii] = fopen(tfpx,"rb");
2109 current_min_fps[bii] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[bii]);
2110 if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2111 current_min = current_min_fps[bii];
2112 current_min_batch = bii;
2113 }
2114 }
2115
2116 char * mtmp = malloc(10000);
2117 while(1){
2118 if(current_min_batch>-1){
2119 int blk_len = 0;
2120 int rrlen = fread(&blk_len, 4, 1, sb_fps[current_min_batch]);
2121 if(rrlen <1) assert(rrlen >0);
2122 assert(blk_len < 10000);
2123 rrlen = fread(mtmp, blk_len, 1, sb_fps[current_min_batch]);
2124 if(rrlen <1) assert(rrlen >0);
2125 fwrite(&blk_len,4,1, outbinfp);
2126 fwrite(mtmp, blk_len,1, outbinfp);
2127
2128 current_min_fps[current_min_batch] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[current_min_batch]);
2129 current_min = SUBREAD_MAX_ULONGLONG;
2130 current_min_batch = -1;
2131 }else break;
2132
2133 for(bii = 0; bii < this_size; bii++){
2134 if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2135 current_min = current_min_fps[bii];
2136 current_min_batch = bii;
2137 }
2138 }
2139 }
2140 free(mtmp);
2141
2142 for(bii = 0; bii < this_size; bii++){
2143 fclose(sb_fps[bii]);
2144 sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, merge_i + bii);
2145 //SUBREADprintf(" DEL_MERGE %d\n", bii);
2146 unlink(tfp);
2147 }
2148 fclose(outbinfp);
2149 }
2150
2151 writer -> sorted_batch_id += new_bins;
2152 }
2153 }
2154
2155
2156 #define SAMBAM_renew_BAItabs { if(bin_chunks_table != NULL) { HashTableDestroy(bin_chunks_table); ArrayListDestroy(bins_list); ArrayListDestroy(win16k_list) ; } \
2157 bin_chunks_table = HashTableCreate(10000); bins_list = ArrayListCreate(10000); win16k_list = ArrayListCreate(10000);\
2158 HashTableSetDeallocationFunctions(bin_chunks_table, NULL, (void(*)(void *))ArrayListDestroy); }
2159
2160 #define SAMBAM_write_empty_chros(nSTART, nEND) {int iikk; for(iikk=(nSTART); iikk<(nEND);iikk++){ int wwwlen = fwrite("\0\0\0\0\0\0\0\0", 1,8,writer -> BAI_fp); if(wwwlen < 8){ assert(wwwlen==8);}} }
2161
2162 int level_min_binno[] = {0, 1, 9, 73, 585, 4681};
2163
SamBam_writer_merge_chunks_compare(void * vL,void * vR,ArrayList * me)2164 int SamBam_writer_merge_chunks_compare(void * vL, void * vR, ArrayList * me){
2165 long long * lL = vL;
2166 long long * lR = vR;
2167 if( (*lL) > (*lR)) return 1;
2168 if( (*lL) < (*lR)) return -1;
2169 return 0;
2170 }
2171
SamBam_writer_merge_chunks(ArrayList * chs)2172 void SamBam_writer_merge_chunks(ArrayList * chs){
2173 ArrayList * intvs = ArrayListCreate(chs -> numOfElements/2);
2174 ArrayListSetDeallocationFunction(intvs, free);
2175 int i;
2176 for(i=0; i<chs -> numOfElements; i+=2){
2177 long long * ll = malloc(sizeof(long long)*2);
2178 ll[0] = ArrayListGet(chs, i)-NULL;
2179 ll[1] = ArrayListGet(chs, i+1)-NULL;
2180 ArrayListPush(intvs, ll);
2181 }
2182 chs -> numOfElements = 0;
2183
2184 ArrayListSort(intvs, SamBam_writer_merge_chunks_compare);
2185 long long * ll0 = ArrayListGet(intvs, 0);
2186 ArrayListPush(chs, NULL+ll0[0]);
2187 ArrayListPush(chs, NULL+ll0[1]);
2188
2189 for(i=1; i<intvs-> numOfElements; i++){
2190 long long last_end = ArrayListGet(chs, chs -> numOfElements-1) -NULL;
2191 long long * ll_this = ArrayListGet(intvs, i);
2192
2193 long long dist = ll_this[0]>>16;
2194 dist -= last_end>>16;
2195
2196 if(dist < MAX_ALLOWED_GAP_IN_BAI_CHUNK)
2197 chs-> elementList[chs -> numOfElements-1] = NULL+max(ll_this[1],last_end);
2198 else{
2199 ArrayListPush(chs, NULL+ll_this[0]);
2200 ArrayListPush(chs, NULL+ll_this[1]);
2201 }
2202 }
2203
2204 //SUBREADprintf("MERGE CHUNKS : %ld -> %ld\n", intvs-> numOfElements, chs -> numOfElements/2);
2205 ArrayListDestroy(intvs);
2206 }
2207
SamBam_writer_optimize_bins_level(HashTable * bin_tab,ArrayList * bin_arr,HashTable * new_tab,ArrayList * new_arrs,int this_level)2208 void SamBam_writer_optimize_bins_level(HashTable *bin_tab, ArrayList *bin_arr, HashTable * new_tab, ArrayList * new_arrs, int this_level){
2209 int i,j, my_min_binno = level_min_binno[this_level], my_parent_min_binno = -1, my_child_min_binno = 999999;
2210 if(this_level>0) my_parent_min_binno = level_min_binno[this_level-1];
2211 if(this_level<5) my_child_min_binno = level_min_binno[this_level+1];
2212
2213
2214 // copy all parents and ancestors and children and descendant bins into new table & list
2215 for(i=0; i< bin_arr -> numOfElements; i++){
2216 int binno = ArrayListGet(bin_arr, i)-NULL;
2217 if(binno < my_min_binno || binno >= my_child_min_binno){
2218 ArrayList * old_bin_arr = HashTableGet(bin_tab, NULL+1+binno);
2219 if(old_bin_arr -> numOfElements > 1){
2220 HashTablePut(new_tab, NULL+1+binno, ArrayListDuplicate(old_bin_arr));
2221 ArrayListPush(new_arrs, NULL+binno);
2222 }
2223 }
2224 }
2225
2226 // scan this_level bins. If small enough -> put to parent bin and delete it.
2227 // Otherwise put it into new tab&list.
2228 for(i=0; i< bin_arr -> numOfElements; i++){
2229 int binno = ArrayListGet(bin_arr, i)-NULL;
2230 if(binno < my_min_binno || binno >= my_child_min_binno )continue;
2231 ArrayList * small_bin_arr = HashTableGet(bin_tab, NULL+1+binno);
2232 if(small_bin_arr -> numOfElements < 2) continue;
2233
2234 long long min_start_Voff = SUBREAD_MAX_LONGLONG;
2235 long long max_end_Voff = -1;
2236
2237 for(j =0 ; j< small_bin_arr -> numOfElements; j+=2){
2238 long long this_start = ArrayListGet(small_bin_arr, j)-NULL;
2239 long long this_end = ArrayListGet(small_bin_arr, j+1)-NULL;
2240 min_start_Voff = min(min_start_Voff, this_start);
2241 max_end_Voff = max(max_end_Voff, this_end);
2242 }
2243
2244 long long dist = max_end_Voff >> 16;
2245 dist -= min_start_Voff >>16;
2246 if(dist < MAX_ALLOWED_GAP_IN_BAI_CHUNK) {
2247 int parent_binno = my_parent_min_binno +((binno - my_min_binno)>>3);
2248 ArrayList * parent_bin_arr = HashTableGet(new_tab, NULL+1+parent_binno);
2249 if(NULL == parent_bin_arr){
2250 parent_bin_arr = ArrayListCreate(10);
2251 HashTablePut(new_tab, NULL+1+parent_binno, parent_bin_arr);
2252 ArrayListPush(new_arrs, NULL+parent_binno);
2253 }
2254 for(j =0 ; j< small_bin_arr -> numOfElements; j++)
2255 ArrayListPush(parent_bin_arr, ArrayListGet(small_bin_arr, j));
2256
2257 } else {
2258 HashTablePut(new_tab, NULL+1+binno, ArrayListDuplicate(small_bin_arr));
2259 ArrayListPush(new_arrs, NULL + binno);
2260 }
2261 }
2262
2263 // look into all parent_level bins, reduce its size
2264 for(i=0; i< new_arrs -> numOfElements; i++){
2265 int binno = ArrayListGet(new_arrs, i)-NULL;
2266 if(binno >=my_min_binno || binno < my_parent_min_binno) continue;
2267 ArrayList * large_bin_arr = HashTableGet(new_tab, NULL+1+binno);
2268 SamBam_writer_merge_chunks(large_bin_arr);
2269 }
2270
2271 //SUBREADprintf("Tab size : %ld => %ld\n",bin_tab -> numOfElements, new_tab -> numOfElements);
2272 HashTableDestroy(bin_tab);
2273 ArrayListDestroy(bin_arr);
2274 }
2275
SamBam_writer_optimize_bins(HashTable * bin_tab,ArrayList * bin_arr,HashTable ** new_tab,ArrayList ** new_arrs)2276 void SamBam_writer_optimize_bins(HashTable *bin_tab, ArrayList *bin_arr, HashTable ** new_tab, ArrayList ** new_arrs){
2277 int this_level;
2278 for(this_level = 5; this_level > 2; this_level --){
2279 // SUBREADprintf("OPTIMIZING AT LEVEL %d\n", this_level);
2280 HashTable * new_bin_tab = HashTableCreate(2000);
2281 HashTableSetDeallocationFunctions(new_bin_tab, NULL, (void(*)(void *))ArrayListDestroy);
2282 ArrayList * new_bin_list = ArrayListCreate(2000);
2283
2284 SamBam_writer_optimize_bins_level(bin_tab, bin_arr, new_bin_tab, new_bin_list, this_level);
2285 bin_tab = new_bin_tab;
2286 bin_arr = new_bin_list;
2287 (*new_tab) = new_bin_tab;
2288 (*new_arrs) = new_bin_list;
2289 }
2290 }
2291
SamBam_write_sorted_thread_collect(SamBam_Writer * writer)2292 void SamBam_write_sorted_thread_collect(SamBam_Writer * writer){
2293 int thread_i;
2294 if(writer -> chunk_buffer_used>0) SamBam_writer_submit_sorted_compressing_task(writer);
2295 for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2296 //SUBREADprintf("FINISHED %d-th CHRO, MERGING THR %d\n", old_chro_no, writer->sorted_compress_this_thread_no );
2297 SamBam_thread_wait_merge_write(writer, writer->sorted_compress_this_thread_no);
2298 writer->sorted_compress_this_thread_no ++;
2299 if(writer->sorted_compress_this_thread_no == writer -> threads) writer->sorted_compress_this_thread_no = 0;
2300 }
2301 }
2302
2303 #define SAMBAM_Block2Vpos(v) (v)=((HashTableGet(writer -> block_no_p1_to_vpos_tab, NULL+(v>>16)+1)-NULL )<<16 )|(v & 0xffff);
SamBam_write_BAI_for_1chr(SamBam_Writer * writer,HashTable ** bin_chunks_table,ArrayList ** bins_list,ArrayList ** win16k_list)2304 void SamBam_write_BAI_for_1chr(SamBam_Writer * writer, HashTable ** bin_chunks_table, ArrayList ** bins_list, ArrayList ** win16k_list){
2305 SamBam_write_sorted_thread_collect(writer);
2306
2307 if(1){
2308 HashTable * new_bin_tab = NULL;
2309 ArrayList * new_bin_list = NULL;
2310 SamBam_writer_optimize_bins(*bin_chunks_table ,*bins_list,&new_bin_tab,&new_bin_list);
2311 *bin_chunks_table = new_bin_tab;
2312 *bins_list = new_bin_list;
2313 }
2314
2315 int n_bins = (*bins_list)->numOfElements;
2316 int win16_nos = (*win16k_list)->numOfElements;
2317 //SUBREADprintf("WIN_ITEMS=%d\t\t\tOLD_CHRO=%d NEW_CHRO=%d CURR_BATCH=%d\n", win16_nos, old_chro_no, chro_no , current_min_batch);
2318
2319 fwrite(&n_bins, 4, 1, writer -> BAI_fp);
2320 int bini;
2321 for(bini = 0; bini < n_bins; bini ++){
2322 int bin_no = ArrayListGet(*bins_list, bini)-NULL;
2323 ArrayList * chunks_list = HashTableGet(*bin_chunks_table, NULL+ bin_no+1);
2324
2325 int n_chunks = chunks_list -> numOfElements/2;
2326 fwrite(&bin_no, 4, 1, writer -> BAI_fp);
2327 fwrite(&n_chunks, 4, 1, writer -> BAI_fp);
2328 int chunk_i;
2329 for(chunk_i = 0; chunk_i < 2* n_chunks; chunk_i +=2){
2330 long long int Voff_st = ArrayListGet(chunks_list, chunk_i) - NULL;
2331 long long int Voff_en = ArrayListGet(chunks_list, chunk_i+1) - NULL;
2332 SAMBAM_Block2Vpos(Voff_st);
2333 SAMBAM_Block2Vpos(Voff_en);
2334 fwrite(&Voff_st, 8, 1, writer -> BAI_fp);
2335 fwrite(&Voff_en, 8, 1, writer -> BAI_fp);
2336 }
2337 }
2338
2339 fwrite(&win16_nos, 4, 1, writer -> BAI_fp);
2340 for(bini = 0; bini < win16_nos; bini++){
2341 long long int Voff = ArrayListGet(*win16k_list , bini) - NULL;
2342 SAMBAM_Block2Vpos(Voff);
2343 fwrite(&Voff, 8, 1, writer -> BAI_fp);
2344 }
2345
2346 }
2347
2348 #define SAMBAM_reset_sorting_writer { HashTableRemoveAll( writer -> block_no_p1_to_vpos_tab ); }
2349
2350 // MUST be called by THREAD 0!
SamBam_writer_sort_bins_to_BAM(SamBam_Writer * writer)2351 void SamBam_writer_sort_bins_to_BAM(SamBam_Writer * writer){
2352 SamBam_writer_one_thread_merge_sortedbins(writer);
2353
2354 FILE ** sb_fps = malloc(sizeof(FILE *) * writer -> sorted_batch_id);
2355 unsigned long long * current_min_fps = malloc(sizeof(unsigned long long) * writer -> sorted_batch_id);
2356 int bii;
2357 unsigned long long current_min = SUBREAD_MAX_ULONGLONG;
2358 int current_min_batch = -1;
2359 writer -> chunk_buffer_used = 0;
2360
2361 for(bii = 0; bii < writer -> sorted_batch_id; bii++){
2362 char tfp[MAX_FILE_NAME_LENGTH+40];
2363 current_min_fps[bii] = SUBREAD_MAX_ULONGLONG;
2364
2365 sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, bii);
2366 sb_fps[bii] = fopen(tfp,"rb");
2367 if(sb_fps[bii]!=NULL){
2368 current_min_fps[bii] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[bii]);
2369 if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2370 current_min = current_min_fps[bii];
2371 current_min_batch = bii;
2372 }
2373 }
2374 }
2375
2376 ArrayList * bins_list = NULL; // [Bin_Number_0, Bin_Number_1, ...]
2377 ArrayList * win16k_list = NULL; // [ Voffset_0, Voffset_1, ... ]
2378 HashTable * bin_chunks_table=NULL;
2379 SAMBAM_renew_BAItabs;
2380
2381 int chro_no = (int)(current_min >> 32);
2382 int old_chro_no = -1, last_written_BAI_chro = -1;;
2383
2384 int wlen = fwrite("BAI\1", 4, 1, writer -> BAI_fp);
2385 if(wlen < 1) assert(wlen >0);
2386 int thread_i, n_ref = writer -> chromosome_name_table -> numOfElements;
2387 subread_lock_t initlocks[writer -> threads];
2388 wlen = fwrite(&n_ref, 4, 1, writer -> BAI_fp);
2389 SAMBAM_write_empty_chros(0, chro_no);
2390
2391 writer -> block_no_p1_to_vpos_tab = HashTableCreate(100000);
2392 writer -> writer_threads = calloc(sizeof(struct SamBam_sorted_compressor_st), writer -> threads);
2393 for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2394 memset(&(writer -> writer_threads[thread_i].strm),0,sizeof(z_stream));
2395 deflateInit2(&(writer -> writer_threads[thread_i].strm), writer -> fastest_compression? SAMBAM_COMPRESS_LEVEL_FASTEST : SAMBAM_COMPRESS_LEVEL_NORMAL , Z_DEFLATED,
2396 SAMBAM_GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
2397 subread_init_lock(initlocks+thread_i);
2398 subread_lock_occupy(initlocks+thread_i);
2399
2400 void ** thparam = malloc(sizeof(void*)*3);
2401 thparam[0] = writer;
2402 thparam[1] = NULL + thread_i;
2403 thparam[2] = initlocks+thread_i;
2404 pthread_create(&writer -> writer_threads[thread_i].thread_stub, NULL,SamBam_writer_sorted_compress,thparam);
2405 }
2406 for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2407 subread_lock_occupy(initlocks+thread_i);
2408 subread_destroy_lock(initlocks+thread_i);
2409 }
2410
2411 SAMBAM_reset_sorting_writer;
2412 while(1){ // GO THROUGH EACH READ and write into BAM
2413 //if(current_min_batch<0)SUBREADprintf("FINALTEST: OLD/NEW = %d , %d\n", old_chro_no ,chro_no);
2414 if(old_chro_no >=0 && chro_no != old_chro_no){ // if there is old_chro && if has to write. Note: old_chro_no<0 means we reached the "unmapped" part of the sorted temp files
2415 //SUBREADprintf("\n====== W1CHR %d =====\n", old_chro_no);
2416 SamBam_write_BAI_for_1chr(writer, &bin_chunks_table, &bins_list, &win16k_list);
2417 last_written_BAI_chro = old_chro_no;
2418 SAMBAM_write_empty_chros(old_chro_no +1, chro_no < 0?n_ref:chro_no);
2419 SAMBAM_renew_BAItabs;
2420 SAMBAM_reset_sorting_writer;
2421 }
2422
2423 if(current_min_batch>-1){
2424 SamBam_writer_sort_bins_to_BAM_write_1R(writer, sb_fps[current_min_batch], bin_chunks_table, bins_list, win16k_list, chro_no);
2425 current_min_fps[current_min_batch] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[current_min_batch]);
2426 current_min = SUBREAD_MAX_ULONGLONG;
2427 current_min_batch = -1;
2428 old_chro_no = chro_no;
2429 }else{
2430 SamBam_write_sorted_thread_collect(writer);
2431 break; // all finished. quit.
2432 }
2433
2434
2435 for(bii = 0; bii < writer -> sorted_batch_id; bii++){
2436 if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2437 current_min = current_min_fps[bii];
2438 current_min_batch = bii;
2439 }
2440 }
2441 chro_no = (int)(current_min >> 32);
2442 }
2443
2444 for(bii = 0; bii < writer -> sorted_batch_id; bii++){
2445 if(!sb_fps[bii]) continue;
2446
2447 char tfp[MAX_FILE_NAME_LENGTH+40];
2448 sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, bii);
2449 fclose(sb_fps[bii]);
2450 unlink(tfp);
2451 }
2452 if(bin_chunks_table != NULL) {
2453 HashTableDestroy(bin_chunks_table);
2454 ArrayListDestroy(bins_list);
2455 ArrayListDestroy(win16k_list);
2456 }
2457
2458 terminate_workers(&writer->sorted_notifier);
2459 for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2460 pthread_join(writer -> writer_threads[thread_i].thread_stub, NULL);
2461 deflateEnd(&(writer -> writer_threads[thread_i].strm));
2462 }
2463 HashTableDestroy(writer -> block_no_p1_to_vpos_tab);
2464
2465 free(writer->writer_threads);
2466 free(current_min_fps);
2467 free(sb_fps);
2468 }
2469
SamBam_writer_finalise_thread(SamBam_Writer * writer,int thread_id)2470 void SamBam_writer_finalise_thread(SamBam_Writer * writer, int thread_id){
2471 if(writer -> threads < 2){
2472 if(writer -> sort_reads_by_coord){
2473 SamBam_writer_sort_buff_one_write(writer, writer ->chunk_buffer, writer -> chunk_buffer_used, -1);
2474 writer -> chunk_buffer_used= 0;
2475 }else{
2476 if(writer -> chunk_buffer_used) SamBam_writer_add_chunk(writer, thread_id);
2477 }
2478 }else{
2479 if(writer -> sort_reads_by_coord){
2480 SamBam_writer_sort_buff_one_write(writer, writer -> threads_chunk_buffer[thread_id], writer -> threads_chunk_buffer_used[thread_id], thread_id);
2481 writer -> threads_chunk_buffer_used [thread_id] = 0;
2482 }else{
2483 if(writer -> threads_chunk_buffer_used[thread_id]) SamBam_writer_add_chunk(writer, thread_id);
2484 }
2485 }
2486 }
2487
SamBam_writer_finalise_one_thread(SamBam_Writer * writer)2488 void SamBam_writer_finalise_one_thread(SamBam_Writer * writer){
2489 if(writer -> threads < 2){
2490 if(writer -> sort_reads_by_coord){
2491 if(writer -> chunk_buffer_used>0){
2492 SamBam_writer_sort_buff_one_write(writer, writer -> chunk_buffer, writer -> chunk_buffer_used, -1);
2493 writer -> chunk_buffer_used = 0;
2494 }
2495 }else{
2496 if(writer -> chunk_buffer_used)
2497 SamBam_writer_add_chunk(writer, -1);
2498 }
2499 }
2500 }
2501