1 /***************************************************************
2 
3    The Subread software package is free software package:
4    you can redistribute it and/or modify it under the terms
5    of the GNU General Public License as published by the
6    Free Software Foundation, either version 3 of the License,
7    or (at your option) any later version.
8 
9    Subread is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty
11    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12 
13    See the GNU General Public License for more details.
14 
15    Authors: Drs Yang Liao and Wei Shi
16 
17   ***************************************************************/
18 
19 /***************************************************************
20 
21 	The SamBam_reg2bin function was derived from the BAM
22     specification. (The SAM Format Specication Working
23     Group, September 7, 2011)
24 
25   ***************************************************************/
26 
27 
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <assert.h>
32 #include <ctype.h>
33 #include <unistd.h>
34 #include "subread.h"
35 #include "core.h"
36 #include "HelperFunctions.h"
37 #include "gene-algorithms.h"
38 #include "sambam-file.h"
39 #include "input-files.h"
40 #define Z_DEFAULT_MEM_LEVEL  8
41 
SamBam_fetch_next_chunk(SamBam_FILE * fp)42 int SamBam_fetch_next_chunk(SamBam_FILE *fp)
43 {
44 	int x1, room =  SAMBAM_INPUT_STREAM_SIZE - ( fp -> input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr);
45 
46 	if(room < 65536)
47 		return -1;
48 
49 
50 	for(x1=0; x1 < fp->input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr; x1 ++)
51 	{
52 		fp -> input_binary_stream_buffer [x1] = fp -> input_binary_stream_buffer [x1 + fp->input_binary_stream_read_ptr - fp -> input_binary_stream_buffer_start_ptr];
53 	}
54 	fp -> input_binary_stream_buffer_start_ptr = fp->input_binary_stream_read_ptr;
55 
56 	char * in_buff = malloc(65537);
57 	unsigned int real_len = 0;
58 	int ret, have = 0;
59 
60 	while (1){
61 			int nchunk = 0;
62 			ret = PBam_get_next_zchunk(fp -> os_file, in_buff, 65536, & real_len);
63 			if(ret > 0)
64 				nchunk = SamBam_unzip(fp -> input_binary_stream_buffer + fp->input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr + have, 65536 , in_buff , ret, 0);
65 			else if(ret == -2){
66 				SUBREADputs("ERROR: BAM format is broken.");
67 				return -2;
68 			}
69 
70 			//printf("RET=%d; CHK=%d\n", ret, nchunk);
71 
72 			if(nchunk>0)
73 				have += nchunk;
74 			if(have > 3000) break;
75 			if(feof(fp->os_file)){
76 				fp->is_eof=1;
77 				break;
78 			}
79 	}
80 	free(in_buff);
81 
82 	fp -> input_binary_stream_write_ptr += have;
83 
84 	return have;
85 
86 }
87 
is_paired_end_BAM(char * fn)88 int is_paired_end_BAM(char * fn){
89 	FILE * fp = fopen(fn, "rb");
90 	if(!fp) return 0;
91 	unsigned char h2[2];
92 	int retvv = fread(h2, 1,2,fp);
93 	if(retvv <2) return 0;
94 	int is_bam = (h2[0]==31) && (h2[1]==139);
95 	fclose(fp);
96 
97 	SamBam_FILE * sambam_reader;
98 	char fline[3000];
99 	sambam_reader = SamBam_fopen(fn, is_bam?SAMBAM_FILE_BAM:SAMBAM_FILE_SAM);
100 	while(1){
101 		char * is_ret = SamBam_fgets(sambam_reader, fline, 2999, 1);
102 		if(!is_ret) return 0;
103 		if(fline[0]!='@') break;
104 	}
105 	char * tok = NULL;
106 	strtok_r(fline, "\t", &tok);
107 	int flags = atoi(strtok_r(NULL, "\t", &tok));
108 	SamBam_fclose(sambam_reader);
109 	return flags & 1;
110 }
SamBam_fopen(char * fname,int file_type)111 SamBam_FILE * SamBam_fopen(char * fname , int file_type)
112 {
113 	SamBam_FILE * ret = (SamBam_FILE *)malloc(sizeof(SamBam_FILE));
114 	memset(ret, 0, sizeof(SamBam_FILE));
115 	ret -> file_type = file_type;
116 
117 	if(file_type ==SAMBAM_FILE_SAM)
118 	{
119 		ret -> os_file = f_subr_open(fname, "rb");
120 		if(!ret -> os_file)
121 		{
122 			free(ret);
123 			return NULL;
124 		}
125 
126 		ret -> header_length = 0;
127 		char nch0=-1, nchold=-1;
128 
129 		while(!feof(ret -> os_file)){
130 			nchold = nch0;
131 			nch0=fgetc(ret -> os_file);
132 
133 			if(nchold=='\n' && nch0!='@') break;
134 
135 			if(nch0!='@' && ret -> header_length == 0){
136 				ret -> header_length = 0;
137 				break;
138 			}
139 			ret -> header_length++;
140 		}
141 		//SUBREADprintf("The SAM header has %d bytes\n", ret -> header_length);
142 		fseek(ret -> os_file,0,SEEK_SET);
143 	}
144 	else
145 	{
146 		ret -> os_file = f_subr_open(fname, "rb");
147 		if(ret -> os_file == NULL)
148 		{
149 			free(ret);
150 			return NULL;
151 		}
152 		unsigned char first_ch = fgetc(ret->os_file);
153 		unsigned char second_ch = fgetc(ret->os_file);
154 
155 		if(first_ch!=31 || second_ch!=139)
156 		{
157 			free(ret);
158 			SUBREADprintf("Not a BAM file! %d %d\n", first_ch, second_ch);
159 			return NULL;
160 		}
161 
162 		fseek(ret->os_file, 0, SEEK_SET);
163 
164 		ret -> input_binary_stream_buffer = (char *)malloc(SAMBAM_INPUT_STREAM_SIZE);
165 		ret -> input_binary_stream_read_ptr = 0;
166 		ret -> input_binary_stream_write_ptr = 0;
167 		ret -> input_binary_stream_buffer_start_ptr = 0;
168 
169 		ret -> bam_file_stage = BAM_FILE_STAGE_HEADER;
170 		ret -> is_eof = 0;
171 
172 		SB_FETCH(ret);
173 
174 		if(SB_EOF(ret))
175 		{
176 			free(ret->input_binary_stream_buffer);
177 			free(ret);
178 			SUBREADprintf("FEOF 0.\n");
179 			return NULL;
180 
181 		}
182 
183 		int magic_4 = 0;
184 		memcpy(&magic_4 , SB_READ(ret), 4);
185 		SB_RINC(ret, 4);
186 
187 		if(magic_4 != 21840194) // this number is the four bytes of "BAM1"
188 		{
189 			free(ret->input_binary_stream_buffer);
190 			free(ret);
191 			SUBREADprintf("FEOF 4 == %d.\n", magic_4);
192 			return NULL;
193 		}
194 
195 
196 		int l_text = 0;
197 		memcpy(&l_text, SB_READ(ret), 4);
198 		SB_RINC(ret, 4);
199 
200 		ret -> bam_file_next_section_start = ret -> input_binary_stream_read_ptr + l_text;
201 		ret -> header_length =  ret -> bam_file_next_section_start;
202 	}
203 	return ret;
204 }
205 
cigar_op_char(int ch)206 char cigar_op_char(int ch)
207 {
208 	if(ch<9)
209 		return "MIDNSHP=X"[ch];
210 	else
211 	{
212 		SUBREADprintf("Unknwon opt was found in the CIGAR string: '%c'.\n", ch);
213 		return 'M';
214 	}
215 }
216 
read_int_char(int ch)217 char read_int_char(int ch)
218 {
219 	assert(ch<16);
220 	return "=ACMGRSVTWYHKDBN"[ch];
221 }
222 
SamBam_fclose(SamBam_FILE * fp)223 void SamBam_fclose(SamBam_FILE * fp)
224 {
225 	if(fp->file_type==SAMBAM_FILE_SAM)
226 	{
227 		fclose(fp->os_file);
228 		free(fp);
229 	}
230 	else
231 	{
232 		fclose(fp->os_file);
233 		free(fp -> input_binary_stream_buffer);
234 		free(fp -> bam_chro_table);
235 		free(fp);
236 	}
237 }
238 
SamBam_feof(SamBam_FILE * fp)239 int SamBam_feof(SamBam_FILE * fp)
240 {
241 	if(fp->file_type ==SAMBAM_FILE_SAM)
242 		return feof(fp->os_file);
243 	else return SB_EOF(fp);
244 }
245 
SamBam_read_ref_info(SamBam_FILE * ret)246 void SamBam_read_ref_info(SamBam_FILE * ret)
247 {
248 	unsigned int ref_info_size = 0;
249 	ret ->bam_chro_table_size = 0;
250 	//printf("CKK0\n");
251 
252 	SB_FETCH(ret);
253 	if(SB_EOF(ret))
254 		return;
255 
256 	//printf("CKK1\n");
257 
258 	memcpy(&ref_info_size, SB_READ(ret),4);
259 	SB_RINC(ret, 4);
260 
261 	int xk1;
262 	ret -> bam_chro_table = malloc(sizeof(SamBam_Reference_Info) * ref_info_size);
263 	for(xk1=0;xk1<ref_info_size;xk1++)
264 	{
265 		SB_FETCH(ret);
266 
267 		if(SB_EOF(ret))
268 			break;
269 
270 		int ref_name_len = 0;
271 		memcpy(&ref_name_len, SB_READ(ret),4);
272 		SB_RINC(ret, 4);
273 
274 		int ref_readin_len = min(ref_name_len, BAM_MAX_CHROMOSOME_NAME_LEN-1);
275 		int ref_skip_len = ref_name_len - ref_readin_len;
276 
277 		memcpy(ret -> bam_chro_table[xk1].chro_name, SB_READ(ret), ref_readin_len);
278 		ret -> bam_chro_table[xk1].chro_name[ref_readin_len] = 0;
279 		SB_RINC(ret, ref_readin_len + ref_skip_len);
280 
281 		memcpy(&(ret -> bam_chro_table[xk1].chro_length), SB_READ(ret),4);
282 		SB_RINC(ret, 4);
283 
284 		//SUBREADprintf("CHRO[%d] : %s [%d]\n", xk1+1, ret -> bam_chro_table[xk1].chro_name , ret -> bam_chro_table[xk1].chro_length);
285 	}
286 	ret ->bam_chro_table_size = ref_info_size;
287 }
288 
SamBam_fgets(SamBam_FILE * fp,char * buff,int buff_len,int seq_needed)289 char * SamBam_fgets(SamBam_FILE * fp, char * buff , int buff_len, int seq_needed)
290 {
291 	if(fp->file_type==SAMBAM_FILE_SAM){
292 		char * ret = fgets(buff, buff_len, fp->os_file);
293 		int strlenbuff = 0;
294 		if(ret){
295 			strlenbuff = strlen(buff);
296 			///if(buff[0]!='@')
297 			//	if(strlenbuff < 100)SUBREADprintf("WRONG LOAD: '%s'\n", buff);
298 		}
299 		if(strlenbuff < 1 || ret == NULL) return NULL;
300 		else{
301 			if(ret[strlenbuff-1]!='\n')
302 			{
303 				while(1)
304 				{
305 					int ch = getc(fp->os_file);
306 					if(ch == EOF || ch == '\n')break;
307 				}
308 				ret[strlenbuff-1] = '\n';
309 			}
310 			if(fp -> is_paired_end < 10){
311 				if(buff[0]!='@'){
312 					int tabs = 0,x1=0, tmpi=0;
313 					for(x1 = 0; x1 < strlenbuff; x1++){
314 						if(buff[x1] == '\t'){
315 							if(tabs == 1){
316 		//						SUBREADprintf("TMPI_SAM = %d\n", tmpi);
317 								fp -> is_paired_end = 10 + (tmpi & 1);
318 								break;
319 							} else tabs ++;
320 						}else{
321 							if(tabs == 1)tmpi = tmpi * 10 + buff[x1]-'0';
322 						}
323 					}
324 				}
325 			}
326 
327 			if(buff[0] == '@'){
328 				fp -> header_length = ftello(fp->os_file) + strlenbuff+1;
329 			}
330 			return ret;
331 		}
332 	}
333 	else
334 	{
335 		int xk1;
336 		// decrypt the BAM mess.
337 		if(fp-> bam_file_stage == BAM_FILE_STAGE_HEADER)
338 		{
339 			char nch;
340 			xk1=0;
341 			SB_FETCH(fp);
342 			if(SB_EOF(fp))
343 				return NULL;
344 
345 			while(1)
346 			{
347 				if(fp -> input_binary_stream_read_ptr >= fp -> bam_file_next_section_start)
348 					break;
349 
350 				SB_FETCH(fp);
351 				nch = *(SB_READ(fp));
352 				SB_RINC(fp,1);
353 
354 				//printf("%c", nch);
355 				if(nch == '\r')continue;
356 				if(SB_EOF(fp)||nch == '\n' || nch <0) break;
357 				if(xk1 < buff_len-2)
358 				{
359 					buff[xk1]=nch;
360 					xk1++;
361 				}
362 			}
363 
364 			buff[xk1]='\n';
365 			buff[xk1+1]=0;
366 
367 			//SUBREADprintf("BUFF=%s\n========================================================================\n\n", buff);
368 			//printf("RL=%d , PTR %d >? RECORD_START %d\n\n\n", xk1, fp -> input_binary_stream_read_ptr , fp -> bam_file_next_section_start);
369 
370 			if(fp -> input_binary_stream_read_ptr >= fp -> bam_file_next_section_start)
371 			{
372 				SamBam_read_ref_info(fp);
373 				fp -> bam_file_stage = BAM_FILE_STAGE_ALIGNMENT;
374 				fp -> header_length = fp-> input_binary_stream_read_ptr;
375 			}
376 			return buff;
377 		}
378 		else
379 		{
380 			SamBam_Alignment *aln = &fp->aln_buff;
381 			int chunk_ptr = 0;
382 			SB_FETCH(fp);
383 			if(SB_EOF(fp)) return NULL;
384 
385 			fp -> is_paired_end = 10 + ((*(fp -> input_binary_stream_buffer + fp -> input_binary_stream_read_ptr - fp -> input_binary_stream_buffer_start_ptr + 18)) & 1);
386 			//SUBREADprintf("FLAG=%d\n",  *(fp -> input_binary_stream_buffer + fp -> input_binary_stream_read_ptr - fp -> input_binary_stream_buffer_start_ptr + 18) );
387 			int text_len = PBam_chunk_gets(SB_READ(fp) , &chunk_ptr, fp -> input_binary_stream_write_ptr - fp -> input_binary_stream_read_ptr , fp -> bam_chro_table, buff , buff_len, aln, seq_needed);
388 			SB_RINC(fp, chunk_ptr);
389 
390 			if(text_len>0) return buff;
391 			return NULL;
392 		}
393 	}
394 }
395 
396 
397 
PBam_get_next_zchunk(FILE * bam_fp,char * buffer,int buffer_length,unsigned int * real_len)398 int PBam_get_next_zchunk(FILE * bam_fp, char * buffer, int buffer_length, unsigned int * real_len)
399 {
400 	unsigned char ID1, ID2, CM, FLG;
401 	unsigned short XLEN;
402 	int BSIZE=-1, rlen, is_file_broken = 0, rrtv;
403 
404 	if(feof(bam_fp)) return -1;
405 
406 	rrtv = fread(&ID1, 1, 1, bam_fp);
407 	if(rrtv <1) return -1;
408 	rrtv = fread(&ID2, 1, 1, bam_fp);
409 	if(rrtv <1) return -1;
410 	rrtv = fread(&CM, 1, 1, bam_fp);
411 	if(rrtv <1) return -1;
412 	rrtv = fread(&FLG, 1, 1, bam_fp);
413 	if(rrtv <1) return -1;
414 	if(feof(bam_fp)) return -1;
415 
416 	if(ID1!=31 || ID2!=139 || CM!=8 || FLG!=4)
417 	{
418 		//SUBREADprintf("4CHR = %d, %d, %d, %d\n", ID1, ID2, CM, FLG);
419 		return -1;
420 	}
421 	fseeko(bam_fp, 6, SEEK_CUR);
422 	rrtv = fread(&XLEN,1, 2, bam_fp );
423 	if(rrtv < 2) return -1;
424 
425 	int XLEN_READ = 0;
426 	while(1)
427 	{
428 		unsigned char SI1, SI2;
429 		unsigned short SLEN, BSIZE_MID;
430 
431 		rrtv = fread(&SI1, 1, 1, bam_fp);
432 		if(rrtv <1) return -1;
433 		rrtv = fread(&SI2, 1, 1, bam_fp);
434 		if(rrtv <1) return -1;
435 		rlen = fread(&SLEN, 2, 1, bam_fp);
436 		if(rlen < 1) is_file_broken = 1;
437 
438 		if(SI1==66 && SI2== 67 && SLEN == 2)
439 		{
440 			rrtv = fread(&BSIZE_MID, 1,2 , bam_fp);
441 			if(rrtv <2) return -1;
442 			BSIZE = BSIZE_MID;
443 		}
444 		else	fseeko(bam_fp, SLEN, SEEK_CUR);
445 		XLEN_READ += SLEN + 4;
446 		if(XLEN_READ>=XLEN) break;
447 	}
448 
449 	if(BSIZE>19)
450 	{
451 		int CDATA_LEN = BSIZE - XLEN - 19;
452 		int CDATA_READING = min(CDATA_LEN, buffer_length);
453 		rrtv = fread(buffer, 1, CDATA_READING, bam_fp);
454 		if(rrtv < CDATA_READING) return -1;
455 		if(CDATA_READING<CDATA_LEN){
456 			SUBREADputs("ERROR: buffer insufficient");
457 			return -1;
458 		}
459 		fseeko(bam_fp, 4, SEEK_CUR);
460 		rlen = fread(&real_len, 4, 1, bam_fp);//Input SIZE (length of uncompressed data);uint32_t
461 		if(rlen < 1) is_file_broken = 1;
462 
463 	//	SUBREADprintf("read_data=%u\n", CDATA_LEN);
464 		if(is_file_broken){
465 			SUBREADputs("ERROR: the input BAM file is broken.");
466 		}
467 		return is_file_broken?-2:CDATA_READING;
468 	}
469 	else
470 		return -1;
471 }
472 
473 
474 // returns 0 if the header finished.
475 // returns 1 if the header is going on.
476 // returns -1 if error.
PBam_chunk_headers(char * chunk,int * chunk_ptr,int chunk_len,SamBam_Reference_Info ** bam_chro_table,int * table_size,int * table_items,int * state,int * header_txt_remainder,int * reminder_byte_len)477 int PBam_chunk_headers(char * chunk, int *chunk_ptr, int chunk_len, SamBam_Reference_Info ** bam_chro_table, int * table_size, int * table_items, int * state, int * header_txt_remainder, int * reminder_byte_len)
478 {
479 
480 	if((*state)  == 0)
481 	{
482 		unsigned int header_txt_len ;
483 		if(0!=memcmp("BAM\x1",chunk + (*chunk_ptr),4))
484 			return -1;
485 		(*chunk_ptr)+=4;	// MAGIC
486 		(*state) = 1;
487 
488 		memcpy(&header_txt_len, chunk + (*chunk_ptr),4);
489 		(*chunk_ptr)+=4;
490 		if(header_txt_len + 8 < chunk_len)
491 		{
492 			(* state) = 2;
493 			(*chunk_ptr) += header_txt_len;
494 		}
495 		else
496 		{
497 			(* state) = 1;
498 			(* header_txt_remainder) = header_txt_len - (chunk_len - 8);
499 			return 1;
500 		}
501 	}
502 
503 	if((*state) == 1)
504 	{
505 		if((*header_txt_remainder)<chunk_len)
506 		{
507 			(*state) = 2;
508 			(*chunk_ptr) += (*header_txt_remainder);
509 		}
510 		else if((*header_txt_remainder)==chunk_len)
511 		{
512 			(*state) = 2;
513 			return 1;
514 		}
515 		else
516 		{
517 			(* header_txt_remainder) -= (chunk_len);
518 			return 1;
519 		}
520 	}
521 
522 	if((*state) == 2 || (*state == 3))
523 	{
524 		int chrs, remainder_chrs;
525 		if((*state)==2)
526 		{
527 			memcpy(&chrs, chunk + (*chunk_ptr),4);
528 			(*chunk_ptr)+=4;
529 
530 			remainder_chrs = chrs;
531 		}
532 		else	remainder_chrs = (*header_txt_remainder);
533 
534 		while((*chunk_ptr) < chunk_len && remainder_chrs>0)
535 		{
536 			int chro_name_len;
537 			unsigned int chro_len;
538 			(*reminder_byte_len) = chunk_len - (*chunk_ptr);
539 
540 			if( (*chunk_ptr) < chunk_len-4)
541 			{
542 				memcpy(&chro_name_len, chunk + (*chunk_ptr),4);
543 				(*chunk_ptr)+=4;
544 				if( (*chunk_ptr) <= chunk_len-chro_name_len-4)
545 				{
546 					char * chro_name = chunk + (*chunk_ptr);
547 					(*chunk_ptr)+=chro_name_len;
548 					memcpy(&chro_len, chunk + (*chunk_ptr),4);
549 					(*chunk_ptr)+=4;
550 
551 					(*reminder_byte_len) =0;
552 
553 					//todo: insert item
554 					if(0==(* table_items))
555 					{
556 						(*table_size) = 50;
557 						(*bam_chro_table) = malloc(sizeof(SamBam_Reference_Info)*50);
558 					}
559 					else if((*table_size) <= (* table_items))
560 					{
561 						(*table_size) *= 2;
562 						(*bam_chro_table) = realloc((*bam_chro_table),sizeof(SamBam_Reference_Info)*(*table_size));
563 					}
564 
565 					SamBam_Reference_Info * new_event = (*bam_chro_table) + (* table_items);
566 					if(strlen(chro_name)>=BAM_MAX_CHROMOSOME_NAME_LEN) chro_name[BAM_MAX_CHROMOSOME_NAME_LEN-1]=0;
567 					strcpy(new_event->chro_name, chro_name);
568 					new_event -> chro_length = chro_len;
569 
570 					(* table_items)++;
571 					//SUBREADprintf("CHRO %d/%d added\n", (* table_items),(remainder_chrs));
572 					remainder_chrs --;
573 				}
574 				else break;
575 			}
576 			else break;
577 
578 		}
579 
580 		if(remainder_chrs)
581 		{
582 			(*state) = 3;
583 			(*header_txt_remainder) = remainder_chrs;
584 			return 1;
585 		}
586 		else{
587 			(*state) = 4;
588 			return 0;
589 		}
590 	}
591 	return -1;
592 }
593 
convert_BAM_binary_to_SAM(SamBam_Reference_Info * chro_table,char * bam_bin,char * sam_txt)594 int convert_BAM_binary_to_SAM( SamBam_Reference_Info * chro_table, char * bam_bin, char * sam_txt ){
595 	int bin_len = 0;
596 	memcpy(&bin_len, bam_bin, 4);
597 	bin_len += 4;
598 
599 	int sam_ptr = 0, tmpint = 0;
600 	sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", bam_bin+36);
601 
602 	memcpy(&tmpint, bam_bin + 16 ,4);
603 	sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", (tmpint >> 16) & 0xffff);
604 	int cigar_opts = tmpint & 0xffff;
605 
606 	memcpy(&tmpint, bam_bin + 4  ,4);
607 	int r1chro = tmpint;
608 	sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", tmpint<0?"*":chro_table[tmpint].chro_name);
609 	memcpy(&tmpint, bam_bin + 8  ,4);
610 	sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", tmpint+1);
611 	memcpy(&tmpint, bam_bin + 12 ,4);
612 	sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", (tmpint >> 8) & 0xff);
613 	int name_len = tmpint & 0xff;
614 	int cigar_i;
615 	for(cigar_i = 0; cigar_i < cigar_opts; cigar_i ++){
616 		unsigned int cigarint=0;
617 		memcpy(&cigarint, bam_bin + name_len + 36 + cigar_i * 4,4);
618 		sam_ptr += sprintf(sam_txt + sam_ptr, "%u%c", cigarint >> 4, "MIDNSHP=X"[cigarint&0xf]);
619 	}
620 	sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", cigar_i<1?"*":"");
621 
622 	memcpy(&tmpint, bam_bin + 24, 4);
623 	//SUBREADprintf("CHRO_IDX=%d\n", tmpint);
624 	sam_ptr += sprintf(sam_txt + sam_ptr, "%s\t", tmpint<0?"*":((tmpint == r1chro)?"=":chro_table[tmpint].chro_name));
625 
626 	memcpy(&tmpint, bam_bin + 28, 4);
627 	sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", tmpint+1);
628 
629 	memcpy(&tmpint, bam_bin + 32, 4);
630 	sam_ptr += sprintf(sam_txt + sam_ptr, "%d\t", tmpint);
631 
632 	int seq_len;
633 	memcpy(&seq_len, bam_bin + 20,4);
634 	int seqi, flex_ptr=name_len + 36 +  cigar_opts * 4;
635 	for(seqi=0; seqi<seq_len; seqi++){
636 		sam_txt[sam_ptr++]="=ACMGRSVTWYHKDBN"[ (bam_bin[flex_ptr] >> ( 4*!(seqi %2) )) & 15 ];
637 		if(seqi %2) flex_ptr++;
638 	}
639 	sam_txt[sam_ptr++]='\t';
640 	if(seqi %2) flex_ptr++;
641 	for(seqi=0; seqi<seq_len; seqi++){
642 		unsigned char nch = (unsigned char) bam_bin[flex_ptr++];
643 		if(nch!=0xff||seqi == 0)
644 				sam_txt[sam_ptr++]=nch==0xff?'*':(nch+33);
645 	}
646 	sam_txt[sam_ptr++]='\t';
647 
648 	while(flex_ptr < bin_len){
649 		sam_txt[sam_ptr++]=bam_bin[flex_ptr++];
650 		sam_txt[sam_ptr++]=bam_bin[flex_ptr++];
651 		sam_txt[sam_ptr++]=':';
652 		char tagtype = bam_bin[flex_ptr++];
653 
654 		if(tagtype == 'B'){
655 			char elemtype = bam_bin[flex_ptr++];
656 			int elem_no=0, is_int_type = 0, type_bytes = 0, is_signed = 0;
657 			memcpy(&elem_no, bam_bin + flex_ptr, 4);
658 			flex_ptr += 4;
659 			sam_txt[sam_ptr++]='B';
660 			sam_txt[sam_ptr++]=':';
661 			sam_txt[sam_ptr++]=elemtype;
662 			sam_txt[sam_ptr++]=',';
663 
664 			if(elemtype == 'i' || elemtype == 'I'){
665 				is_int_type = 1;
666 				type_bytes = 4;
667 				is_signed = elemtype == 'i' ;
668 			}else if(elemtype == 's' || elemtype == 'S'){
669 				is_int_type = 1;
670 				type_bytes = 2;
671 				is_signed = elemtype == 's' ;
672 			}else if(elemtype == 'c' || elemtype == 'C'){
673 				is_int_type = 1;
674 				type_bytes = 1;
675 				is_signed = elemtype == 's' ;
676 			}else if(elemtype == 'f'){
677 				type_bytes = 4;
678 			}
679 
680 			int elemi;
681 			for(elemi =0; elemi < elem_no; elemi++){
682 				if(is_int_type){
683 					int tagval = 0;
684 					memcpy(&tagval,  bam_bin + flex_ptr, type_bytes);
685 					long long printv = is_signed?tagval:( (unsigned int) tagval );
686 					if(elemtype == 'i') sam_ptr += sprintf(sam_txt + sam_ptr, "%d,", (int)printv);
687 					if(elemtype == 'I') sam_ptr += sprintf(sam_txt + sam_ptr, "%u,", (unsigned int)printv);
688 
689 					if(elemtype == 's') sam_ptr += sprintf(sam_txt + sam_ptr, "%d,", (short)printv);
690 					if(elemtype == 'S') sam_ptr += sprintf(sam_txt + sam_ptr, "%u,", (unsigned short)printv);
691 
692 					if(elemtype == 'c') sam_ptr += sprintf(sam_txt + sam_ptr, "%d,", (char)printv);
693 					if(elemtype == 'C') sam_ptr += sprintf(sam_txt + sam_ptr, "%u,", (unsigned char)printv);
694 				}else{
695 					float tagval = 0;
696 					memcpy(&tagval,  bam_bin + flex_ptr, type_bytes);
697 					sam_ptr += sprintf(sam_txt + sam_ptr, "%f,", tagval);
698 				}
699 				flex_ptr += type_bytes;
700 			}
701 
702 			sam_txt[sam_ptr-1] = '\t';
703 			sam_txt[sam_ptr] = 0;
704 		}else{
705 				int is_int_type = 0, is_float_type = 0, type_bytes = 0, is_string_type = 0, is_char_type = 0, is_signed = 0;
706 				if(tagtype == 'i' || tagtype == 'I'){
707 					is_int_type = 1;
708 					type_bytes = 4;
709 					is_signed = tagtype == 'i' ;
710 				}else if(tagtype == 's' || tagtype == 'S'){
711 					is_int_type = 1;
712 					type_bytes = 2;
713 					is_signed = tagtype == 's' ;
714 				}else if(tagtype == 'c' || tagtype == 'C'){
715 					is_int_type = 1;
716 					type_bytes = 1;
717 					is_signed = tagtype == 's' ;
718 				}else if(tagtype == 'f'){
719 					is_float_type = 1;
720 					type_bytes = 4;
721 				}else if(tagtype == 'Z' || tagtype == 'H'){
722 					is_string_type = 1;
723 					while(bam_bin[flex_ptr+(type_bytes ++)]);
724 				}else if(tagtype == 'A'){
725 					is_char_type = 1;
726 					type_bytes = 1;
727 				}
728 
729 
730 				sam_txt[sam_ptr++]=is_int_type?'i':tagtype;
731 				sam_txt[sam_ptr++]=':';
732 
733 				if(is_int_type){
734 					int tagval = 0;
735 					memcpy(&tagval,  bam_bin + flex_ptr, type_bytes);
736 					long long printv = is_signed?tagval:( (unsigned int) tagval );
737 					#ifdef __MINGW32__
738 					sam_ptr += sprintf(sam_txt + sam_ptr, "%I64d\t", printv);
739 					#else
740 					sam_ptr += sprintf(sam_txt + sam_ptr, "%lld\t", printv);
741 					#endif
742 				}else if(is_string_type){
743 					// type_bytes includes \0
744 					memcpy(sam_txt + sam_ptr, bam_bin + flex_ptr, type_bytes -1);
745 					sam_txt[ sam_ptr + type_bytes -1 ] = '\t';
746 
747 					//sam_txt[ sam_ptr + type_bytes +1]=0;
748 					//SUBREADprintf("STR_LEN=%d\tSTR=%s\n", type_bytes-1, sam_txt + sam_ptr);
749 					sam_ptr += type_bytes;
750 				}else if(is_float_type){
751 					float tagval = 0;
752 					memcpy(&tagval,  bam_bin + flex_ptr, type_bytes);
753 					sam_ptr += sprintf(sam_txt + sam_ptr, "%f\t", tagval);
754 				}else if(is_char_type){
755 					sam_txt[ sam_ptr++ ] = bam_bin[flex_ptr];
756 					sam_txt[ sam_ptr++ ] = '\t';
757 				}
758 				flex_ptr += type_bytes;
759 		}
760 	}
761 
762 	sam_txt[sam_ptr-1]=0; //last '\t'
763 	return sam_ptr-1;
764 }
765 
PBam_chunk_gets(char * chunk,int * chunk_ptr,int chunk_limit,SamBam_Reference_Info * bam_chro_table,char * buff,int buff_len,SamBam_Alignment * aln,int seq_needed)766 int PBam_chunk_gets(char * chunk, int *chunk_ptr, int chunk_limit, SamBam_Reference_Info * bam_chro_table, char * buff , int buff_len, SamBam_Alignment*aln, int seq_needed)
767 {
768 	int xk1;
769 	// decrypt the BAM mess.
770 	unsigned int block_size;
771 	if((*chunk_ptr) +4> chunk_limit) return -1;
772 
773 	memcpy(&block_size, chunk+(*chunk_ptr), 4);
774 	//SUBREADprintf("PBSIZE=%u\n", block_size);
775 	(*chunk_ptr)+=4;
776 	unsigned int next_start = block_size+(*chunk_ptr);
777 
778 	int ref_id;
779 	memcpy(&ref_id, chunk+(*chunk_ptr), 4);
780 	(*chunk_ptr)+=4;
781 
782 	if(ref_id == -1) aln -> chro_name = NULL;
783 	else aln -> chro_name = bam_chro_table[ref_id].chro_name;
784 
785 	memcpy(&(aln -> chro_offset), chunk+(*chunk_ptr), 4);
786 	(*chunk_ptr)+=4;
787 
788 	unsigned int comb1;
789 	memcpy(&comb1, chunk+(*chunk_ptr), 4);
790 	(*chunk_ptr)+=4;
791 
792 	aln -> mapping_quality = 0xff & (comb1 >> 8);
793 
794 	unsigned int comb2;
795 	memcpy(&comb2, chunk+(*chunk_ptr), 4);
796 	(*chunk_ptr)+=4;
797 
798 	aln -> flags = 0xffff&(comb2 >> 16);
799 
800 	unsigned int read_len;
801 	memcpy(&read_len, chunk+(*chunk_ptr), 4);
802 
803 	(*chunk_ptr)+=4;
804 
805 	unsigned int mate_ref_id;
806 	memcpy(&mate_ref_id, chunk+(*chunk_ptr), 4);
807 	(*chunk_ptr)+=4;
808 
809 	if(mate_ref_id == -1) aln -> mate_chro_name = NULL;
810 	else aln -> mate_chro_name = bam_chro_table[mate_ref_id].chro_name;
811 
812 	memcpy(&(aln -> mate_chro_offset), chunk+(*chunk_ptr), 4);
813 	(*chunk_ptr)+=4;
814 
815 	memcpy(&(aln -> templete_length), chunk+(*chunk_ptr), 4);
816 	(*chunk_ptr)+=4;
817 
818 	int read_name_len = comb1 & 0xff;
819 	assert(read_name_len < BAM_MAX_READ_NAME_LEN);
820 
821 	memcpy(aln -> read_name, chunk+(*chunk_ptr), read_name_len);
822 	aln -> read_name[read_name_len] = 0;
823 	(*chunk_ptr)+=read_name_len;
824 
825 	int cigar_ops = comb2 & 0xffff;
826 	aln -> cigar[0]=0;
827 	for(xk1=0; xk1<cigar_ops;xk1++)
828 	{
829 		char cigar_piece_buf[BAM_MAX_CIGAR_LEN];
830 		unsigned int cigar_piece;
831 
832 		if((*chunk_ptr) +4 > chunk_limit) return -1;
833 		memcpy(&cigar_piece,  chunk+(*chunk_ptr),4);
834 		(*chunk_ptr)+=4;
835 
836 		sprintf(cigar_piece_buf, "%u%c", cigar_piece>>4, cigar_op_char(cigar_piece&0xf));
837 		if(strlen(cigar_piece_buf)+strlen(aln->cigar)<BAM_MAX_CIGAR_LEN-1)
838 			strcat(aln->cigar, cigar_piece_buf);
839 		else
840 		{
841 			SUBREADprintf("WARNING: cigar string is too long to the buffer.\n");
842 			SUBREADprintf("Please only use the compressed BAM format.\n");
843 			assert(0);
844 			return -1;
845 		}
846 	}
847 
848 	char read_2_seq = 0;
849 	int seq_qual_bytes = read_len + (read_len /2)+(read_len%2);
850 
851 	if(seq_needed)
852 		memcpy( aln-> buff_for_seq, chunk+(*chunk_ptr), seq_qual_bytes);
853 	(*chunk_ptr) += seq_qual_bytes;
854 
855 	char extra_tags [CORE_ADDITIONAL_INFO_LENGTH];
856 	extra_tags[0]=0;
857 	int extra_len = 0;
858 	while( (*chunk_ptr) < next_start)
859 	{
860 		char extag[2];
861 		char extype;
862 		int delta, need_tag = 1;
863 		memcpy(extag,  chunk+(*chunk_ptr), 2);
864 		extype = chunk[2+(*chunk_ptr)];
865 		(*chunk_ptr)+=3;
866 		//fprintf(stderr, "COL_EXTYPE: %c\n", extype);
867 		if(extype == 'Z' || extype == 'H')
868 		{
869 			delta = 0;
870 			// 'Z' columns are NULL-terminated.
871 			while(chunk[delta + (*chunk_ptr)]) delta++;
872 			delta += 1;
873 		}
874 		else if(extype == 'A' || extype == 'c' || extype=='C') delta=1;
875 		else if(extype == 'i' || extype=='I' || extype == 'f') delta=4;
876 		else if(extype == 's' || extype=='S') delta=2;
877 		else if(extype == 'B')
878 		{
879 			extype = chunk[(*chunk_ptr)];
880 		//	fprintf(stderr, "B_EXTYPE: %c\n", extype);
881 
882 			(*chunk_ptr)++;
883 			if(extype == 'A' || extype=='Z') delta=1;
884 			else if(extype == 'c' || extype=='C') delta=1;
885 			else if(extype == 'i' || extype=='I' || extype == 'f') delta=4;
886 			else if(extype == 's' || extype=='S') delta=2;
887 			else break;
888 
889 			int array_len;
890 			need_tag = 0;
891 			memcpy(&array_len, chunk+(*chunk_ptr), 4);
892 			(*chunk_ptr)+=4;
893 			delta *= array_len;
894 		}
895 		else{
896 		//	fprintf(stderr, "NO_EXTYPE: %c\n", extype);
897 			break;
898 		}
899 
900 		if(need_tag){
901 			if(extype == 'c' || extype=='C' || extype == 'i' || extype=='I' || extype == 's' || extype=='S'){
902 				int tmpi = 0;
903 				memcpy(&tmpi, chunk+(*chunk_ptr),delta);
904 				if(tmpi >= 0 && extra_len < CORE_ADDITIONAL_INFO_LENGTH - 18){
905 					int sret = sprintf(extra_tags + strlen(extra_tags), "\t%c%c:i:%d", extag[0], extag[1], tmpi);
906 					extra_len += sret;
907 				}
908 			}else if(extype == 'Z'){
909 				if(extra_len < CORE_ADDITIONAL_INFO_LENGTH - 7 - delta){
910 					sprintf(extra_tags + strlen(extra_tags), "\t%c%c:Z:", extag[0], extag[1]);
911 					extra_len += 6;
912 					*(extra_tags + strlen(extra_tags)+delta-1) = 0;
913 					memcpy(extra_tags + strlen(extra_tags), chunk + (*chunk_ptr), delta - 1);
914 					extra_len += delta - 1;
915 				}
916 			}else if(extype == 'A'){
917 				if(extra_len < CORE_ADDITIONAL_INFO_LENGTH - 8){
918 					int sret = sprintf(extra_tags + strlen(extra_tags), "\t%c%c:A:%c", extag[0], extag[1], *(chunk + *chunk_ptr) );
919 					extra_len += sret;
920 				}
921 			}
922 		}
923 
924 		if((*chunk_ptr) + delta > chunk_limit) return -1;
925 		(*chunk_ptr)+=delta;
926 
927 	}
928 
929 	if(next_start > chunk_limit) return -1;
930 	(*chunk_ptr) = next_start;
931 
932 	if(seq_needed)
933 	{
934 		for(xk1=0;xk1<read_len;xk1++)
935 		{
936 			if(xk1 %2 == 0){
937 				read_2_seq = aln-> buff_for_seq[xk1/2];
938 			}
939 			if(xk1 < BAM_MAX_READ_LEN)
940 				aln -> sequence[xk1] = read_int_char(0xf&(read_2_seq >> (xk1%2?0:4)));
941 		}
942 		aln -> sequence[min(BAM_MAX_READ_LEN-1,read_len)] = 0;
943 		if(read_len >= BAM_MAX_READ_LEN-1)
944 			SUBREADprintf("WARNING: read is too long to the buffer\n");
945 
946 
947 		for(xk1=0;xk1<read_len;xk1++)
948 		{
949 			read_2_seq = aln -> buff_for_seq[(read_len /2)+(read_len%2) + xk1] ;
950 			if(xk1 < BAM_MAX_READ_LEN)
951 				aln -> seq_quality[xk1] = 33+read_2_seq;
952 		}
953 		aln -> seq_quality[min(BAM_MAX_READ_LEN-1,read_len)] = 0;
954 		if(aln -> seq_quality[0]==' ')
955 			strcpy(aln -> seq_quality, "*");
956 	}
957 	else
958 	{
959 		aln -> sequence[0]='N';
960 		aln -> sequence[1]=0;
961 		aln -> seq_quality[0]='#';
962 		aln -> seq_quality[1]=0;
963 	}
964 
965 	char * chro_name = "*";
966 	char * cigar = "*";
967 	unsigned int chro_offset = 0;
968 
969 	if(aln -> chro_name){
970 		chro_name = aln -> chro_name;
971 		chro_offset = aln -> chro_offset+1;
972 		if(aln -> cigar[0])
973 			cigar = aln -> cigar;
974 	}
975 
976 	char * mate_chro_name = "*";
977 	unsigned int mate_chro_offset = 0;
978 	if(aln -> mate_chro_name)
979 	{
980 		if(aln -> mate_chro_name == chro_name) mate_chro_name = "=";
981 		else
982 			mate_chro_name = aln -> mate_chro_name;
983 		mate_chro_offset = aln -> mate_chro_offset+1;
984 	}
985 
986 	long long int templete_length = aln -> templete_length;
987 
988 
989 	//fprintf(stderr, "HN_TAG=%d\n", nh_val	);
990 
991 	#ifdef __MINGW32__
992 	int plen = snprintf(buff, buff_len-1, "%s\t%u\t%s\t%u\t%d\t%s\t%s\t%u\t%I64d\t%s\t%s%s\n%c", aln -> read_name, aln -> flags , chro_name, chro_offset, aln -> mapping_quality, cigar, mate_chro_name, mate_chro_offset, templete_length, aln -> sequence , aln -> seq_quality, extra_tags, 0);
993 	#else
994 	int plen = snprintf(buff, buff_len-1, "%s\t%u\t%s\t%u\t%d\t%s\t%s\t%u\t%lld\t%s\t%s%s\n%c", aln -> read_name, aln -> flags , chro_name, chro_offset, aln -> mapping_quality, cigar, mate_chro_name, mate_chro_offset, templete_length, aln -> sequence , aln -> seq_quality, extra_tags, 0);
995 	#endif
996 	//fprintf(stderr,"%s", buff);
997 
998 	return plen;
999 }
1000 
1001 
PBum_load_header(FILE * bam_fp,SamBam_Reference_Info ** chro_tab,char * remainder_reads_data,int * remainder_reads_data_len)1002 int PBum_load_header(FILE * bam_fp, SamBam_Reference_Info** chro_tab, char * remainder_reads_data , int * remainder_reads_data_len)
1003 {
1004 	char * CDATA = malloc(80010);
1005 	char * PDATA = malloc(1000000);
1006 
1007 	int chro_tab_size = 0, chro_tab_items = 0, chro_tab_state = 0, header_remainder = 0, remainder_byte_len = 0, bam_is_broken = 0;
1008 	z_stream strm;
1009 	while(1)
1010 	{
1011 		unsigned int real_len = 0;
1012 		int rlen = PBam_get_next_zchunk(bam_fp,CDATA,80000, & real_len);
1013 		if(rlen<0){
1014 			bam_is_broken = (rlen == -2);
1015 			if(bam_is_broken){
1016 				SUBREADprintf("BAM file format error.\n");
1017 				free(CDATA);
1018 				free(PDATA);
1019 				return -1;
1020 			}
1021 			break;
1022 		}
1023 
1024 		strm.zalloc = Z_NULL;
1025 		strm.zfree = Z_NULL;
1026 		strm.opaque = Z_NULL;
1027 		strm.avail_in = 0;
1028 		strm.next_in = Z_NULL;
1029 		int ret = inflateInit2(&strm, SAMBAM_GZIP_WINDOW_BITS);
1030 		if (ret != Z_OK)
1031 		{
1032 			free(CDATA);
1033 			free(PDATA);
1034 			return -1;
1035 		}
1036 		strm.avail_in = (unsigned int)rlen;
1037 		strm.next_in = (unsigned char *)CDATA;
1038 
1039 
1040 		strm.avail_out = 1000000 - remainder_byte_len;
1041 		strm.next_out = (unsigned char *)(PDATA + remainder_byte_len);
1042 		ret = inflate(&strm, Z_FINISH);
1043 		int have = 1000000 - strm.avail_out;
1044 		int PDATA_ptr=0;
1045 
1046 		inflateEnd(&strm);
1047 
1048 		ret = PBam_chunk_headers(PDATA, &PDATA_ptr, have, chro_tab, &chro_tab_size, &chro_tab_items, &chro_tab_state, &header_remainder,&remainder_byte_len);
1049 		memcpy(PDATA , PDATA + have - remainder_byte_len, remainder_byte_len);
1050 		if(ret<0)
1051 		{
1052 			SUBREADprintf("Header error.\n");
1053 			free(CDATA);
1054 			free(PDATA);
1055 			return -1;
1056 		}
1057 		else if(ret == 0)
1058 		{
1059 			//SUBREADprintf("Header loaded = %d\n", (chro_tab_items));
1060 			remainder_byte_len=0;
1061 		}
1062 		if(chro_tab_state>3){
1063 			if(remainder_reads_data && PDATA_ptr < have)
1064 			{
1065 				memcpy(remainder_reads_data , PDATA + PDATA_ptr, have - PDATA_ptr);
1066 				(*remainder_reads_data_len) =  have - PDATA_ptr ;
1067 			}
1068 			break;
1069 		}
1070 	}
1071 	free(CDATA);
1072 	free(PDATA);
1073 	return 0;
1074 }
1075 
test_bamview(int argc,char ** argv)1076 int test_bamview(int argc, char ** argv)
1077 {
1078 	if(argc>1)
1079 	{
1080 		SamBam_FILE * fp = SamBam_fopen(argv[1], SAMBAM_FILE_BAM);
1081 		assert(fp);
1082 		/*
1083 		while(1)
1084 		{
1085 			char buf[3000];
1086 			char * buf2 = SamBam_fgets(fp,buf, 3000);
1087 			//SUBREADprintf(">>%s<<\n",buf);
1088 			//if(buf2)
1089 			//	fwrite(buf,strlen(buf), 1, stdout);
1090 			//else break;
1091 		}
1092 		*/
1093 		SamBam_fclose(fp);
1094 	}
1095 	return 0;
1096 }
1097 
SamBam_writer_create(SamBam_Writer * writer,char * BAM_fname,int threads,int sort_reads_by_coord,int is_temp_BAM_file,char * tmpfname)1098 int SamBam_writer_create(SamBam_Writer * writer, char * BAM_fname, int threads, int sort_reads_by_coord, int is_temp_BAM_file, char * tmpfname)
1099 {
1100 	memset(writer, 0, sizeof(SamBam_Writer));
1101 
1102 	if(BAM_fname)
1103 	{
1104 		writer -> bam_fp = f_subr_open(BAM_fname, "wb");
1105 		if(sort_reads_by_coord){
1106 			char tname[MAX_FILE_NAME_LENGTH];
1107 			sprintf(tname, "%s.bai", BAM_fname);
1108 			writer -> BAI_fp = f_subr_open(tname, "wb");
1109 			worker_master_mutex_init(&writer->sorted_notifier, threads);
1110 		}
1111 		if(!writer -> bam_fp) return -1;
1112 	}
1113 	#ifdef MAKE_STANDALONE
1114 	else
1115 		writer -> bam_fp = stdout;
1116 	#endif
1117 
1118 	writer -> threads = threads;
1119 	writer -> sort_reads_by_coord = sort_reads_by_coord;
1120 	writer -> fastest_compression = is_temp_BAM_file;
1121 	writer -> compressed_chunk_buffer = malloc(70000);
1122 	writer -> chunk_buffer = malloc(70000);
1123 	writer -> chunk_buffer_max_size = 70000;
1124 	strcpy(writer -> tmpf_prefix, tmpfname);
1125 
1126 	if(threads>= 2){
1127 		int x1;
1128 		writer -> threads_chunk_buffer = malloc(sizeof(char *) * threads) ;
1129 		writer -> threads_chunk_buffer_compressed = malloc(sizeof(char *) * threads) ;
1130 		writer -> threads_chunk_buffer_used = malloc(sizeof(long long) * threads);
1131 		writer -> threads_output_stream = malloc(sizeof(z_stream) * threads);
1132 		writer -> threads_chunk_buffer_max_size = malloc(sizeof(long long) * threads);
1133 		memset(writer -> threads_chunk_buffer_used, 0, sizeof(long long) * threads);
1134 		for(x1 = 0; x1 < threads ; x1++){
1135 			writer -> threads_chunk_buffer [x1] = malloc(70000);
1136 			writer -> threads_chunk_buffer_compressed [x1] = malloc(70000);
1137 			writer -> threads_chunk_buffer_max_size[x1] = 70000;
1138 		}
1139 		subread_init_lock(&writer -> thread_bam_lock);
1140 	}
1141 	writer -> chromosome_name_table = HashTableCreate(1603);
1142 	writer -> chromosome_id_table = HashTableCreate(1603);
1143 	writer -> chromosome_len_table = HashTableCreate(1603);
1144 	writer -> header_plain_text_buffer = malloc(100000000);
1145 	writer -> header_plain_text_buffer_max = 100000000;
1146 	writer -> header_plain_text_buffer_used = 0;
1147 
1148 	//memset(writer -> header_plain_text_buffer , 0 , 100000000);
1149 	HashTableSetHashFunction(writer -> chromosome_name_table , fc_chro_hash);
1150 	HashTableSetKeyComparisonFunction(writer -> chromosome_name_table , fc_strcmp_chro);
1151 	HashTableSetDeallocationFunctions(writer -> chromosome_name_table , free, NULL);
1152 
1153 	return 0;
1154 }
1155 
SamBam_writer_chunk_header(SamBam_Writer * writer,int compressed_size)1156 void SamBam_writer_chunk_header(SamBam_Writer * writer, int compressed_size)
1157 {
1158 
1159 	// the four magic characters
1160 	fputc(31,  writer -> bam_fp);
1161 	fputc(139,  writer -> bam_fp);
1162 	fputc(8,  writer -> bam_fp);
1163 	fputc(4,  writer -> bam_fp);
1164 
1165 	time_t time_now = 0;
1166 	fwrite(&time_now,4,1, writer -> bam_fp);
1167 
1168 	int tmp_i;
1169 	// Extra flags and OS
1170 	fputc(0,  writer -> bam_fp);
1171 	fputc(0xff,  writer -> bam_fp);
1172 
1173 	// Extra length
1174 	tmp_i = 6;
1175 	fwrite(&tmp_i,2,1, writer -> bam_fp);
1176 
1177 
1178 	// SI1 and SI2 magic numbers, and SLEN
1179 	fputc(66,  writer -> bam_fp);
1180 	fputc(67,  writer -> bam_fp);
1181 	tmp_i = 2;
1182 	fwrite(&tmp_i,2,1, writer -> bam_fp);
1183 	tmp_i = compressed_size + 19 + 6;
1184 	fwrite(&tmp_i,2,1, writer -> bam_fp);
1185 }
1186 
SamBam_CRC32(char * dat,int len)1187 unsigned int SamBam_CRC32(char * dat, int len)
1188 {
1189 	unsigned int crc0 = crc32(0, NULL, 0);
1190 	unsigned int ret = crc32(crc0, (unsigned char *)dat, len);
1191 	return ret;
1192 }
1193 
SamBam_writer_add_chunk(SamBam_Writer * writer,int thread_no)1194 void SamBam_writer_add_chunk(SamBam_Writer * writer, int thread_no)
1195 {
1196 	int compressed_size ;
1197 	unsigned int CRC32;
1198 	z_stream * this_stream = thread_no < 0 ? &writer ->output_stream:writer -> threads_output_stream + thread_no;
1199 	long long * this_buffer_used = thread_no < 0 ? &writer ->chunk_buffer_used : writer -> threads_chunk_buffer_used + thread_no;
1200 	char * this_buffer = thread_no < 0 ? writer ->chunk_buffer: writer -> threads_chunk_buffer[thread_no];
1201 	char * this_compressed_chunk_buffer = thread_no < 0 ? writer ->compressed_chunk_buffer : writer -> threads_chunk_buffer_compressed[thread_no];
1202 
1203 	if(*this_buffer_used < 1){
1204 		// magic block with 0 byte data(EOF)
1205 		subread_lock_occupy(&writer -> thread_bam_lock);
1206 		fwrite( "\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00",1,28,writer -> bam_fp);
1207 		writer -> current_BAM_pos = ftello(writer -> bam_fp);
1208 		subread_lock_release(&writer -> thread_bam_lock);
1209 		return;
1210 	}
1211 
1212 	this_stream -> avail_out = 70000;
1213 	this_stream -> avail_in = (* this_buffer_used);
1214 	CRC32 = SamBam_CRC32(this_buffer , * this_buffer_used);
1215 
1216 	this_stream -> zalloc = Z_NULL;
1217 	this_stream -> zfree = Z_NULL;
1218 	this_stream -> opaque = Z_NULL;
1219 
1220 	deflateInit2(this_stream, writer -> fastest_compression? SAMBAM_COMPRESS_LEVEL_FASTEST : SAMBAM_COMPRESS_LEVEL_NORMAL, Z_DEFLATED,
1221 		SAMBAM_GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
1222 
1223 	this_stream -> next_in = (unsigned char *)this_buffer;
1224 	this_stream -> next_out = (unsigned char *)this_compressed_chunk_buffer;
1225 
1226 	deflate(this_stream, Z_FINISH);
1227 	deflateEnd(this_stream);
1228 
1229 	compressed_size = 70000 - this_stream -> avail_out;
1230 
1231 	subread_lock_occupy(&writer -> thread_bam_lock);
1232 	SamBam_writer_chunk_header(writer, compressed_size);
1233 	int chunk_write_size = fwrite(this_compressed_chunk_buffer, 1, compressed_size, writer -> bam_fp);
1234 	fwrite(&CRC32 , 4, 1, writer -> bam_fp);
1235 	fwrite(this_buffer_used , 4, 1, writer -> bam_fp);
1236 	writer -> current_BAM_pos = ftello(writer -> bam_fp);
1237 	subread_lock_release(&writer -> thread_bam_lock);
1238 
1239 	if(chunk_write_size < compressed_size){
1240 		if(!writer -> is_internal_error)SUBREADputs("ERROR: no space left in the output directory.");
1241 		writer -> is_internal_error = 1;
1242 	}
1243 
1244 	(* this_buffer_used) = 0;
1245 }
1246 
1247 double sambam_t1 = 0;
1248 
SamBam_writer_write_header(SamBam_Writer * writer)1249 void SamBam_writer_write_header(SamBam_Writer * writer)
1250 {
1251 	int header_ptr=0, header_block_start = 0;
1252 	while(header_ptr < writer->header_plain_text_buffer_used)
1253 	{
1254 		if(( header_ptr - header_block_start > 55000 || header_ptr >= writer->header_plain_text_buffer_used-1) && writer -> header_plain_text_buffer[header_ptr] == '\n')
1255 		{
1256 			writer -> chunk_buffer_used = 0;
1257 			if(header_block_start == 0)	// the very first block
1258 			{
1259 				memcpy(writer -> chunk_buffer, "BAM\1",4);
1260 				writer -> chunk_buffer_used  = 4;
1261 				memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used, &writer -> header_plain_text_buffer_used, 4);
1262 				writer -> chunk_buffer_used += 4;
1263 
1264 			}
1265 
1266 			memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used , writer -> header_plain_text_buffer + header_block_start, header_ptr - header_block_start+1);
1267 			writer -> chunk_buffer_used +=  header_ptr - header_block_start + 1;
1268 			SamBam_writer_add_chunk(writer, -1);
1269 			header_block_start = header_ptr + 1;
1270 		}
1271 		header_ptr ++;
1272 	}
1273 
1274 	free(writer -> header_plain_text_buffer);
1275 	writer -> header_plain_text_buffer = NULL;
1276 
1277 	// reference sequences
1278 	writer -> chunk_buffer_used = 0;
1279 	memcpy(writer -> chunk_buffer, & writer -> chromosome_name_table -> numOfElements, 4);
1280 	writer -> chunk_buffer_used = 4;
1281 
1282 	for( header_ptr=0 ;  header_ptr < writer -> chromosome_name_table -> numOfElements ; header_ptr ++)
1283 	{
1284 		//printf("D=%d\n", writer -> chromosome_id_table -> numOfElements);
1285 		char * chro_name = HashTableGet(writer -> chromosome_id_table, NULL + 1 + header_ptr);
1286 		unsigned int chro_len = HashTableGet(writer -> chromosome_len_table, NULL + 1 + header_ptr) - NULL - 1;
1287 		assert(chro_name);
1288 		int chro_name_len = strlen(chro_name)+1;
1289 
1290 		memcpy(writer -> chunk_buffer +  writer -> chunk_buffer_used , &chro_name_len, 4);
1291 		writer -> chunk_buffer_used += 4;
1292 
1293 		strcpy(writer -> chunk_buffer +  writer -> chunk_buffer_used , chro_name);
1294 		writer -> chunk_buffer_used += chro_name_len;
1295 
1296 		memcpy(writer -> chunk_buffer +  writer -> chunk_buffer_used , &chro_len, 4);
1297 		writer -> chunk_buffer_used += 4;
1298 
1299 		if(header_ptr ==  writer -> chromosome_name_table -> numOfElements - 1 || writer -> chunk_buffer_used > 55000)
1300 		{
1301 			SamBam_writer_add_chunk(writer, -1);
1302 			writer -> chunk_buffer_used = 0;
1303 		}
1304 	}
1305 
1306 }
1307 
1308 void SamBam_writer_finalise_one_thread(SamBam_Writer * writer);
1309 void SamBam_writer_sort_bins_to_BAM(SamBam_Writer * writer);
1310 
SamBam_writer_close(SamBam_Writer * writer)1311 int SamBam_writer_close(SamBam_Writer * writer)
1312 {
1313 	if(writer -> writer_state == 0)	// no reads were added
1314 	{
1315 		if(writer -> header_plain_text_buffer)
1316 			SamBam_writer_write_header(writer);
1317 	} else SamBam_writer_finalise_one_thread(writer);
1318 
1319 	if(writer -> sort_reads_by_coord){
1320 		SamBam_writer_sort_bins_to_BAM(writer);
1321 		worker_master_mutex_destroy(&writer->sorted_notifier);
1322 	}
1323 
1324 	writer -> chunk_buffer_used = 0;
1325 	SamBam_writer_add_chunk(writer, -1);
1326 //	fputc(0, writer -> bam_fp);
1327 
1328 	writer -> output_stream.next_in= NULL;
1329 	writer -> output_stream.avail_in= 0;
1330 	writer -> output_stream.next_out= NULL;
1331 	writer -> output_stream.avail_out= 0;
1332 
1333 	free(writer -> chunk_buffer);
1334 	free(writer -> compressed_chunk_buffer);
1335 	if(writer -> threads >=2){
1336 		int x1;
1337 		for(x1 = 0 ; x1 < writer -> threads; x1 ++){
1338 			free(writer -> threads_chunk_buffer[x1]);
1339 			free(writer -> threads_chunk_buffer_compressed[x1]);
1340 		}
1341 		free(writer -> threads_output_stream);
1342 		free(writer -> threads_chunk_buffer);
1343 		free(writer -> threads_chunk_buffer_compressed);
1344 		free(writer -> threads_chunk_buffer_used);
1345 	}
1346 	HashTableDestroy(writer -> chromosome_name_table);
1347 	HashTableDestroy(writer -> chromosome_id_table);
1348 	HashTableDestroy(writer -> chromosome_len_table);
1349 	#ifdef MAKE_STANDALONE
1350 	if(stdout != writer -> bam_fp)
1351 	#endif
1352 	fclose(writer -> bam_fp);
1353 	if(writer -> BAI_fp!=NULL) fclose(writer -> BAI_fp);
1354 
1355 	return 0;
1356 }
1357 
SamBam_writer_add_header(SamBam_Writer * writer,char * header_text,int add_chro)1358 int SamBam_writer_add_header(SamBam_Writer * writer, char * header_text, int add_chro)
1359 {
1360 	int new_text_len = strlen(header_text);
1361 
1362 	if(writer -> header_plain_text_buffer_max <= writer -> header_plain_text_buffer_used + new_text_len + 1)
1363 	{
1364 		//return 0;
1365 		writer -> header_plain_text_buffer_max *=2;
1366 		writer -> header_plain_text_buffer = realloc(writer -> header_plain_text_buffer ,  writer -> header_plain_text_buffer_max);
1367 		//printf("REAL: %d : %llX\n",writer -> header_plain_text_buffer_max, (long long ) writer -> header_plain_text_buffer);
1368 	}
1369 
1370 	strcpy(writer -> header_plain_text_buffer + writer -> header_plain_text_buffer_used, header_text);
1371 	writer -> header_plain_text_buffer_used += new_text_len;
1372 	strcpy(writer -> header_plain_text_buffer + writer -> header_plain_text_buffer_used, "\n");
1373 	writer -> header_plain_text_buffer_used ++;
1374 
1375 //	SUBREADprintf("ADJHEADERXCD BIN=%s\n", header_text);
1376 	if(add_chro && memcmp(header_text, "@SQ",3)==0)
1377 	{
1378 		char * chro = NULL;
1379 		int chro_len = -1;
1380 		char * toktmp = NULL;
1381 		char * ret_tmp = strtok_r(header_text, "\t", &toktmp);
1382 
1383 		while(1){
1384 			if(!ret_tmp) break;
1385 
1386 			if(memcmp(ret_tmp,"SN:", 3)==0) chro = ret_tmp + 3;
1387 			else if(memcmp(ret_tmp,"LN:", 3)==0) chro_len = atoi(ret_tmp + 3);
1388 
1389 			ret_tmp = strtok_r(NULL, "\t", &toktmp);
1390 		}
1391 
1392 		if(chro && (chro_len>0))
1393 			SamBam_writer_add_chromosome(writer, chro, chro_len, 0);
1394 
1395 	}
1396 
1397 	//if(writer -> header_plain_text_buffer_used %97==0) printf("MV=%d\n",writer -> header_plain_text_buffer_used);
1398 
1399 	return 0;
1400 }
1401 
SamBam_writer_add_chromosome(SamBam_Writer * writer,char * chro_name,unsigned int chro_length,int add_header)1402 int SamBam_writer_add_chromosome(SamBam_Writer * writer, char * chro_name, unsigned int chro_length, int add_header)
1403 {
1404 	unsigned int chro_id = writer -> chromosome_name_table -> numOfElements;
1405 
1406 	//SUBREADprintf("ADJHEADER_CHRO %s of %u\n", chro_name, chro_length);
1407 	//assert(strlen(chro_name) < 30);
1408 
1409 	char * chro_name_space = malloc(strlen(chro_name)+1);
1410 	strcpy(chro_name_space , chro_name);
1411 	HashTablePut(writer -> chromosome_name_table, chro_name_space, NULL+1+chro_id);
1412 	HashTablePut(writer -> chromosome_id_table, NULL+1+chro_id, chro_name_space);
1413 	HashTablePut(writer -> chromosome_len_table, NULL+1+chro_id, NULL + 1 + chro_length);
1414 
1415 	if(add_header)
1416 	{
1417 		char * line_buf = malloc(1000);
1418 		snprintf(line_buf,999, "@SQ\tSN:%s\tLN:%u", chro_name , chro_length);
1419 		SamBam_writer_add_header(writer, line_buf, 0);
1420 		free(line_buf);
1421 	}
1422 
1423 	return 0;
1424 }
1425 
1426 
SamBam_compress_cigar(char * cigar,int * cigar_int,int * ret_coverage,int max_secs)1427 int SamBam_compress_cigar(char * cigar, int * cigar_int, int * ret_coverage, int max_secs)
1428 {
1429 	int tmp_int=0;
1430 	int cigar_cursor = 0, num_opt = 0;
1431 	int coverage_len = 0;
1432 	(* ret_coverage) = 0;
1433 
1434 	if(cigar[0]=='*') return 0;
1435 
1436 	while(1)
1437 	{
1438 		char nch = cigar[cigar_cursor++];
1439 		if(!nch)break;
1440 		if(isdigit(nch))
1441 		{
1442 			tmp_int = tmp_int*10+(nch-'0');
1443 		}
1444 		else
1445 		{
1446 			int int_opt=0;
1447 			if(nch == 'M' || nch == 'N' || nch == 'D') coverage_len += tmp_int;
1448 			//if(nch == 'M' ||nch == 'D' || nch == '=' || nch == 'X') coverage_len += tmp_int;
1449 			for(; int_opt<8; int_opt++) if("MIDNSHP=X"[int_opt] == nch)break;
1450 			cigar_int[num_opt ++] = (tmp_int << 4) | int_opt;
1451 			tmp_int = 0;
1452 			//SUBREADprintf("CIGARCOM: %d-th is %c\n", num_opt, nch);
1453 			if(num_opt>=max_secs)break;
1454 		}
1455 	}
1456 
1457 	(*ret_coverage) = coverage_len;
1458 	return num_opt;
1459 }
1460 
SamBam_read2bin(char * read_txt,char * read_bin)1461 void SamBam_read2bin(char * read_txt, char * read_bin)
1462 {
1463 	int bin_cursor = 0, txt_cursor = 0;
1464 
1465 	while(1)
1466 	{
1467 		char nch = read_txt[txt_cursor++];
1468 		if(!nch)break;
1469 		int fourbit;
1470 		for(fourbit=0;fourbit<15;fourbit++) if("=ACMGRSVTWYHKDBN"[fourbit] == nch)break;
1471 
1472 		if(bin_cursor %2 == 0)  read_bin[bin_cursor/2] =  fourbit<<4;
1473 		else read_bin[bin_cursor/2] |=  fourbit;
1474 
1475 		bin_cursor++;
1476 	}
1477 }
1478 
SamBam_compress_additional(char * additional_columns,char * bin)1479 int SamBam_compress_additional(char * additional_columns, char * bin)
1480 {
1481 	int col_cursor = 0 , col_len = strlen(additional_columns);
1482 	int bin_cursor = 0;
1483 
1484 	while(col_cursor<col_len)
1485 	{
1486 		if(col_cursor==0 || additional_columns[col_cursor]=='\t')
1487 		{
1488 			if(additional_columns[col_cursor]=='\t') col_cursor++;
1489 
1490 			bin[bin_cursor] = additional_columns[col_cursor];
1491 			bin[bin_cursor+1] = additional_columns[col_cursor+1];
1492 
1493 			char datatype = additional_columns[col_cursor+3];
1494 			if(datatype=='i' || datatype == 'f')
1495 			{
1496 				int dig_len =0;
1497 				while(additional_columns[dig_len+col_cursor+5] != '\t' && additional_columns[dig_len+col_cursor+5]) dig_len++;
1498 				int val = 0;
1499 				float fval = 0;
1500 				if(datatype=='i') val = atoi(additional_columns+col_cursor+5);
1501 				else val = atof(additional_columns+col_cursor+5);
1502 
1503 				bin[bin_cursor+2]=datatype;
1504 				memcpy(bin+bin_cursor+3, (datatype=='i')? ((void *)&val):((void *)&fval),4);
1505 				bin_cursor += 3 + 4;
1506 				col_cursor += 5 + dig_len;
1507 			}
1508 			else if(datatype=='Z' || datatype == 'H')
1509 			{
1510 				bin[bin_cursor+2]=datatype;
1511 				bin_cursor +=3;
1512 				int str_len = 0;
1513 				col_cursor +=5;
1514 				while(additional_columns[str_len+col_cursor] != '\t' && additional_columns[str_len+col_cursor])
1515 				{
1516 					bin[bin_cursor + str_len] = additional_columns[str_len+col_cursor];
1517 					str_len++;
1518 					if(bin_cursor + str_len > 780) break;
1519 				}
1520 
1521 				bin[bin_cursor + str_len] =0;
1522 
1523 				bin_cursor += str_len + 1;
1524 				col_cursor += str_len;
1525 			}
1526 			else if(datatype=='A')
1527 			{
1528 				bin[bin_cursor+2]='A';
1529 				bin[bin_cursor+3]=additional_columns[col_cursor+5];
1530 				col_cursor += 6;
1531 				bin_cursor += 4;
1532 			}
1533 			else if(datatype=='B')
1534 				//array
1535 			{
1536 				char celltype = additional_columns[col_cursor+5];
1537 				int * items = (int *)(&bin[bin_cursor+4]);
1538 
1539 				bin[bin_cursor+2]='B';
1540 				bin[bin_cursor+3]=celltype;
1541 				bin_cursor += 4 + 4;
1542 				col_cursor += 7;
1543 
1544 				(*items) = 0;
1545 
1546 				int last_cursor = col_cursor;
1547 				while(1){
1548 					if(additional_columns[col_cursor] == ',' || additional_columns[col_cursor] == '\t' || additional_columns[col_cursor] == 0)
1549 					{ // add new item
1550 
1551 						char cell_buff [30];
1552 						if((col_cursor - last_cursor) < 29)
1553 						{
1554 							memcpy(cell_buff, additional_columns + last_cursor, (col_cursor - last_cursor));
1555 							cell_buff[(col_cursor - last_cursor)] = 0;
1556 							int intv = 0; float fltv = 0;
1557 							if(celltype == 'i')intv = atoi(cell_buff);
1558 							else fltv = atof(cell_buff);
1559 							if(bin_cursor < 780){
1560 								memcpy(bin + bin_cursor, (celltype == 'i')?(void *)&intv:(void *)&fltv, 4);
1561 								bin_cursor += 4;
1562 								(*items) ++;
1563 							}
1564 						}
1565 						last_cursor = col_cursor+1;
1566 					}
1567 					if(additional_columns[col_cursor] == '\t' || additional_columns[col_cursor] == 0)
1568 						break;
1569 
1570 					col_cursor++;
1571 
1572 				}
1573 
1574 			}
1575 
1576 			if(bin_cursor>750) break;
1577 			continue;
1578 		}
1579 
1580 		col_cursor++;
1581 	}
1582 	return bin_cursor;
1583 }
1584 
SamBam_reg2bin(int beg,int end)1585 int SamBam_reg2bin(int beg, int end)
1586 {
1587 	--end;
1588 	if (beg>>14 == end>>14) return ((1<<15)-1)/7 + (beg>>14);
1589 	if (beg>>17 == end>>17) return ((1<<12)-1)/7 + (beg>>17);
1590 	if (beg>>20 == end>>20) return ((1<<9)-1)/7 + (beg>>20);
1591 	if (beg>>23 == end>>23) return ((1<<6)-1)/7 + (beg>>23);
1592 	if (beg>>26 == end>>26) return ((1<<3)-1)/7 + (beg>>26);
1593 	return 0;
1594 }
1595 
SamBam_writer_finish_header(SamBam_Writer * writer)1596 void SamBam_writer_finish_header( SamBam_Writer * writer ){
1597 	if(writer -> writer_state == 0){
1598 		if(writer -> header_plain_text_buffer)
1599 		SamBam_writer_write_header(writer);
1600 		writer -> writer_state = 10;
1601 	}
1602 }
1603 
1604 #define FC_MAX_CIGAR_SECTIONS 96
1605 
1606 // The caller has to free the returned pointer.
1607 // It returns NULL if no such field.
duplicate_TAB_record_field(char * rline,int fno,int to_end)1608 char * duplicate_TAB_record_field(char * rline, int fno, int to_end){
1609 	int i, fldi = 0, start_pos = -1;
1610 	if(fno >=1)
1611 		for(i=0;rline[i] && rline[i] != '\n'; i++){
1612 			int nch = rline[i];
1613 			if(nch == '\t'){
1614 				fldi++;
1615 				if(fldi == fno) start_pos = i+1;
1616 				if(fldi == fno+1) break;
1617 			}
1618 		}
1619 	else{
1620 		start_pos = rline[0]>0?0:-1;
1621 		if(start_pos == 0) for(i=0;rline[i] && rline[i] != '\n' && rline[i] != '\t'; i++);
1622 		else i=-1;
1623 	}
1624 	int end_pos =i;
1625 	if(to_end){
1626 		end_pos = strlen(rline);
1627 		if(end_pos<1)  return NULL;
1628 
1629 		if(rline[end_pos-1]=='\n') end_pos--;
1630 	}
1631 
1632 	if(start_pos<0 || end_pos <= start_pos) return NULL;
1633 	char * ret = malloc(end_pos - start_pos +1);
1634 	memcpy(ret, rline+start_pos, end_pos -start_pos);
1635 	ret[end_pos -start_pos] = 0;
1636 	return ret;
1637 }
1638 
SamBam_writer_add_read_bin(SamBam_Writer * writer,int thread_no,char * rbin,int committable)1639 int SamBam_writer_add_read_bin(SamBam_Writer * writer, int thread_no, char * rbin, int committable){
1640 	char * this_chunk_buffer=NULL;
1641 	srInt_64 *this_chunk_buffer_used=NULL;
1642 	if(thread_no >= 0){
1643 		if(writer -> sort_reads_by_coord && writer -> threads_chunk_buffer_max_size[thread_no] < writer -> threads_chunk_buffer_used[thread_no] + 12000){
1644 			writer -> threads_chunk_buffer_max_size[thread_no] = writer -> threads_chunk_buffer_max_size[thread_no] * 7/4;
1645 			writer -> threads_chunk_buffer[ thread_no ] = realloc(writer -> threads_chunk_buffer[ thread_no ], writer -> threads_chunk_buffer_max_size[thread_no]);
1646 		}
1647 		this_chunk_buffer = writer -> threads_chunk_buffer[ thread_no ];
1648 		this_chunk_buffer_used = writer -> threads_chunk_buffer_used + thread_no;
1649 	}else{
1650 		if(writer -> sort_reads_by_coord && writer -> chunk_buffer_max_size < writer -> chunk_buffer_used + 12000){
1651 			//SUBREADprintf("0THR_REALLOCATE MEM : %d -> %d\n", writer -> chunk_buffer_max_size,  writer -> chunk_buffer_max_size * 7/4);
1652 			writer -> chunk_buffer_max_size = writer -> chunk_buffer_max_size * 7/4;
1653 			writer -> chunk_buffer = realloc(writer -> chunk_buffer, writer -> chunk_buffer_max_size);
1654 		}
1655 		this_chunk_buffer = writer -> chunk_buffer;
1656 		this_chunk_buffer_used = &writer -> chunk_buffer_used;
1657 	}
1658 	//SUBREADprintf("WRTXBIN: PTR %p of USED %lld at read %p\n", this_chunk_buffer, *this_chunk_buffer_used, rbin);
1659 	int reclen=0;
1660 	memcpy(&reclen, rbin,4);
1661 	memcpy(this_chunk_buffer+(*this_chunk_buffer_used), rbin, reclen+4);
1662 	(*this_chunk_buffer_used)+= reclen+4;
1663 
1664 	if((*this_chunk_buffer_used)>55000 && committable && !writer -> sort_reads_by_coord)
1665 		SamBam_writer_add_chunk(writer, thread_no);
1666 	//SUBREADprintf("WRTXBIN: FIN WTR %p of USED %lld\n", this_chunk_buffer, *this_chunk_buffer_used);
1667 	return 0;
1668 }
SamBam_writer_add_read_line(SamBam_Writer * writer,int thread_no,char * rline,int committable)1669 int SamBam_writer_add_read_line(SamBam_Writer * writer, int thread_no, char * rline, int committable){
1670 	char * read_name, * flag_str, *chro_name, *chro_position_str, * mapping_quality_str, * cigar, * next_chro_name, *next_chro_position_str, *temp_len_str, *read_text, *qual_text, *additional_columns;
1671 	read_name = duplicate_TAB_record_field(rline, 0,0);
1672 	flag_str = duplicate_TAB_record_field(rline, 1,0);
1673 	chro_name = duplicate_TAB_record_field(rline, 2,0);
1674 	chro_position_str = duplicate_TAB_record_field(rline, 3,0);
1675 	mapping_quality_str = duplicate_TAB_record_field(rline, 4,0);
1676 	cigar = duplicate_TAB_record_field(rline, 5,0);
1677 	next_chro_name = duplicate_TAB_record_field(rline, 6,0);
1678 	next_chro_position_str = duplicate_TAB_record_field(rline, 7,0);
1679 	temp_len_str = duplicate_TAB_record_field(rline, 8,0);
1680 	read_text = duplicate_TAB_record_field(rline, 9,0);
1681 	qual_text = duplicate_TAB_record_field(rline, 10,0);
1682 	additional_columns = duplicate_TAB_record_field(rline, 11,1);
1683 	if(qual_text==NULL){
1684 		SUBREADprintf("FATAL ERROR : bad read format: %s, %s, %s, %s\n", read_name, flag_str, chro_name, rline);
1685 		return -1;
1686 	}
1687 
1688 	SamBam_writer_add_read(writer, thread_no, read_name, atoi(flag_str), chro_name, atoi(chro_position_str), atoi(mapping_quality_str), cigar, next_chro_name, atoi(next_chro_position_str), atoi(temp_len_str), strlen(read_text), read_text, qual_text, additional_columns, committable);
1689 
1690 	if(additional_columns) free(additional_columns);
1691 	free(qual_text);
1692 	free(read_text);
1693 	free(temp_len_str);
1694 	free(next_chro_position_str);
1695 	free(next_chro_name);
1696 	free(cigar);
1697 	free(mapping_quality_str);
1698 	free(chro_position_str);
1699 	free(chro_name);
1700 	free(flag_str);
1701 	free(read_name);
1702 	return 0;
1703 }
1704 
SamBam_writer_add_read(SamBam_Writer * writer,int thread_no,char * read_name,unsigned int flags,char * chro_name,unsigned int chro_position,int mapping_quality,char * cigar,char * next_chro_name,unsigned int next_chro_position,int temp_len,int read_len,char * read_text,char * qual_text,char * additional_columns,int committable)1705 int SamBam_writer_add_read(SamBam_Writer * writer, int thread_no, char * read_name, unsigned int flags, char * chro_name, unsigned int chro_position, int mapping_quality, char * cigar, char * next_chro_name, unsigned int next_chro_position, int temp_len, int read_len, char * read_text, char * qual_text, char * additional_columns, int committable)
1706 {
1707 	assert(writer -> writer_state!=0);
1708 	if(!qual_text || !read_text)
1709 	{
1710 		SUBREADprintf("ERROR: sam file is incomplete.\n");
1711 		return 1;
1712 	}
1713 
1714 	char additional_bin[1000];
1715 	int cigar_opts[FC_MAX_CIGAR_SECTIONS], xk1, cover_length = 0;
1716 	int cigar_opt_len = SamBam_compress_cigar(cigar, cigar_opts, & cover_length, FC_MAX_CIGAR_SECTIONS);
1717 	int read_name_len = 1+strlen(read_name) ;
1718 	int additional_bin_len = 0;
1719 	if(additional_columns) additional_bin_len = SamBam_compress_additional(additional_columns, additional_bin);
1720 	int record_length = 4 + 4 + 4 + 4 +  /* l_seq: */ 4 + 4 + 4 + 4 + /* read_name:*/ read_name_len + cigar_opt_len * 4 + (read_len + 1) /2 + read_len + additional_bin_len;
1721 
1722 
1723 	char * this_chunk_buffer;
1724 	srInt_64 * this_chunk_buffer_used;
1725 
1726 	if(thread_no >= 0){
1727 		if(writer -> sort_reads_by_coord && writer -> threads_chunk_buffer_max_size[thread_no] < writer -> threads_chunk_buffer_used[thread_no] + 12000){
1728 			writer -> threads_chunk_buffer_max_size[thread_no] = writer -> threads_chunk_buffer_max_size[thread_no] * 7/4;
1729 			writer -> threads_chunk_buffer[ thread_no ] = realloc(writer -> threads_chunk_buffer[ thread_no ], writer -> threads_chunk_buffer_max_size[thread_no]);
1730 		}
1731 		this_chunk_buffer = writer -> threads_chunk_buffer[ thread_no ];
1732 		this_chunk_buffer_used = writer -> threads_chunk_buffer_used + thread_no;
1733 	}else{
1734 		if(writer -> sort_reads_by_coord && writer -> chunk_buffer_max_size < writer -> chunk_buffer_used + 12000){
1735 			//SUBREADprintf("REALLOCATE MEM : %d -> %d\n", writer -> chunk_buffer_max_size,  writer -> chunk_buffer_max_size * 7/4);
1736 			writer -> chunk_buffer_max_size = writer -> chunk_buffer_max_size * 7/4;
1737 			writer -> chunk_buffer = realloc(writer -> chunk_buffer, writer -> chunk_buffer_max_size);
1738 		}
1739 		this_chunk_buffer = writer -> chunk_buffer;
1740 		this_chunk_buffer_used = &writer -> chunk_buffer_used;
1741 	}
1742 
1743 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & record_length , 4);
1744 	(*this_chunk_buffer_used) += 4;
1745 
1746 	int bin = SamBam_reg2bin(chro_position -1, chro_position-1+cover_length);
1747 	//if(chro_position>0)SUBREADprintf("CREATE_BIN=%d\n", bin);
1748 
1749 	int refID = HashTableGet(writer -> chromosome_name_table, chro_name) - NULL - 1;
1750 	int bin_mq_nl = (bin<<16) | (mapping_quality << 8) | read_name_len ;
1751 	int fag_nc = (flags<<16) | cigar_opt_len;
1752 	int nextRefID = -1;
1753 
1754 	if(next_chro_name[0] != '*' && next_chro_name[0]!='=')
1755 		nextRefID = HashTableGet(writer -> chromosome_name_table, next_chro_name) - NULL - 1;
1756 	else if(next_chro_name[0] == '=')
1757 		nextRefID = refID;
1758 
1759 
1760 	chro_position--;
1761 	next_chro_position--;
1762 
1763 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & refID , 4);
1764 	(*this_chunk_buffer_used) += 4;
1765 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & chro_position , 4);
1766 	(*this_chunk_buffer_used) += 4;
1767 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & bin_mq_nl , 4);
1768 	(*this_chunk_buffer_used) += 4;
1769 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & fag_nc , 4);
1770 	(*this_chunk_buffer_used) += 4;
1771 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & read_len , 4);
1772 	(*this_chunk_buffer_used) += 4;
1773 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & nextRefID , 4);
1774 	(*this_chunk_buffer_used) += 4;
1775 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & next_chro_position , 4);
1776 	(*this_chunk_buffer_used) += 4;
1777 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , & temp_len , 4);
1778 	(*this_chunk_buffer_used) += 4;
1779 	strcpy(this_chunk_buffer + (*this_chunk_buffer_used) , read_name);
1780 	(*this_chunk_buffer_used) += read_name_len;
1781 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used) , cigar_opts, 4*cigar_opt_len);
1782 	(*this_chunk_buffer_used) += 4*cigar_opt_len;
1783 	SamBam_read2bin(read_text  , this_chunk_buffer + (*this_chunk_buffer_used));
1784 	(*this_chunk_buffer_used) += (read_len + 1) /2;
1785 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used), qual_text, read_len);
1786 	for(xk1=0; xk1<read_len; xk1++)
1787 		this_chunk_buffer[(*this_chunk_buffer_used)+xk1] -= 33;
1788 
1789 	(*this_chunk_buffer_used) += read_len;
1790 	memcpy(this_chunk_buffer + (*this_chunk_buffer_used), additional_bin, additional_bin_len);
1791 	(*this_chunk_buffer_used) += additional_bin_len;
1792 
1793 
1794 	if((*this_chunk_buffer_used)>55000 && committable && !writer -> sort_reads_by_coord)
1795 		SamBam_writer_add_chunk(writer, thread_no);
1796 
1797 	return 0;
1798 }
1799 
SamBam_unzip(char * out,int outlen,char * in,int inlen,int sync_only)1800 int SamBam_unzip(char * out, int outlen , char * in , int inlen, int sync_only)
1801 {
1802 	z_stream strm;
1803 	strm.zalloc = Z_NULL;
1804 	strm.zfree = Z_NULL;
1805 	strm.opaque = Z_NULL;
1806 	strm.avail_in = 0;
1807 	strm.next_in = Z_NULL;
1808 	int ret = inflateInit2(&strm, SAMBAM_GZIP_WINDOW_BITS);
1809 	if (ret != Z_OK)
1810 		return -1;
1811 
1812 	strm.avail_in = (unsigned int)inlen;
1813 	strm.next_in = (unsigned char *)in;
1814 
1815 	strm.avail_out = outlen;
1816 	strm.next_out = (unsigned char *)out;
1817 	ret = inflate(&strm, sync_only?Z_SYNC_FLUSH:Z_FINISH);
1818 	if(ret != Z_STREAM_END && ret!=0)
1819 	{
1820 		inflateEnd(&strm);
1821 		SUBREADprintf("DATA ERROR! code=%d\n", ret);
1822 		assert(0);
1823 		return -1;
1824 	}
1825 	int have = outlen - strm.avail_out;
1826 
1827 	inflateEnd(&strm);
1828 	//SUBREADprintf("DECOMPRESS GENERATED=%d\n", have);
1829 
1830 	return have;
1831 }
1832 
1833 
SamBam_writer_sort_buff_one_compare(void * Lbin,void * Rbin,ArrayList * me)1834 int SamBam_writer_sort_buff_one_compare(void * Lbin, void * Rbin, ArrayList * me){
1835 	unsigned long long * Lv = Lbin, *Rv = Rbin;
1836 	if(*Lv > *Rv) return 1;
1837 	else if(*Lv < *Rv) return -1;
1838 	return 0;
1839 }
1840 // return total reads in bin.
SamBam_writer_sort_buff_one_write(SamBam_Writer * writer,char * bin,int binlen,int thread_id)1841 int SamBam_writer_sort_buff_one_write(SamBam_Writer * writer, char * bin, int binlen, int thread_id){
1842 	int bin_cursor = 0;
1843 
1844 	ArrayList* sort_linear_pos = ArrayListCreate(1000000);
1845 	ArrayListSetDeallocationFunction(sort_linear_pos,  free);
1846 
1847 	int ii_reads = 0;
1848 	while(bin_cursor < binlen){
1849 		int this_block_len = 0;
1850 		memcpy(&this_block_len, bin + bin_cursor, 4);
1851 
1852 		char * key_binpos = malloc(12);
1853 		memcpy(key_binpos,   bin + bin_cursor+8, 4);
1854 		memcpy(key_binpos+4,   bin + bin_cursor+4, 4);
1855 		memcpy(key_binpos+8, &bin_cursor, 4);
1856 
1857 		ArrayListPush(sort_linear_pos, key_binpos);
1858 		bin_cursor +=4 + this_block_len;
1859 
1860 		ii_reads ++;
1861 	}
1862 	ArrayListSort(sort_linear_pos, SamBam_writer_sort_buff_one_compare);
1863 
1864 	char * nbin = NULL;
1865 	if(binlen >0 && binlen < 0x7fffffff)nbin=malloc(binlen);
1866 	int nb_cursor = 0, xx, wlen=0;
1867 	for(xx=0; xx<ii_reads;xx++){
1868 		int * binpos = ArrayListGet(sort_linear_pos, xx);
1869 		int block_len = 0;
1870 		memcpy(&block_len, bin + binpos[2] , 4);
1871 		assert(block_len < 10000);
1872 		memcpy(nbin + nb_cursor, bin + binpos[2], 4+block_len);
1873 		nb_cursor += 4+block_len;
1874 	}
1875 	assert(binlen == nb_cursor);
1876 	memcpy(bin, nbin, binlen);
1877 	ArrayListDestroy(sort_linear_pos);
1878 
1879 	char tmpfname[MAX_FILE_NAME_LENGTH+40];
1880 	if(writer -> threads>1) subread_lock_occupy(&writer -> thread_bam_lock);
1881 	sprintf(tmpfname, "%s-%06d.sortedbin", writer -> tmpf_prefix, writer -> sorted_batch_id++);
1882 	if(writer -> threads>1) subread_lock_release(&writer -> thread_bam_lock);
1883 	FILE * tofp  = fopen(tmpfname, "wb");
1884 	if(tofp){
1885 		if(binlen>0)wlen = fwrite(nbin, binlen,1, tofp);
1886 		fclose(tofp);
1887 	}
1888 	free(nbin);
1889 	if(wlen < 1 && binlen>0) {
1890 		SUBREADprintf("ERROR: no space (%d bytes) in the temp directory (%s).\nThe program cannot run properly.\n", binlen, tmpfname);
1891 		writer ->is_internal_error = 1;
1892 		return -1;
1893 	}
1894 	return ii_reads;
1895 }
1896 
SamBam_writer_sort_bins_to_BAM_FP_pos(FILE * fp)1897 unsigned long long SamBam_writer_sort_bins_to_BAM_FP_pos(FILE * fp){
1898 	int intvals[3];
1899 	unsigned long long ret = 0;
1900 	int rlen = fread(intvals, sizeof(int),3,fp);
1901 	if(rlen>0 && intvals[0]<10000){
1902 		int chro_no, poss;
1903 		chro_no=intvals[1];
1904 		poss=intvals[2];
1905 		ret =((1LLU*chro_no) << 32)| poss;
1906 		if(ret == SUBREAD_MAX_ULONGLONG) ret = ret- 10 ;
1907 		fseek(fp, -12, SEEK_CUR);
1908 		return ret;
1909 	}
1910 	return SUBREAD_MAX_ULONGLONG;
1911 }
1912 
1913 
SamBam_writer_calc_cigar_span(char * bin)1914 int SamBam_writer_calc_cigar_span(char * bin){
1915 	int cops = 0, rname_len = 0, ret = 0;
1916 	memcpy(&cops, bin+12, 4);
1917 	memcpy(&rname_len, bin+8, 4);
1918 	rname_len = rname_len & 0xff;
1919 	cops = cops & 0xffff;
1920 	int ii;
1921 	for(ii = 0; ii < cops ; ii++){
1922 		unsigned int copt = 0;
1923 		memcpy(&copt, bin+32+rname_len+4*ii, 4);
1924 		int copt_char = copt & 0xf;
1925 		unsigned int copt_len = copt >> 4;
1926 		if(copt_char == 0 || copt_char == 2 || copt_char == 3 || copt_char == 7 || copt_char == 8) ret += copt_len;
1927 	}
1928 
1929 	return ret;
1930 }
1931 
1932 #define MAX_ALLOWED_GAP_IN_BAI_CHUNK 5 // the number of blocks, not the file positions.
1933 
SamBam_writer_sort_bins_to_BAM_test_bins(SamBam_Writer * writer,HashTable * bin_tab,ArrayList * bin_list,ArrayList * win16k_list,int block_len,void *** last_chunk_ptr,int chro_no)1934 void SamBam_writer_sort_bins_to_BAM_test_bins(SamBam_Writer * writer, HashTable * bin_tab, ArrayList * bin_list, ArrayList * win16k_list, int block_len, void ***last_chunk_ptr, int chro_no){
1935 	int inbin_pos = writer -> chunk_buffer_used - block_len ; // point to the byte AFTER "block_len" int.
1936 	int pos=0, bin_mq_nl = 0, binno=0;
1937 
1938 	memcpy(&pos, writer -> chunk_buffer + inbin_pos + 4, 4);
1939 	memcpy(&bin_mq_nl, writer -> chunk_buffer + inbin_pos + 8,4);
1940 	binno = bin_mq_nl>>16;
1941 
1942 	int cigar_span = SamBam_writer_calc_cigar_span(writer -> chunk_buffer + inbin_pos);
1943 
1944 	int this_w16_no = (pos + cigar_span) >>14;	// WIN is calculated on 0-based pos.
1945 	unsigned long long this_Vpos = writer -> this_bam_block_no<<16 | (inbin_pos-4);
1946 
1947 	// if this read is after the maximum coordinate in the win16k list: all elements before last one and this one starts at this read.
1948 	if(this_w16_no >=win16k_list->numOfElements){
1949 		int bbi;
1950 		for(bbi = win16k_list->numOfElements; bbi <=this_w16_no; bbi++)
1951 			ArrayListPush(win16k_list, NULL+ this_Vpos);
1952 	}
1953 
1954 	// a read only belongs to ONE bin which is the smallest bin that can FULLY cover the full length of this read.
1955 	ArrayList * this_bin_chunks = HashTableGet(bin_tab, NULL+binno+1);
1956 	if(NULL == this_bin_chunks){
1957 		this_bin_chunks = ArrayListCreate(5);
1958 		HashTablePut(bin_tab, NULL+binno+1, this_bin_chunks);
1959 		ArrayListPush(bin_list, NULL+binno);
1960 	}
1961 
1962 	int found = 0;
1963 	// a bin is not necessarily continuous. Say, a top-level bin only contains a few reads (most reads a in low-level bins), but their locations are everywhere.
1964 	if(this_bin_chunks -> numOfElements > 0){
1965 		//SUBREADprintf("RESLOCS : %ld == %ld\n", this_Vpos, this_bin_chunks -> elementList [ this_bin_chunks -> numOfElements - 1 ] - NULL);
1966 		long long diff = this_Vpos >>16;
1967 		diff -=(this_bin_chunks -> elementList [ this_bin_chunks -> numOfElements - 1] - NULL)>>16;
1968 		if(diff < MAX_ALLOWED_GAP_IN_BAI_CHUNK){
1969 			this_bin_chunks -> elementList [ this_bin_chunks -> numOfElements -1] = NULL+this_Vpos + block_len+4;
1970 			*last_chunk_ptr = this_bin_chunks -> elementList + this_bin_chunks -> numOfElements -1;
1971 			found = 1;
1972 		}
1973 	}
1974 
1975 	// if the last chunk in this bin isn't good to be extended (too far from the file location of the new read), a new chunk is created.
1976 	if(!found){
1977 		ArrayListPush(this_bin_chunks, NULL + this_Vpos);
1978 		ArrayListPush(this_bin_chunks, NULL + this_Vpos + block_len+4);
1979 		*last_chunk_ptr = this_bin_chunks -> elementList + this_bin_chunks -> numOfElements -1;
1980 	}
1981 }
1982 
SamBam_writer_sorted_compress(void * vptr0)1983 void * SamBam_writer_sorted_compress(void * vptr0){
1984 	void ** vptr = vptr0;
1985 	SamBam_Writer * writer = vptr[0];
1986 	int this_thread_no = vptr[1]-NULL;
1987 	subread_lock_t * initlock = vptr[2];
1988 
1989 	worker_thread_start(&writer->sorted_notifier, this_thread_no);
1990 	subread_lock_release(initlock);
1991 	free(vptr0);
1992 	struct SamBam_sorted_compressor_st * me = writer -> writer_threads + this_thread_no;
1993 
1994 	while(1){
1995 		int termed = worker_wait_for_job(&writer->sorted_notifier, this_thread_no);
1996 		if(termed)break;
1997 
1998 		me->CRC32_plain = SamBam_CRC32(me->plain_text,  me->text_size);
1999 		me->strm.next_in = (unsigned char*)me->plain_text;
2000 		me->strm.avail_in = me->text_size;
2001 		me->strm.next_out = (unsigned char*)me->zipped_bin;
2002 		me->strm.avail_out = 70000;
2003 		int deret = deflate(&me->strm,Z_FINISH);
2004 		if(deret >=0){
2005 			deflateReset(&me->strm);
2006 			me->bin_size = 70000 - me->strm.avail_out;
2007 			me->last_job_done = 1;
2008 		}else{
2009 			SUBREADprintf("Error: cannot compress BAM block #%d , which is %llu, had %d => 70000 [ %d ] bytes , return = %d\n", this_thread_no, me -> bam_block_no, me->text_size, me->bin_size, deret);
2010 		}
2011 	}
2012 	return NULL;
2013 }
2014 
SamBam_thread_wait_merge_write(SamBam_Writer * writer,int thread_no)2015 void SamBam_thread_wait_merge_write(SamBam_Writer * writer, int thread_no){
2016 	master_wait_for_job_done(&writer -> sorted_notifier, thread_no);
2017 
2018 	if(writer -> writer_threads[thread_no].last_job_done){
2019 		srInt_64 fpos = ftello(writer -> bam_fp);
2020 		HashTablePut(writer -> block_no_p1_to_vpos_tab, NULL+1+writer -> writer_threads[thread_no].bam_block_no, NULL+fpos);
2021 
2022 		SamBam_writer_chunk_header(writer, writer -> writer_threads[thread_no].bin_size);
2023 		int rlen = fwrite( writer -> writer_threads[thread_no].zipped_bin ,1, writer -> writer_threads[thread_no].bin_size, writer -> bam_fp);
2024 		if(rlen != writer -> writer_threads[thread_no].bin_size){
2025 			SUBREADprintf("ERROR: cannot write output files.\n");
2026 			assert(0);
2027 		}
2028 		rlen = fwrite(&writer -> writer_threads[thread_no].CRC32_plain , 4, 1, writer -> bam_fp);
2029 		rlen = fwrite(&writer -> writer_threads[thread_no].text_size , 4, 1, writer -> bam_fp);
2030 
2031 		writer -> writer_threads[thread_no].text_size = 0;
2032 		writer -> writer_threads[thread_no].bin_size = 0;
2033 		writer -> writer_threads[thread_no].bam_block_no = -1;
2034 		writer -> writer_threads[thread_no].last_job_done = 0;
2035 	}
2036 }
2037 
SamBam_thread_set_new_job(SamBam_Writer * writer,int thread_no)2038 void SamBam_thread_set_new_job(SamBam_Writer * writer, int thread_no){
2039 	memcpy(writer -> writer_threads[thread_no].plain_text, writer ->chunk_buffer, writer->chunk_buffer_used);
2040 	writer -> writer_threads[thread_no].text_size = writer->chunk_buffer_used;
2041 	writer -> writer_threads[thread_no].bam_block_no = writer -> this_bam_block_no;
2042 	writer -> chunk_buffer_used = 0;
2043 	master_notify_worker(&writer -> sorted_notifier, thread_no);
2044 }
2045 
SamBam_writer_submit_sorted_compressing_task(SamBam_Writer * writer)2046 void SamBam_writer_submit_sorted_compressing_task(SamBam_Writer * writer){
2047 	SamBam_thread_wait_merge_write(writer, writer -> sorted_compress_this_thread_no);
2048 	SamBam_thread_set_new_job(writer, writer -> sorted_compress_this_thread_no);
2049 
2050 	writer -> sorted_compress_this_thread_no++;
2051 	if(writer -> sorted_compress_this_thread_no == writer -> threads) writer -> sorted_compress_this_thread_no=0;
2052 	writer -> this_bam_block_no ++;
2053 }
2054 
SamBam_writer_sort_bins_to_BAM_write_1R(SamBam_Writer * writer,FILE * fp,HashTable * bin_tab,ArrayList * bin_list,ArrayList * win16k_list,int chro_no)2055 void SamBam_writer_sort_bins_to_BAM_write_1R(SamBam_Writer * writer, FILE * fp, HashTable * bin_tab, ArrayList * bin_list, ArrayList * win16k_list, int chro_no){
2056 	int block_len=0;
2057 	int rlen = fread(&block_len, 4,1,fp);
2058 	//SUBREADprintf("WBIN=%d\n", block_len);
2059 	if(rlen<1 || block_len >= 10000){
2060 		SUBREADprintf("ERROR: sorted bin files are broken. RLEN=%d , BLKLEN=%d\n", rlen, block_len);
2061 		assert(rlen >=1);
2062 	}
2063 
2064 	memcpy(writer -> chunk_buffer + writer -> chunk_buffer_used, &block_len, 4);
2065 	writer ->chunk_buffer_used += 4;
2066 	rlen = fread(writer -> chunk_buffer + writer -> chunk_buffer_used, 1, block_len ,fp);
2067 	if(rlen < block_len){
2068 		SUBREADprintf("ERROR: sorted bin files are broken.\n");
2069 		assert(rlen >=block_len);
2070 	}
2071 	writer -> chunk_buffer_used+= rlen;
2072 
2073 	void ** last_chunk_ptr = NULL;
2074 	SamBam_writer_sort_bins_to_BAM_test_bins(writer, bin_tab, bin_list, win16k_list, block_len, &last_chunk_ptr, chro_no);
2075 
2076 	if(writer -> chunk_buffer_used>55000){
2077 		assert(writer -> chunk_buffer_used< 65000);
2078 		SamBam_writer_submit_sorted_compressing_task(writer);
2079 	}
2080 }
2081 
2082 #define SAMBAM_MERGE_MAX_FPS (350)
2083 
SamBam_writer_one_thread_merge_sortedbins(SamBam_Writer * writer)2084 void SamBam_writer_one_thread_merge_sortedbins(SamBam_Writer * writer){
2085 	int new_bins = 0;
2086 
2087 	if(writer -> sorted_batch_id > SAMBAM_MERGE_MAX_FPS){
2088 		int merge_i;
2089 		for(merge_i = 0; merge_i < writer -> sorted_batch_id; merge_i+= SAMBAM_MERGE_MAX_FPS){
2090 			char tfp[MAX_FILE_NAME_LENGTH+50];
2091 			int this_size = min(SAMBAM_MERGE_MAX_FPS, writer -> sorted_batch_id - merge_i);
2092 			if(this_size < 2) break;
2093 
2094 			//SUBREADprintf("CREATE_MERGE %d\n", new_bins + writer -> sorted_batch_id);
2095 			sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, writer -> sorted_batch_id + (new_bins++) );
2096 			FILE * outbinfp = fopen(tfp,"wb");
2097 
2098 			FILE ** sb_fps = malloc(sizeof(FILE *) * this_size);
2099 			unsigned long long * current_min_fps = malloc(sizeof(unsigned long long) * this_size);
2100 			unsigned long long current_min = SUBREAD_MAX_ULONGLONG;
2101 			int current_min_batch = -1, bii;
2102 
2103 			for(bii = 0; bii < this_size; bii++){
2104 				char tfpx[MAX_FILE_NAME_LENGTH+50];
2105 				current_min_fps[bii] = SUBREAD_MAX_ULONGLONG;
2106 
2107 				sprintf(tfpx , "%s-%06d.sortedbin", writer -> tmpf_prefix, bii + merge_i);
2108 				sb_fps[bii] = fopen(tfpx,"rb");
2109 				current_min_fps[bii] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[bii]);
2110 				if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2111 					current_min = current_min_fps[bii];
2112 					current_min_batch = bii;
2113 				}
2114 			}
2115 
2116 			char * mtmp = malloc(10000);
2117 			while(1){
2118 				if(current_min_batch>-1){
2119 					int blk_len = 0;
2120 					int rrlen = fread(&blk_len, 4, 1, sb_fps[current_min_batch]);
2121 					if(rrlen <1) assert(rrlen >0);
2122 					assert(blk_len < 10000);
2123 					rrlen = fread(mtmp, blk_len, 1, sb_fps[current_min_batch]);
2124 					if(rrlen <1) assert(rrlen >0);
2125 					fwrite(&blk_len,4,1, outbinfp);
2126 					fwrite(mtmp, blk_len,1, outbinfp);
2127 
2128 					current_min_fps[current_min_batch] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[current_min_batch]);
2129 					current_min = SUBREAD_MAX_ULONGLONG;
2130 					current_min_batch = -1;
2131 				}else break;
2132 
2133 				for(bii = 0; bii < this_size; bii++){
2134 					if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2135 						current_min = current_min_fps[bii];
2136 						current_min_batch = bii;
2137 					}
2138 				}
2139 			}
2140 			free(mtmp);
2141 
2142 			for(bii = 0; bii < this_size; bii++){
2143 				fclose(sb_fps[bii]);
2144 				sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, merge_i + bii);
2145 				//SUBREADprintf("    DEL_MERGE %d\n", bii);
2146 				unlink(tfp);
2147 			}
2148 			fclose(outbinfp);
2149 		}
2150 
2151 		writer -> sorted_batch_id += new_bins;
2152 	}
2153 }
2154 
2155 
2156 #define SAMBAM_renew_BAItabs	{ if(bin_chunks_table != NULL) { HashTableDestroy(bin_chunks_table); ArrayListDestroy(bins_list); ArrayListDestroy(win16k_list) ; } \
2157 	bin_chunks_table = HashTableCreate(10000); bins_list = ArrayListCreate(10000); win16k_list = ArrayListCreate(10000);\
2158 	HashTableSetDeallocationFunctions(bin_chunks_table, NULL, (void(*)(void *))ArrayListDestroy); }
2159 
2160 #define SAMBAM_write_empty_chros(nSTART, nEND) {int iikk; for(iikk=(nSTART); iikk<(nEND);iikk++){ int wwwlen = fwrite("\0\0\0\0\0\0\0\0", 1,8,writer -> BAI_fp); if(wwwlen < 8){ assert(wwwlen==8);}} }
2161 
2162 int level_min_binno[] = {0, 1, 9, 73, 585, 4681};
2163 
SamBam_writer_merge_chunks_compare(void * vL,void * vR,ArrayList * me)2164 int SamBam_writer_merge_chunks_compare(void * vL, void * vR, ArrayList * me){
2165 	long long * lL = vL;
2166 	long long * lR = vR;
2167 	if( (*lL) > (*lR)) return 1;
2168 	if( (*lL) < (*lR)) return -1;
2169 	return 0;
2170 }
2171 
SamBam_writer_merge_chunks(ArrayList * chs)2172 void SamBam_writer_merge_chunks(ArrayList * chs){
2173 	ArrayList * intvs = ArrayListCreate(chs -> numOfElements/2);
2174 	ArrayListSetDeallocationFunction(intvs, free);
2175 	int i;
2176 	for(i=0; i<chs -> numOfElements; i+=2){
2177 		long long * ll = malloc(sizeof(long long)*2);
2178 		ll[0] = ArrayListGet(chs, i)-NULL;
2179 		ll[1] = ArrayListGet(chs, i+1)-NULL;
2180 		ArrayListPush(intvs, ll);
2181 	}
2182 	chs -> numOfElements = 0;
2183 
2184 	ArrayListSort(intvs, SamBam_writer_merge_chunks_compare);
2185 	long long * ll0 =  ArrayListGet(intvs, 0);
2186 	ArrayListPush(chs, NULL+ll0[0]);
2187 	ArrayListPush(chs, NULL+ll0[1]);
2188 
2189 	for(i=1; i<intvs-> numOfElements; i++){
2190 		long long last_end = ArrayListGet(chs, chs -> numOfElements-1) -NULL;
2191 		long long * ll_this = ArrayListGet(intvs, i);
2192 
2193 		long long dist = ll_this[0]>>16;
2194 		dist -= last_end>>16;
2195 
2196 		if(dist < MAX_ALLOWED_GAP_IN_BAI_CHUNK)
2197 			chs-> elementList[chs -> numOfElements-1] = NULL+max(ll_this[1],last_end);
2198 		else{
2199 			ArrayListPush(chs, NULL+ll_this[0]);
2200 			ArrayListPush(chs, NULL+ll_this[1]);
2201 		}
2202 	}
2203 
2204 	//SUBREADprintf("MERGE CHUNKS : %ld -> %ld\n", intvs-> numOfElements, chs -> numOfElements/2);
2205 	ArrayListDestroy(intvs);
2206 }
2207 
SamBam_writer_optimize_bins_level(HashTable * bin_tab,ArrayList * bin_arr,HashTable * new_tab,ArrayList * new_arrs,int this_level)2208 void SamBam_writer_optimize_bins_level(HashTable *bin_tab, ArrayList *bin_arr, HashTable * new_tab, ArrayList * new_arrs, int this_level){
2209 	int i,j, my_min_binno = level_min_binno[this_level], my_parent_min_binno = -1, my_child_min_binno = 999999;
2210 	if(this_level>0) my_parent_min_binno = level_min_binno[this_level-1];
2211 	if(this_level<5) my_child_min_binno = level_min_binno[this_level+1];
2212 
2213 
2214 	// copy all parents and ancestors and children and descendant bins into new table & list
2215 	for(i=0; i< bin_arr -> numOfElements; i++){
2216 		int binno = ArrayListGet(bin_arr, i)-NULL;
2217 		if(binno < my_min_binno || binno >= my_child_min_binno){
2218 			ArrayList * old_bin_arr = HashTableGet(bin_tab, NULL+1+binno);
2219 			if(old_bin_arr -> numOfElements > 1){
2220 				HashTablePut(new_tab, NULL+1+binno, ArrayListDuplicate(old_bin_arr));
2221 				ArrayListPush(new_arrs, NULL+binno);
2222 			}
2223 		}
2224 	}
2225 
2226 	// scan this_level bins. If small enough -> put to parent bin and delete it.
2227 	// Otherwise put it into new tab&list.
2228 	for(i=0; i< bin_arr -> numOfElements; i++){
2229 		int binno = ArrayListGet(bin_arr, i)-NULL;
2230 		if(binno < my_min_binno || binno >= my_child_min_binno )continue;
2231 		ArrayList * small_bin_arr = HashTableGet(bin_tab, NULL+1+binno);
2232 		if(small_bin_arr -> numOfElements < 2) continue;
2233 
2234 		long long min_start_Voff = SUBREAD_MAX_LONGLONG;
2235 		long long max_end_Voff = -1;
2236 
2237 		for(j =0 ; j< small_bin_arr -> numOfElements; j+=2){
2238 			long long this_start = ArrayListGet(small_bin_arr, j)-NULL;
2239 			long long this_end = ArrayListGet(small_bin_arr, j+1)-NULL;
2240 			min_start_Voff = min(min_start_Voff, this_start);
2241 			max_end_Voff = max(max_end_Voff, this_end);
2242 		}
2243 
2244 		long long dist = max_end_Voff >> 16;
2245 		dist -= min_start_Voff >>16;
2246 		if(dist < MAX_ALLOWED_GAP_IN_BAI_CHUNK) {
2247 			int parent_binno = my_parent_min_binno +((binno - my_min_binno)>>3);
2248 			ArrayList * parent_bin_arr = HashTableGet(new_tab, NULL+1+parent_binno);
2249 			if(NULL == parent_bin_arr){
2250 				parent_bin_arr = ArrayListCreate(10);
2251 				HashTablePut(new_tab, NULL+1+parent_binno, parent_bin_arr);
2252 				ArrayListPush(new_arrs, NULL+parent_binno);
2253 			}
2254 			for(j =0 ; j< small_bin_arr -> numOfElements; j++)
2255 				ArrayListPush(parent_bin_arr, ArrayListGet(small_bin_arr, j));
2256 
2257 		} else {
2258 			HashTablePut(new_tab, NULL+1+binno, ArrayListDuplicate(small_bin_arr));
2259 			ArrayListPush(new_arrs, NULL + binno);
2260 		}
2261 	}
2262 
2263 	// look into all parent_level bins, reduce its size
2264 	for(i=0; i< new_arrs -> numOfElements; i++){
2265 		int binno = ArrayListGet(new_arrs, i)-NULL;
2266 		if(binno >=my_min_binno || binno < my_parent_min_binno) continue;
2267 		ArrayList * large_bin_arr = HashTableGet(new_tab, NULL+1+binno);
2268 		SamBam_writer_merge_chunks(large_bin_arr);
2269 	}
2270 
2271 	//SUBREADprintf("Tab size : %ld => %ld\n",bin_tab -> numOfElements, new_tab -> numOfElements);
2272 	HashTableDestroy(bin_tab);
2273 	ArrayListDestroy(bin_arr);
2274 }
2275 
SamBam_writer_optimize_bins(HashTable * bin_tab,ArrayList * bin_arr,HashTable ** new_tab,ArrayList ** new_arrs)2276 void SamBam_writer_optimize_bins(HashTable *bin_tab, ArrayList *bin_arr, HashTable ** new_tab, ArrayList ** new_arrs){
2277 	int this_level;
2278 	for(this_level = 5; this_level > 2; this_level --){
2279 	//	SUBREADprintf("OPTIMIZING AT LEVEL %d\n", this_level);
2280 		HashTable * new_bin_tab = HashTableCreate(2000);
2281 		HashTableSetDeallocationFunctions(new_bin_tab, NULL, (void(*)(void *))ArrayListDestroy);
2282 		ArrayList * new_bin_list = ArrayListCreate(2000);
2283 
2284 		SamBam_writer_optimize_bins_level(bin_tab, bin_arr, new_bin_tab, new_bin_list, this_level);
2285 		bin_tab = new_bin_tab;
2286 		bin_arr = new_bin_list;
2287 		(*new_tab) = new_bin_tab;
2288 		(*new_arrs) = new_bin_list;
2289 	}
2290 }
2291 
SamBam_write_sorted_thread_collect(SamBam_Writer * writer)2292 void SamBam_write_sorted_thread_collect(SamBam_Writer * writer){
2293 	int thread_i;
2294 	if(writer -> chunk_buffer_used>0) SamBam_writer_submit_sorted_compressing_task(writer);
2295 	for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2296 		//SUBREADprintf("FINISHED %d-th CHRO, MERGING THR %d\n", old_chro_no, writer->sorted_compress_this_thread_no );
2297 		SamBam_thread_wait_merge_write(writer, writer->sorted_compress_this_thread_no);
2298 		writer->sorted_compress_this_thread_no ++;
2299 		if(writer->sorted_compress_this_thread_no == writer -> threads) writer->sorted_compress_this_thread_no = 0;
2300 	}
2301 }
2302 
2303 #define SAMBAM_Block2Vpos(v) (v)=((HashTableGet(writer -> block_no_p1_to_vpos_tab, NULL+(v>>16)+1)-NULL )<<16 )|(v & 0xffff);
SamBam_write_BAI_for_1chr(SamBam_Writer * writer,HashTable ** bin_chunks_table,ArrayList ** bins_list,ArrayList ** win16k_list)2304 void SamBam_write_BAI_for_1chr(SamBam_Writer * writer, HashTable ** bin_chunks_table, ArrayList ** bins_list, ArrayList ** win16k_list){
2305 	SamBam_write_sorted_thread_collect(writer);
2306 
2307 	if(1){
2308 		HashTable * new_bin_tab = NULL;
2309 		ArrayList * new_bin_list = NULL;
2310 		SamBam_writer_optimize_bins(*bin_chunks_table ,*bins_list,&new_bin_tab,&new_bin_list);
2311 		*bin_chunks_table = new_bin_tab;
2312 		*bins_list = new_bin_list;
2313 	}
2314 
2315 	int n_bins = (*bins_list)->numOfElements;
2316 	int win16_nos = (*win16k_list)->numOfElements;
2317 	//SUBREADprintf("WIN_ITEMS=%d\t\t\tOLD_CHRO=%d  NEW_CHRO=%d  CURR_BATCH=%d\n", win16_nos, old_chro_no, chro_no , current_min_batch);
2318 
2319 	fwrite(&n_bins, 4, 1, writer -> BAI_fp);
2320 	int bini;
2321 	for(bini = 0; bini < n_bins; bini ++){
2322 		int bin_no = ArrayListGet(*bins_list, bini)-NULL;
2323 		ArrayList * chunks_list = HashTableGet(*bin_chunks_table, NULL+ bin_no+1);
2324 
2325 		int n_chunks = chunks_list -> numOfElements/2;
2326 		fwrite(&bin_no, 4, 1, writer -> BAI_fp);
2327 		fwrite(&n_chunks, 4, 1, writer -> BAI_fp);
2328 		int chunk_i;
2329 		for(chunk_i = 0; chunk_i < 2* n_chunks; chunk_i +=2){
2330 			long long int Voff_st = ArrayListGet(chunks_list, chunk_i) - NULL;
2331 			long long int Voff_en = ArrayListGet(chunks_list, chunk_i+1) - NULL;
2332 			SAMBAM_Block2Vpos(Voff_st);
2333 			SAMBAM_Block2Vpos(Voff_en);
2334 			fwrite(&Voff_st, 8, 1, writer -> BAI_fp);
2335 			fwrite(&Voff_en, 8, 1, writer -> BAI_fp);
2336 		}
2337 	}
2338 
2339 	fwrite(&win16_nos, 4, 1, writer -> BAI_fp);
2340 	for(bini = 0; bini < win16_nos; bini++){
2341 		long long int Voff = ArrayListGet(*win16k_list , bini) - NULL;
2342 		SAMBAM_Block2Vpos(Voff);
2343 		fwrite(&Voff, 8, 1, writer -> BAI_fp);
2344 	}
2345 
2346 }
2347 
2348 #define SAMBAM_reset_sorting_writer { HashTableRemoveAll( writer -> block_no_p1_to_vpos_tab ); }
2349 
2350 // MUST be called by THREAD 0!
SamBam_writer_sort_bins_to_BAM(SamBam_Writer * writer)2351 void SamBam_writer_sort_bins_to_BAM(SamBam_Writer * writer){
2352 	SamBam_writer_one_thread_merge_sortedbins(writer);
2353 
2354 	FILE ** sb_fps = malloc(sizeof(FILE *) * writer -> sorted_batch_id);
2355 	unsigned long long * current_min_fps = malloc(sizeof(unsigned long long) * writer -> sorted_batch_id);
2356 	int bii;
2357 	unsigned long long current_min = SUBREAD_MAX_ULONGLONG;
2358 	int current_min_batch = -1;
2359 	writer -> chunk_buffer_used = 0;
2360 
2361 	for(bii = 0; bii < writer -> sorted_batch_id; bii++){
2362 		char tfp[MAX_FILE_NAME_LENGTH+40];
2363 		current_min_fps[bii] = SUBREAD_MAX_ULONGLONG;
2364 
2365 		sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, bii);
2366 		sb_fps[bii] = fopen(tfp,"rb");
2367 		if(sb_fps[bii]!=NULL){
2368 			current_min_fps[bii] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[bii]);
2369 			if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2370 				current_min = current_min_fps[bii];
2371 				current_min_batch = bii;
2372 			}
2373 		}
2374 	}
2375 
2376 	ArrayList * bins_list = NULL; //	[Bin_Number_0, Bin_Number_1, ...]
2377 	ArrayList * win16k_list = NULL;	// [ Voffset_0, Voffset_1, ... ]
2378 	HashTable * bin_chunks_table=NULL;
2379 	SAMBAM_renew_BAItabs;
2380 
2381 	int chro_no = (int)(current_min >> 32);
2382 	int old_chro_no = -1, last_written_BAI_chro = -1;;
2383 
2384 	int wlen = fwrite("BAI\1", 4, 1, writer -> BAI_fp);
2385 	if(wlen < 1) assert(wlen >0);
2386 	int thread_i, n_ref = writer -> chromosome_name_table -> numOfElements;
2387 	subread_lock_t initlocks[writer -> threads];
2388 	wlen = fwrite(&n_ref, 4, 1, writer -> BAI_fp);
2389 	SAMBAM_write_empty_chros(0, chro_no);
2390 
2391 	writer -> block_no_p1_to_vpos_tab = HashTableCreate(100000);
2392 	writer -> writer_threads = calloc(sizeof(struct SamBam_sorted_compressor_st), writer -> threads);
2393 	for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2394 		memset(&(writer -> writer_threads[thread_i].strm),0,sizeof(z_stream));
2395 		deflateInit2(&(writer -> writer_threads[thread_i].strm), writer -> fastest_compression? SAMBAM_COMPRESS_LEVEL_FASTEST : SAMBAM_COMPRESS_LEVEL_NORMAL , Z_DEFLATED,
2396 			SAMBAM_GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY);
2397 		subread_init_lock(initlocks+thread_i);
2398 		subread_lock_occupy(initlocks+thread_i);
2399 
2400 		void ** thparam = malloc(sizeof(void*)*3);
2401 		thparam[0] = writer;
2402 		thparam[1] = NULL + thread_i;
2403 		thparam[2] = initlocks+thread_i;
2404 		pthread_create(&writer -> writer_threads[thread_i].thread_stub, NULL,SamBam_writer_sorted_compress,thparam);
2405 	}
2406 	for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2407 		subread_lock_occupy(initlocks+thread_i);
2408 		subread_destroy_lock(initlocks+thread_i);
2409 	}
2410 
2411 	SAMBAM_reset_sorting_writer;
2412 	while(1){ // GO THROUGH EACH READ and write into BAM
2413 		//if(current_min_batch<0)SUBREADprintf("FINALTEST: OLD/NEW = %d , %d\n", old_chro_no ,chro_no);
2414 		if(old_chro_no >=0 && chro_no != old_chro_no){ // if there is old_chro && if has to write. Note: old_chro_no<0 means we reached the "unmapped" part of the sorted temp files
2415 			//SUBREADprintf("\n====== W1CHR %d =====\n", old_chro_no);
2416 			SamBam_write_BAI_for_1chr(writer, &bin_chunks_table, &bins_list, &win16k_list);
2417 			last_written_BAI_chro = old_chro_no;
2418 			SAMBAM_write_empty_chros(old_chro_no +1, chro_no < 0?n_ref:chro_no);
2419 			SAMBAM_renew_BAItabs;
2420 			SAMBAM_reset_sorting_writer;
2421 		}
2422 
2423 		if(current_min_batch>-1){
2424 			SamBam_writer_sort_bins_to_BAM_write_1R(writer, sb_fps[current_min_batch], bin_chunks_table, bins_list, win16k_list, chro_no);
2425 			current_min_fps[current_min_batch] = SamBam_writer_sort_bins_to_BAM_FP_pos(sb_fps[current_min_batch]);
2426 			current_min = SUBREAD_MAX_ULONGLONG;
2427 			current_min_batch = -1;
2428 			old_chro_no = chro_no;
2429 		}else{
2430 			SamBam_write_sorted_thread_collect(writer);
2431 			break; // all finished. quit.
2432 		}
2433 
2434 
2435 		for(bii = 0; bii < writer -> sorted_batch_id; bii++){
2436 			if(current_min_fps[bii] < SUBREAD_MAX_ULONGLONG && current_min_fps[bii] < current_min){
2437 				current_min = current_min_fps[bii];
2438 				current_min_batch = bii;
2439 			}
2440 		}
2441 		chro_no = (int)(current_min >> 32);
2442 	}
2443 
2444 	for(bii = 0; bii < writer -> sorted_batch_id; bii++){
2445 		if(!sb_fps[bii]) continue;
2446 
2447 		char tfp[MAX_FILE_NAME_LENGTH+40];
2448 		sprintf(tfp , "%s-%06d.sortedbin", writer -> tmpf_prefix, bii);
2449 		fclose(sb_fps[bii]);
2450 		unlink(tfp);
2451 	}
2452 	if(bin_chunks_table != NULL) {
2453 		HashTableDestroy(bin_chunks_table);
2454 		ArrayListDestroy(bins_list);
2455 		ArrayListDestroy(win16k_list);
2456 	}
2457 
2458 	terminate_workers(&writer->sorted_notifier);
2459 	for(thread_i = 0; thread_i < writer -> threads; thread_i++){
2460 		pthread_join(writer -> writer_threads[thread_i].thread_stub, NULL);
2461 		deflateEnd(&(writer -> writer_threads[thread_i].strm));
2462 	}
2463 	HashTableDestroy(writer -> block_no_p1_to_vpos_tab);
2464 
2465 	free(writer->writer_threads);
2466 	free(current_min_fps);
2467 	free(sb_fps);
2468 }
2469 
SamBam_writer_finalise_thread(SamBam_Writer * writer,int thread_id)2470 void SamBam_writer_finalise_thread(SamBam_Writer * writer, int thread_id){
2471 	if(writer -> threads < 2){
2472 		if(writer -> sort_reads_by_coord){
2473 			SamBam_writer_sort_buff_one_write(writer, writer ->chunk_buffer, writer -> chunk_buffer_used, -1);
2474 			writer -> chunk_buffer_used= 0;
2475 		}else{
2476 			if(writer -> chunk_buffer_used) SamBam_writer_add_chunk(writer, thread_id);
2477 		}
2478 	}else{
2479 		if(writer -> sort_reads_by_coord){
2480 			SamBam_writer_sort_buff_one_write(writer, writer -> threads_chunk_buffer[thread_id], writer -> threads_chunk_buffer_used[thread_id], thread_id);
2481 			writer -> threads_chunk_buffer_used [thread_id] = 0;
2482 		}else{
2483 			if(writer -> threads_chunk_buffer_used[thread_id]) SamBam_writer_add_chunk(writer, thread_id);
2484 		}
2485 	}
2486 }
2487 
SamBam_writer_finalise_one_thread(SamBam_Writer * writer)2488 void SamBam_writer_finalise_one_thread(SamBam_Writer * writer){
2489 	if(writer -> threads < 2){
2490 		if(writer -> sort_reads_by_coord){
2491 			if(writer -> chunk_buffer_used>0){
2492 				SamBam_writer_sort_buff_one_write(writer, writer -> chunk_buffer, writer -> chunk_buffer_used, -1);
2493 				writer -> chunk_buffer_used = 0;
2494 			}
2495 		}else{
2496 			if(writer -> chunk_buffer_used)
2497 				SamBam_writer_add_chunk(writer, -1);
2498 		}
2499 	}
2500 }
2501