1 #include <assert.h>
2 #include "core.h"
3 #include "seek-zlib.h"
4 #include "input-files.h"
5 #include "gene-algorithms.h"
6 
7 #define SEEKGZ_INIT_TEXT_SIZE (1024*1024)
8 #define SEEKGZ_BINBUFF_SIZE (1*1024*1024)
9 
seekgz_ftello(seekable_zfile_t * fp)10 unsigned long long seekgz_ftello(seekable_zfile_t * fp){
11 	unsigned long long ret = ftello(fp -> gz_fp);
12 	ret -= fp -> stem.avail_in;
13 	return ret;
14 }
15 
crc_pos(char * bin,int len)16 unsigned int crc_pos(char * bin, int len){
17 	unsigned int crc0 = 0;//crc32(0, NULL, 0);
18 	unsigned int CRC32 = crc32(crc0, (unsigned char *) bin, len);
19 	return CRC32;
20 }
21 
seekgz_try_read_some_zipped_data(seekable_zfile_t * fp,int * is_eof)22 void seekgz_try_read_some_zipped_data(seekable_zfile_t * fp, int * is_eof){
23 	if(feof(fp->gz_fp)){
24 		if(is_eof) (*is_eof)=1;
25 		return;
26 	}
27 
28 	assert(fp -> stem.avail_in >= 0);
29 	if(fp -> stem.avail_in < SEEKGZ_BINBUFF_SIZE / 2 ) {
30 		if(fp -> in_zipped_buff_read_ptr > 0 && fp -> stem.avail_in > 0){
31 			int i;
32 			for(i = 0 ; i < fp -> stem.avail_in ; i ++){
33 				fp -> in_zipped_buffer[i] = fp -> in_zipped_buffer[i + fp -> in_zipped_buff_read_ptr];
34 			}
35 		}
36 		fp -> in_zipped_buff_read_ptr = 0;
37 
38 		int readlen = fread(fp -> in_zipped_buffer + fp -> stem.avail_in, 1 , SEEKGZ_BINBUFF_SIZE - fp -> stem.avail_in , fp -> gz_fp);
39 		if(readlen>0)
40 			fp -> stem.avail_in += readlen;
41 		fp -> stem.next_in = (unsigned char *)fp -> in_zipped_buffer;
42 	}
43 }
44 
seekgz_get_next_zipped_char(seekable_zfile_t * fp)45 int seekgz_get_next_zipped_char(seekable_zfile_t * fp){
46 	seekgz_try_read_some_zipped_data(fp, NULL);
47 	int ret = -1;
48 
49 	if(fp -> stem.avail_in > 0)
50 	{
51 		ret = fp -> in_zipped_buffer [ fp -> in_zipped_buff_read_ptr ++];
52 		fp -> stem.next_in = (unsigned char *)(fp -> in_zipped_buffer + fp -> in_zipped_buff_read_ptr);
53 		fp -> stem.avail_in --;
54 		if(ret<0) ret=256+ret;
55 	}
56 	return ret;
57 
58 }
59 
seekgz_skip_gzfile_header(seekable_zfile_t * fp,int tail_size)60 int seekgz_skip_gzfile_header(seekable_zfile_t * fp, int tail_size){
61 	int id1, id2;
62 
63 	if(tail_size){
64 		for(id1=0; id1<tail_size; id1++)
65 			seekgz_get_next_zipped_char(fp);
66 	}
67 	id1 = seekgz_get_next_zipped_char(fp);
68 	id2 = seekgz_get_next_zipped_char(fp);
69 
70 	if(id1 != 31 || id2 != 139){
71 		//SUBREADprintf("WRONG header:%d,%d\n", id1, id2);
72 		return 1;
73 	}
74 
75 	 seekgz_get_next_zipped_char(fp); // CM
76 	int FLG= seekgz_get_next_zipped_char(fp); // FLG
77 	seekgz_get_next_zipped_char(fp);
78 	seekgz_get_next_zipped_char(fp);
79 	seekgz_get_next_zipped_char(fp);
80 	seekgz_get_next_zipped_char(fp);
81 	seekgz_get_next_zipped_char(fp); // XFL
82 	seekgz_get_next_zipped_char(fp); // OS
83 
84 	//fprintf(stderr, "FLG=%d, XFL=%d\n" , FLG, XFL);
85 
86 	if(FLG & (1<<2)){	// FEXT
87 		unsigned short XLEN=0;
88 		XLEN = seekgz_get_next_zipped_char(fp);
89 		XLEN += seekgz_get_next_zipped_char(fp)*256;
90 		for(; XLEN>0; XLEN--)
91 			seekgz_get_next_zipped_char(fp);
92 	}
93 
94 	for(id1 = 3; id1 <=4; id1++){
95 		if(FLG & (1<<id1)){	// FNAME or FCOMMENT
96 			while(1){
97 				int namec = seekgz_get_next_zipped_char(fp);
98 				if(0==namec) break;
99 			}
100 		}
101 	}
102 	if(FLG & (1<<1)){	// FCRC
103 		seekgz_get_next_zipped_char(fp);
104 		seekgz_get_next_zipped_char(fp);
105 	}
106 
107 	fp -> next_block_file_offset = seekgz_ftello(fp);
108 	fp -> next_block_file_bits = 0;
109 	fp -> rolling_dict_window_used = 0;
110 	return 0;
111 }
112 int seekgz_load_more_blocks( seekable_zfile_t * fp , int bytes_ahead, subread_lock_t * read_lock );
113 
seekgz_open(const char * fname,seekable_zfile_t * fp,FILE * old_fp)114 int seekgz_open(const char * fname, seekable_zfile_t * fp, FILE * old_fp){
115 	memset(fp, 0, sizeof(seekable_zfile_t));
116 	if(old_fp) fp -> gz_fp = old_fp;
117 	else fp -> gz_fp = f_subr_open(fname, "rb");
118 	if(NULL==fp -> gz_fp)return -1;
119 
120 	fp -> in_zipped_buffer = malloc(SEEKGZ_BINBUFF_SIZE);
121 	subread_init_lock(&fp -> write_lock);
122 	//fp -> txt_buffer_size = SEEKGZ_INIT_TEXT_SIZE;
123 
124 	fp -> blocks_in_chain = 0;
125 	fp -> block_chain_current_no = 0; // no blocks in the chain
126 	fp -> rolling_dict_window_used = 0;
127 	fp -> current_block_txt_read_ptr = 0;
128 
129 	fp -> stem.zalloc = Z_NULL;
130 	fp -> stem.zfree = Z_NULL;
131 	fp -> stem.opaque = Z_NULL;
132 	fp -> stem.avail_in = 0;
133 	fp -> stem.next_in = Z_NULL;
134 	if( old_fp ){
135 		fp -> stem.avail_in = 2;
136 		fp -> in_zipped_buffer[0] = 0x1f;
137 		fp -> in_zipped_buffer[1] = 0x8b;
138 	}
139 
140 	int ret = seekgz_skip_gzfile_header(fp,0);
141 	if(ret) return 1;
142 	ret = inflateInit2(&(fp -> stem), -15);
143 	if(ret) return 1;
144 
145 	fp -> next_block_file_offset = seekgz_ftello( fp );
146 	fp -> next_block_file_bits = 0;
147 	ret = seekgz_load_more_blocks(fp,  300000, NULL);
148 	//SUBREADprintf("Gzip : opened %s of %p\n", fname, fp);
149 	return ret;
150 }
151 
seekgz_tell(seekable_zfile_t * fp,seekable_position_t * pos)152 void seekgz_tell(seekable_zfile_t * fp, seekable_position_t * pos){
153 	if(fp -> blocks_in_chain>0){
154 		seekable_decompressed_block_t * curr_block = fp -> block_rolling_chain+fp -> block_chain_current_no;
155 
156 		pos -> block_gzfile_offset = curr_block -> block_start_in_file_offset;
157 		pos -> block_gzfile_bits = curr_block -> block_start_in_file_bits;
158 		memcpy(pos -> dict_window, curr_block -> block_dict_window, curr_block -> block_dict_window_size);
159 		pos -> block_dict_window_size = curr_block -> block_dict_window_size;
160 		pos -> in_block_text_offset = fp -> current_block_txt_read_ptr;
161 	}else{
162 		pos -> block_gzfile_offset = 0xffffffffffffffffllu;
163 	}
164 }
165 
166 int seekgz_load_1_block( seekable_zfile_t * fp , int empty_block_no);
seekgz_seek(seekable_zfile_t * fp,seekable_position_t * pos)167 void seekgz_seek(seekable_zfile_t * fp, seekable_position_t * pos){
168 	//SUBREADprintf("SEEK:gzip -> fpos=%llu, block_offset=%d\n", pos -> block_gzfile_offset, pos -> in_block_text_offset);
169 	//
170 	if(pos -> block_gzfile_offset== 0xffffffffffffffffllu){
171 		fp -> block_chain_current_no = 0;
172 		fp -> blocks_in_chain = 0;
173 		fp -> stem.avail_in = 0;
174 		fseeko(fp->gz_fp, 0, SEEK_END);
175 		fgetc(fp->gz_fp);
176 		return ;
177 	}
178 	fseeko(fp->gz_fp, pos -> block_gzfile_offset - (pos -> block_gzfile_bits?1:0), SEEK_SET);
179 
180 	if(Z_OK!=inflateReset(&fp->stem))
181 		SUBREADprintf("FATAL: UNABLE TO INIT STREAM.\n\n\n");
182 	if(pos -> block_dict_window_size>0){
183 		if(pos -> block_gzfile_bits){
184 			char nch = fgetc(fp->gz_fp);
185 			//fprintf(stderr, "SEEK 2 FPPOS:%llu, NCH=%d\n", ftello(fp->gz_fp) , nch);
186 			inflatePrime(&fp->stem, pos -> block_gzfile_bits, nch>>(8-pos -> block_gzfile_bits));
187 		}
188 		if(Z_OK != inflateSetDictionary(&fp->stem, (unsigned char *)pos -> dict_window, pos -> block_dict_window_size))
189 			SUBREADprintf("FATAL: UNABLE TO RESET STREAM.\n\n\n");
190 	}
191 
192 	fp -> stem.avail_in = 0;
193 
194 	int ii;
195 	for(ii = 0; ii < fp -> blocks_in_chain ; ii++){
196 		int iv = ii + fp -> block_chain_current_no;
197 		if(iv >=SEEKGZ_CHAIN_BLOCKS_NO) iv-=SEEKGZ_CHAIN_BLOCKS_NO;
198 		free(fp -> block_rolling_chain[iv].block_txt);
199 		free(fp -> block_rolling_chain[iv].linebreak_positions);
200 	}
201 
202 	fp -> block_chain_current_no = 0;
203 	fp -> blocks_in_chain = 0;
204 	fp -> in_zipped_buff_read_ptr = 0;
205 	fp -> current_block_txt_read_ptr = 0;
206 
207 	memcpy(fp -> rolling_dict_window, pos -> dict_window, pos -> block_dict_window_size);
208 	fp -> rolling_dict_window_used= pos -> block_dict_window_size;
209 	fp -> next_block_file_offset = pos -> block_gzfile_offset;
210 	fp -> next_block_file_bits = pos -> block_gzfile_bits;
211 	seekgz_load_more_blocks(fp, 30000, NULL);
212 	fp -> current_block_txt_read_ptr = pos -> in_block_text_offset;
213 
214 	//SUBREADprintf("SEK: %d WIN=%d BLK (%d / %d), %d AVI\n", fp -> blocks_in_chain,  pos -> block_dict_window_size, fp -> current_block_txt_read_ptr , fp -> block_rolling_chain[0].block_txt_size, fp -> stem.avail_in);
215 }
216 
217 
seekgz_find_linebreaks(seekable_zfile_t * fp,int empty_block_no)218 void seekgz_find_linebreaks(seekable_zfile_t * fp, int empty_block_no){
219 	int lbks= 0, arrsize = 5000;
220 	unsigned int * arr = malloc(sizeof(int) * arrsize);
221 
222 	int ii;
223 	for(ii = 0; ii < fp -> block_rolling_chain[empty_block_no].block_txt_size ; ii++){
224 		char nch = fp-> block_rolling_chain[empty_block_no].block_txt[ii];
225 		if(nch == '\n'){
226 			if(lbks >= arrsize){
227 				arrsize*=2;
228 				arr = realloc(arr, sizeof(int) * arrsize);
229 			}
230 
231 			arr[lbks++] = ii;
232 		}
233 	}
234 	fp -> block_rolling_chain[empty_block_no].linebreak_positions = arr;
235 	fp -> block_rolling_chain[empty_block_no].linebreaks = lbks;
236 }
237 
seekgz_update_current_window(seekable_zfile_t * fp,char * txt,int tlen)238 void seekgz_update_current_window(seekable_zfile_t * fp , char * txt, int tlen){
239 	int fp_start=0, new_start=0, cplen=0;
240 
241 	int moveback = 0;
242 	if(tlen + fp -> rolling_dict_window_used > SEEKGZ_ZLIB_WINDOW_SIZE)
243 		moveback = tlen + fp -> rolling_dict_window_used - SEEKGZ_ZLIB_WINDOW_SIZE;
244 	if(moveback>0 && moveback < fp -> rolling_dict_window_used){
245 		int ii;
246 		for(ii = 0; ii <  fp -> rolling_dict_window_used - moveback; ii++){
247 			fp -> rolling_dict_window[ii] = fp -> rolling_dict_window[ii + moveback];
248 		}
249 	}
250 
251 	fp -> rolling_dict_window_used = moveback < fp -> rolling_dict_window_used ? (fp -> rolling_dict_window_used - moveback):0;
252 
253 	if(tlen > SEEKGZ_ZLIB_WINDOW_SIZE){
254 		fp_start =0;
255 		new_start = tlen - SEEKGZ_ZLIB_WINDOW_SIZE;
256 		cplen = SEEKGZ_ZLIB_WINDOW_SIZE;
257 	}else{
258 		new_start = 0;
259 		fp_start = fp -> rolling_dict_window_used;
260 		cplen = tlen;
261 	}
262 
263 	memcpy(fp -> rolling_dict_window + fp_start, txt + new_start, cplen);
264 	fp -> rolling_dict_window_used += cplen;
265 }
266 
seekgz_eof(seekable_zfile_t * fp)267 int seekgz_eof(seekable_zfile_t * fp){
268 	if(fp -> stem.avail_in < 1 && feof(fp->gz_fp))return 1;
269 	return 0;
270 }
271 
272 
273 #define SEEKGZ_ONE_BLOCK_TEXT_INIT_SIZE (256*1024)
274 // 1block ret < 0 : bad error
275 // 1block ret > 0 : data is loaded
276 // 1block ret == 0 : no data is loaded
277 
278 // A block MUST have at least 10000 bytes otherwise multiple blocks are concatinated
279 
280 #define MINIMUM_GZIP_BLOCK_SIZE 10000
281 
seekgz_load_1_block(seekable_zfile_t * fp,int empty_block_no)282 int seekgz_load_1_block( seekable_zfile_t * fp , int empty_block_no){
283 	if(seekgz_eof(fp))return 0;
284 
285 	char * out_txt = malloc(SEEKGZ_ONE_BLOCK_TEXT_INIT_SIZE);
286 	int out_txt_size = SEEKGZ_ONE_BLOCK_TEXT_INIT_SIZE;
287 	int out_txt_used = 0;
288 
289 	memcpy(fp -> block_rolling_chain[empty_block_no].block_dict_window, fp -> rolling_dict_window, fp -> rolling_dict_window_used);
290 	fp -> block_rolling_chain[empty_block_no].block_dict_window_size = fp -> rolling_dict_window_used;
291 	//fp -> rolling_dict_window_used = 0;
292 
293 	fp -> block_rolling_chain[empty_block_no].block_start_in_file_offset = fp -> next_block_file_offset;
294 	fp -> block_rolling_chain[empty_block_no].block_start_in_file_bits = fp -> next_block_file_bits;
295 
296 	int is_data_error = 0;
297 	int is_eof = 0;
298 	while(1){
299 		int is_block_end = 0;
300 		int is_gzip_unit_end = 0;
301 		while(1){
302 			seekgz_try_read_some_zipped_data(fp, &is_eof);
303 			if(fp -> stem.avail_in < 1 && is_eof)break;
304 
305 			if(out_txt_size < out_txt_used *4/3){
306 				out_txt_size *= 2;
307 				out_txt = realloc(out_txt,out_txt_size);
308 			}
309 
310 			int old_avail_in = fp -> stem.avail_in;
311 			fp -> stem.avail_out = out_txt_size - out_txt_used;
312 			fp -> stem.next_out = (unsigned char *)out_txt + out_txt_used;
313 
314 			void * old_next_in = fp -> stem.next_in;
315 
316 				//SUBREADprintf("i  INFLATINHG: FILEPOS=%llu  IN_AVAIL=%d  OUT_AVAIL=%d\n", seekgz_ftello(fp), fp -> stem.avail_in, fp -> stem.avail_out);
317 			int ret_ifl = inflate(&(fp -> stem), Z_BLOCK);
318 			int have = (out_txt_size - out_txt_used) - fp -> stem.avail_out;
319 			if(ret_ifl != Z_OK && ret_ifl != Z_STREAM_END){
320 				SUBREADprintf("ERR  INFLATINHG: RET=%d BY IN %d zipped bytes  ==>  %d PLAIN TXT\n", ret_ifl, old_avail_in - fp -> stem.avail_in, have);
321 				SUBREADprintf("ERR  INFLATINHG: BEND=%d\n", ( fp -> stem.data_type & 128 )&& !(fp -> stem.data_type & 64));
322 			}
323 
324 			//if(ret_ifl != Z_STREAM_END){
325 			//	SUBREADprintf("INF: STREAM_END\n");
326 			//}
327 			if(ret_ifl != Z_OK && ret_ifl != Z_STREAM_END){
328 				is_data_error = 1;
329 				break;
330 			}
331 			int zipped_data_used = (void *)fp -> stem.next_in - old_next_in;
332 
333 			fp -> in_zipped_buff_read_ptr += zipped_data_used;
334 			if( ( fp -> stem.data_type & 128 )&& !(fp -> stem.data_type & 64)) is_block_end = 1;
335 			if(ret_ifl == Z_STREAM_END) is_gzip_unit_end = 1;
336 
337 			//if(is_block_end)
338 			//	fp -> rolling_dict_window_used = 0;
339 			//else
340 			seekgz_update_current_window(fp, out_txt + out_txt_used, have);
341 
342 			out_txt_used += have;
343 
344 			if(is_block_end){
345 				fp -> next_block_file_offset = seekgz_ftello(fp);
346 				fp -> next_block_file_bits = fp->stem.data_type & 7;
347 			}
348 
349 			if(is_gzip_unit_end){
350 				seekgz_skip_gzfile_header(fp, 8);
351 				inflateReset(&fp->stem);
352 			}
353 
354 			if(is_block_end || is_gzip_unit_end) break;
355 		}
356 		if(out_txt_used >= MINIMUM_GZIP_BLOCK_SIZE || is_data_error ) break;
357 		if(fp -> stem.avail_in < 1 && is_eof)break;
358 	}
359 
360 	if(is_data_error) return -1;
361 
362 	//SUBREADprintf("o  END_1BLK: GOT %d PLAIN TXT i; first char: '%c'  fp==%p\n\n", out_txt_used, out_txt[0], fp);
363 	if(out_txt_used>0){
364 		fp -> block_rolling_chain[empty_block_no].block_txt = out_txt;
365 		fp -> block_rolling_chain[empty_block_no].block_txt_size = out_txt_used;
366 		seekgz_find_linebreaks(fp, empty_block_no);
367 		return out_txt_used;
368 	}else{
369 		free(out_txt);
370 		return 0;
371 	}
372 }
373 
seekgz_load_more_blocks(seekable_zfile_t * fp,int bytes_ahead,subread_lock_t * read_lock)374 int seekgz_load_more_blocks( seekable_zfile_t * fp , int bytes_ahead, subread_lock_t * read_lock ){
375 	int loaded_bytes=0, ret=0;
376 
377 	while(1){
378 		int empty_block_no = -1;
379 		subread_lock_occupy(&fp -> write_lock );
380 		if(read_lock) subread_lock_occupy(read_lock);
381 		if(fp -> blocks_in_chain < SEEKGZ_CHAIN_BLOCKS_NO){
382 			empty_block_no = fp-> block_chain_current_no + fp -> blocks_in_chain;
383 			if(empty_block_no >= SEEKGZ_CHAIN_BLOCKS_NO) empty_block_no -= SEEKGZ_CHAIN_BLOCKS_NO;
384 		}
385 		if(read_lock) subread_lock_release(read_lock);
386 
387 		if(empty_block_no < 0 || (bytes_ahead>=0 && loaded_bytes >= bytes_ahead)){
388 			subread_lock_release(&fp -> write_lock );
389 			break;
390 		}
391 		int oneblk_ret = seekgz_load_1_block(fp, empty_block_no);
392 
393 		//SUBREADprintf("LOAD_1BLK: RET=%d EMPTY_NO=%d\n", oneblk_ret, empty_block_no);
394 		// 1block ret < 0 : bad error
395 		// 1block ret > 0 : no data is loaded
396 		// 1block ret == 0 : normal
397 		if(oneblk_ret < 0){
398 			subread_lock_release(&fp -> write_lock );
399 			ret = oneblk_ret;
400 			break;
401 		}else if(0 < oneblk_ret){
402 			if(read_lock) subread_lock_occupy(read_lock);
403 			fp -> blocks_in_chain++;
404 			if(read_lock) subread_lock_release(read_lock);
405 			loaded_bytes+= oneblk_ret;
406 		}else if(seekgz_eof(fp)){
407 			subread_lock_release(&fp -> write_lock );
408 			break;
409 		}
410 		subread_lock_release(&fp -> write_lock );
411 	}
412 	return ret;
413 }
414 
415 int seqs = 0;
416 
seekgz_preload_buffer(seekable_zfile_t * fp,subread_lock_t * read_lock)417 int seekgz_preload_buffer( seekable_zfile_t * fp , subread_lock_t * read_lock){
418 	int do_preload = 0;
419 	seqs+=1;// (unsigned int)(fp->rolling_dict_window[0]);
420 	if(( read_lock || !fp -> has_multi_thread_accessed) && fp -> blocks_in_chain < 3 && !seekgz_eof(fp) ) do_preload =1;
421 	else if(read_lock && fp -> blocks_in_chain < SEEKGZ_CHAIN_BLOCKS_NO ){
422 		if(seqs >= 2000) {
423 			do_preload = 1;
424 			seqs = 0;
425 		}
426 	}
427 	if(!fp -> has_multi_thread_accessed && read_lock) fp -> has_multi_thread_accessed =1;
428 
429 	if(do_preload){
430 		int decompress_ret = seekgz_load_more_blocks(fp, -1, read_lock);
431 		return decompress_ret;
432 	}
433 	return 0;
434 }
435 
436 // return : > 0  : N bytes loaded
437 //          == 0 : EOF
438 //          < 0  :  data error
seekgz_gets(seekable_zfile_t * fp,char * buff,int buff_len)439 int seekgz_gets(seekable_zfile_t * fp, char * buff, int buff_len){
440 	//if(fp -> blocks_in_chain<3)SUBREADprintf("GTS: %d BLK, %d AVI\n", fp -> blocks_in_chain, fp -> stem.avail_in);
441 	int line_write_ptr = 0, is_end_line = 0;
442 	if(fp->blocks_in_chain < 1 && seekgz_eof(fp)){
443 		return 0;
444 	}
445 	while(1){
446 		int consumed_bytes;
447 
448 		assert(fp->blocks_in_chain>0);
449 		seekable_decompressed_block_t *cblk = fp -> block_rolling_chain+fp -> block_chain_current_no;
450 		if( cblk -> linebreaks >0 && fp-> current_block_txt_read_ptr <= cblk->linebreak_positions[cblk -> linebreaks-1] ){
451 			if(fp-> current_block_txt_read_ptr <=cblk->linebreak_positions[0]){
452 				consumed_bytes = cblk->linebreak_positions[0] - fp-> current_block_txt_read_ptr +1;
453 			}else{
454 				int high_idx = cblk -> linebreaks - 1 , low_idx = 0, idx = -1;
455 				while(1){
456 					if(high_idx <= low_idx+1) {
457 						idx = min(low_idx, high_idx);
458 						break;
459 					}
460 					int mid_idx = (high_idx+low_idx)/2;
461 					if(cblk->linebreak_positions[mid_idx] > fp -> current_block_txt_read_ptr)
462 						high_idx = mid_idx;
463 					else if( cblk->linebreak_positions[mid_idx] < fp -> current_block_txt_read_ptr )
464 						low_idx = mid_idx;
465 					else{
466 						idx = high_idx;
467 						break;
468 					}
469 				}
470 
471 				if(idx>0)idx--;
472 				while( cblk->linebreak_positions[idx+1] < fp -> current_block_txt_read_ptr) idx++;
473 				assert( idx <  cblk -> linebreaks-1 );
474 				consumed_bytes = cblk->linebreak_positions[idx+1] - fp-> current_block_txt_read_ptr +1;
475 			}
476 			is_end_line = 1;
477 		} else
478 			consumed_bytes = cblk -> block_txt_size - fp-> current_block_txt_read_ptr ;
479 
480 		if(buff){
481 			int cp_bytes = min(consumed_bytes, buff_len - line_write_ptr);
482 			memcpy( buff + line_write_ptr , cblk-> block_txt + fp-> current_block_txt_read_ptr, cp_bytes );
483 			line_write_ptr += cp_bytes;
484 			if(buff)buff[line_write_ptr] = 0;
485 		}
486 
487 		//SUBREADprintf("OUT_TXT  in BLC %d / %d %p  ENDL=%d CUNS_BYTES=%d / %d / %d : '''%s'''\n", fp -> block_chain_current_no, fp -> blocks_in_chain, fp, is_end_line, consumed_bytes,  fp-> current_block_txt_read_ptr, cblk -> block_txt_size ,  buff);
488 
489 		fp-> current_block_txt_read_ptr += consumed_bytes;
490 
491 		if( fp-> current_block_txt_read_ptr >= cblk -> block_txt_size ){
492 			//SUBREADprintf("CHAIN CONSUME_0 : %d ( - 1 ) %p\n", fp -> block_chain_current_no, cblk->block_txt);
493 			free(cblk->block_txt);
494 			//SUBREADprintf("CHAIN CONSUME_X : %d ( - 1 ) %p\n", fp -> block_chain_current_no, cblk->linebreak_positions);
495 			free(cblk->linebreak_positions);
496 			//SUBREADprintf("CHAIN CONSUME_Z : %d ( - 1 )\n", fp -> block_chain_current_no);
497 			fp-> current_block_txt_read_ptr = 0;
498 			fp -> block_chain_current_no++;
499 			if(fp -> block_chain_current_no>= SEEKGZ_CHAIN_BLOCKS_NO) fp -> block_chain_current_no=0;
500 			fp->blocks_in_chain --;
501 		}
502 
503 		if(is_end_line) break;
504 	}
505 	return line_write_ptr;
506 }
507 
seekgz_next_int8(seekable_zfile_t * fp)508 int seekgz_next_int8(seekable_zfile_t * fp){
509 	if(fp -> blocks_in_chain<1){
510 		seekgz_load_more_blocks(fp, -1, NULL);
511 		if(fp -> blocks_in_chain<1) return -1;
512 	}
513 	seekable_decompressed_block_t *cblk = fp -> block_rolling_chain+fp -> block_chain_current_no;
514 	int ret = cblk->block_txt[fp-> current_block_txt_read_ptr];
515 	fp-> current_block_txt_read_ptr++;
516 	if(fp-> current_block_txt_read_ptr == cblk -> block_txt_size){
517 		free(cblk->block_txt);
518 		free(cblk->linebreak_positions);
519 		fp-> current_block_txt_read_ptr = 0;
520 		fp -> block_chain_current_no++;
521 		if(fp -> block_chain_current_no>= SEEKGZ_CHAIN_BLOCKS_NO) fp -> block_chain_current_no=0;
522 		fp->blocks_in_chain --;
523 	}
524 	return ret<0?256+ret:ret;
525 }
526 
527 
seekgz_next_char(seekable_zfile_t * fp)528 int seekgz_next_char(seekable_zfile_t * fp){ // MUST BE PROTECTED BY read_lock
529 	//SUBREADprintf("gen_next_char: block_in_chain=%d\n", fp -> blocks_in_chain);
530 	//if(fp -> blocks_in_chain<3)SUBREADprintf("1CH: %d BLK, %d AVI\n", fp -> blocks_in_chain, fp -> stem.avail_in);
531 	if(fp -> blocks_in_chain<1){
532 		return EOF;
533 	}
534 	seekable_decompressed_block_t *cblk = fp -> block_rolling_chain+fp -> block_chain_current_no;
535 	int ret = cblk->block_txt[fp-> current_block_txt_read_ptr];
536 	fp-> current_block_txt_read_ptr++;
537 	if(fp-> current_block_txt_read_ptr == cblk -> block_txt_size){
538 		free(cblk->block_txt);
539 		free(cblk->linebreak_positions);
540 		fp-> current_block_txt_read_ptr = 0;
541 		fp -> block_chain_current_no++;
542 		if(fp -> block_chain_current_no>= SEEKGZ_CHAIN_BLOCKS_NO) fp -> block_chain_current_no=0;
543 		fp->blocks_in_chain --;
544 	}
545 	return ret;
546 }
547 
seekgz_close(seekable_zfile_t * fp)548 void seekgz_close(seekable_zfile_t * fp){
549 	int ii;
550 	fclose(fp -> gz_fp);
551 	free(fp -> in_zipped_buffer);
552 	for(ii = 0; ii < fp -> blocks_in_chain ; ii++){
553 		int iv = ii + fp -> block_chain_current_no;
554 		if(iv >=SEEKGZ_CHAIN_BLOCKS_NO) iv-=SEEKGZ_CHAIN_BLOCKS_NO;
555 		free(fp -> block_rolling_chain[iv].block_txt);
556 		free(fp -> block_rolling_chain[iv].linebreak_positions);
557 	}
558 
559 	inflateEnd(&fp -> stem);
560 	subread_destroy_lock(&fp -> write_lock);
561 }
562 
autozip_open(const char * fname,autozip_fp * fp)563 int autozip_open(const char * fname, autozip_fp * fp){
564 	int ret = -1;
565 	memset(fp, 0, sizeof(autozip_fp));
566 	strcpy(fp -> filename, fname);
567 
568 	FILE * tstfp = fopen(fname,"rb");
569 	if(!tstfp) return ret;
570 
571 	int cc1, cc2;
572 	cc1 = fgetc(tstfp);
573 	cc2 = fgetc(tstfp);
574 
575 	if(cc2 == 0x8b && cc1 == 0x1f){
576 		//fclose(tstfp);
577 		fp -> is_plain = 0;
578 		int iret = seekgz_open(fname, &fp -> gz_fp, tstfp);
579 		if(iret >= 0) ret = 1;
580 		else ret = -1;
581 	}else{
582 		if(cc1 != EOF && cc2 != EOF){
583 			fp -> first_chars[0] = cc1;
584 			fp -> first_chars[1] = cc2;
585 			fp -> is_first_chars = 2;
586 		}
587 		fp -> plain_fp = tstfp;
588 		fp -> is_plain = 1;
589 		ret = 0;
590 	}
591 
592 	return ret;
593 }
594 
autozip_close(autozip_fp * fp)595 void autozip_close(autozip_fp * fp){
596 	if(fp -> is_plain) fclose(fp -> plain_fp);
597 	else
598 		seekgz_close(&fp -> gz_fp);
599 }
600 
autozip_gets(autozip_fp * fp,char * buf,int buf_size)601 int autozip_gets(autozip_fp * fp, char * buf, int buf_size){
602 	int ret = 0;
603 	if(fp -> is_plain) {
604 		int base0 = 0;
605 		if( fp -> is_first_chars ){
606 			buf[0]=fp -> first_chars[0];
607 			buf[1]=fp -> first_chars[1];
608 			base0 = 2;
609 			fp -> is_first_chars = 0;
610 		}
611 		buf[2]=0;
612 
613 		char * retc = fgets(buf + base0, buf_size, fp -> plain_fp);
614 		if(retc == NULL && base0 == 0) ret = 0;
615 		else ret = strlen(buf);
616 	}else{
617 		seekgz_preload_buffer(&fp->gz_fp, NULL);
618 		ret = seekgz_gets(&fp -> gz_fp, buf, buf_size);
619 	}
620 	//SUBREADprintf("READBUF '%s'\n", buf);
621 	return ret;
622 }
623 
autozip_getch(autozip_fp * fp)624 int autozip_getch(autozip_fp * fp){
625 	int ret = 0;
626 	if(fp -> is_plain) {
627 		if( fp -> is_first_chars ) return fp -> first_chars[2-( fp -> is_first_chars -- ) ];
628 		ret = fgetc(fp -> plain_fp);
629 		if(ret==EOF)ret = -1;
630 	}else{
631 		ret = seekgz_next_int8(&fp -> gz_fp);
632 	}
633 	return ret;
634 }
635 
autozip_rewind(autozip_fp * fp)636 void autozip_rewind(autozip_fp * fp){
637 	char fname [MAX_FILE_NAME_LENGTH+1];
638 	strcpy(fname, fp -> filename);
639 
640 	autozip_close(fp);
641 	autozip_open(fname, fp);
642 }
643 
parallel_gzip_writer_init(parallel_gzip_writer_t * pzwtr,char * output_filename,int total_threads)644 void parallel_gzip_writer_init(parallel_gzip_writer_t * pzwtr, char * output_filename, int total_threads){
645 	memset(pzwtr, 0, sizeof(parallel_gzip_writer_t));
646 	pzwtr -> threads = total_threads;
647 	pzwtr -> thread_objs = calloc(sizeof(parallel_gzip_writer_thread_t), total_threads);
648 
649 	pzwtr -> os_file = f_subr_open(output_filename, "wb");
650 	fputc(31, pzwtr -> os_file);
651 	fputc(139, pzwtr -> os_file);
652 	fputc(8, pzwtr -> os_file);
653 	fputc(0, pzwtr -> os_file);
654 	fputc(0, pzwtr -> os_file); // MTIME x 4
655 	fputc(0, pzwtr -> os_file);
656 	fputc(0, pzwtr -> os_file);
657 	fputc(0, pzwtr -> os_file);
658 	fputc(4, pzwtr -> os_file); // XFL : fastest
659 	fputc(255, pzwtr -> os_file); // OS=unknown
660 	int x1;
661 	for(x1=0; x1<total_threads; x1++){
662 		pzwtr -> thread_objs[x1].thread_no = x1;
663 		deflateInit2(&pzwtr -> thread_objs[x1].zipper, SAMBAM_COMPRESS_LEVEL_NORMAL, Z_DEFLATED, SAMBAM_GZIP_WINDOW_BITS, 8, Z_DEFAULT_STRATEGY);
664 	}
665 	pzwtr -> CRC32 = crc32(0, NULL, 0);
666 }
667 
parallel_gzip_writer_add_text(parallel_gzip_writer_t * pzwtr,char * text,int tlen,int thread_no)668 void parallel_gzip_writer_add_text(parallel_gzip_writer_t * pzwtr, char * text, int tlen, int thread_no){
669 	parallel_gzip_writer_thread_t *tho = pzwtr -> thread_objs + thread_no;
670 	if(tlen + tho -> in_buffer_used >= PARALLEL_GZIP_TXT_BUFFER_SIZE){
671 		SUBREADprintf("Insufficient gzip buffer.\n");
672 		return;
673 	}
674 	memcpy(tho -> in_buffer + tho -> in_buffer_used, text, tlen);
675 	tho -> in_buffer_used += tlen;
676 }
677 
parallel_gzip_zip_texts(parallel_gzip_writer_t * pzwtr,int thread_no,int eof_marker)678 void parallel_gzip_zip_texts(parallel_gzip_writer_t * pzwtr, int thread_no, int eof_marker){
679 	parallel_gzip_writer_thread_t *tho = pzwtr -> thread_objs + thread_no;
680 	int write_txt_ptr = 0;
681 
682 	tho -> out_buffer_used = 0;
683 	tho -> CRC32 = crc_pos(tho -> in_buffer, tho -> in_buffer_used);
684 
685 	while(tho -> in_buffer_used - write_txt_ptr > 0 || eof_marker){
686 		tho -> zipper . next_in = (unsigned char*)tho -> in_buffer + write_txt_ptr;
687 		tho -> zipper . avail_in = tho -> in_buffer_used - write_txt_ptr;
688 		tho -> zipper . next_out = (unsigned char*)tho -> out_buffer + tho -> out_buffer_used;
689 		tho -> zipper . avail_out = PARALLEL_GZIP_ZIPPED_BUFFER_SIZE - tho -> out_buffer_used;
690 		int defret = deflate(&tho -> zipper, eof_marker?Z_FINISH:Z_FULL_FLUSH);
691 		int consumed_input = tho -> in_buffer_used - write_txt_ptr - tho -> zipper . avail_in;
692 		int generated_output = PARALLEL_GZIP_ZIPPED_BUFFER_SIZE - tho -> out_buffer_used - tho -> zipper.avail_out;
693 
694 		if(defret == Z_OK || defret == Z_STREAM_END ){
695 			write_txt_ptr += consumed_input;
696 			tho -> out_buffer_used += generated_output;
697 		}else{
698 			SUBREADprintf("Cannot compress the zipped output: %d with in_len=%d, consumed=%d and out_aval=%d\n", defret, tho -> in_buffer_used, consumed_input, tho -> zipper.avail_out);
699 			break;
700 		}
701 		if(eof_marker)break;
702 	}
703 
704 	tho -> plain_length = tho -> in_buffer_used;
705 	tho -> in_buffer_used =0;
706 }
707 
708 // because we have to keep sync between three fastq files, the flush function has to be manually called three times at the same time point.
709 // otherwise R1, I2 and R2 files will have inconsistent read orders.
710 // the outer program has to check if any of the three in_buffers is full.
parallel_gzip_writer_flush(parallel_gzip_writer_t * pzwtr,int thread_no)711 void parallel_gzip_writer_flush(parallel_gzip_writer_t * pzwtr, int thread_no){
712 	parallel_gzip_writer_thread_t *tho = pzwtr -> thread_objs + thread_no;
713 
714 	if(tho -> out_buffer_used>0){
715 		int fret = fwrite(tho -> out_buffer, 1, tho -> out_buffer_used, pzwtr -> os_file);
716 		if(fret != tho ->out_buffer_used) SUBREADprintf("Cannot write the zipped output: %d\n", fret);
717 
718 		if(tho -> plain_length){
719 			unsigned int CRCnew = crc32_combine( pzwtr -> CRC32, tho -> CRC32, tho -> plain_length );
720 			pzwtr -> plain_length += tho -> plain_length;
721 			pzwtr -> CRC32 = CRCnew;
722 		}
723 	}
724 	tho -> out_buffer_used =0;
725 	tho -> plain_length = 0;
726 }
727 
plgz_finish_in_buffers(parallel_gzip_writer_t * pzwtr)728 void plgz_finish_in_buffers(parallel_gzip_writer_t * pzwtr){
729 	int x1;
730 	for(x1=0; x1<pzwtr -> threads; x1++){
731 		if(pzwtr -> thread_objs[x1].in_buffer_used<1) continue;
732 		parallel_gzip_zip_texts(pzwtr, x1, 0);
733 		parallel_gzip_writer_flush(pzwtr, x1);
734 	}
735 }
736 
parallel_gzip_writer_close(parallel_gzip_writer_t * pzwtr)737 void parallel_gzip_writer_close(parallel_gzip_writer_t * pzwtr){
738 	int x1;
739 	plgz_finish_in_buffers(pzwtr);
740 
741 	// write the last "Z_FINISH" 0-len block to tell gzip that the zipped chunk end
742 	pzwtr -> thread_objs[0].in_buffer_used = 0;
743 	parallel_gzip_zip_texts(pzwtr, 0, 1);
744 	parallel_gzip_writer_flush(pzwtr, 0);
745 
746 	for(x1=0; x1<pzwtr -> threads; x1++)
747 		deflateEnd(&pzwtr -> thread_objs[x1].zipper);
748 
749 	fwrite(&pzwtr -> CRC32,        4, 1, pzwtr -> os_file);
750 	fwrite(&pzwtr -> plain_length, 4, 1, pzwtr -> os_file); // write the first (lowest) 4 bytes in a 64-bit integer -- as gzip file format required original (uncompressed) input data modulo 2^32.
751 	fclose(pzwtr -> os_file);
752 	free(pzwtr -> thread_objs);
753 }
754 
755 
parallel_gzip_writer_add_read_fqs_scRNA(parallel_gzip_writer_t ** outfps,char * bambin,int thread_no)756 int parallel_gzip_writer_add_read_fqs_scRNA(parallel_gzip_writer_t**outfps, char * bambin, int thread_no){
757 	int reclen=0;
758 	parallel_gzip_writer_t * outR1fp = outfps[0];
759 	parallel_gzip_writer_t * outI1fp = outfps[1];
760 	parallel_gzip_writer_t * outR2fp = outfps[2];
761 
762 	memcpy(&reclen, bambin,4);
763 	int flag = 0, l_seq = 0, l_read_name = 0, n_cigar_ops = 0;
764 	memcpy(&l_read_name, bambin+12,1);
765 	memcpy(&n_cigar_ops, bambin+16,2);
766 	memcpy(&flag, bambin+18,2);
767 	memcpy(&l_seq, bambin+20,4);
768 
769 	int x1=0;
770 
771 	parallel_gzip_writer_add_text(outR2fp,"@",1,thread_no);
772 	parallel_gzip_writer_add_text(outR1fp,"@",1,thread_no);
773 	parallel_gzip_writer_add_text(outI1fp,"@",1,thread_no);
774 	char * readname = bambin+36;
775 	parallel_gzip_writer_add_text(outR1fp,readname, 12,thread_no);
776 	parallel_gzip_writer_add_text(outR2fp,readname, 12,thread_no);
777 	parallel_gzip_writer_add_text(outI1fp,readname, 12,thread_no);
778 	parallel_gzip_writer_add_text(outR1fp,"\n",1,thread_no);
779 	parallel_gzip_writer_add_text(outR2fp,"\n",1,thread_no);
780 	parallel_gzip_writer_add_text(outI1fp,"\n",1,thread_no);
781 
782 	char * R1seq = bambin+36+13;
783 	int R1len = 0;
784 	for(R1len=0; R1seq[R1len] && R1seq[R1len]!='|' ;R1len++);
785 	char * R1qual = R1seq + R1len + 1;
786 	parallel_gzip_writer_add_text(outR1fp,R1seq, R1len,thread_no);
787 	parallel_gzip_writer_add_text(outR1fp,"\n+\n",3,thread_no);
788 	parallel_gzip_writer_add_text(outR1fp,R1qual, R1len,thread_no);
789 	parallel_gzip_writer_add_text(outR1fp,"\n",1,thread_no);
790 
791 	char * I1seq = R1qual + R1len + 1;
792 	int I1len = 0;
793 	for(I1len=0; I1seq[I1len] && I1seq[I1len]!='|' ;I1len++);
794 	char * I1qual = I1seq + I1len + 1;
795 	parallel_gzip_writer_add_text(outI1fp,I1seq, I1len,thread_no);
796 	parallel_gzip_writer_add_text(outI1fp,"\n+\n",3,thread_no);
797 	parallel_gzip_writer_add_text(outI1fp,I1qual, I1len,thread_no);
798 	parallel_gzip_writer_add_text(outI1fp,"\n",1,thread_no);
799 
800 	char oseq[l_seq+1];
801 	int seqbase = 36+l_read_name+n_cigar_ops*4;
802 	for(x1=0; x1<l_seq;x1++)oseq[x1]="=ACMGRSVTWYHKDBN"[(bambin[ seqbase+(x1/2)] >> (((x1%2)?0:1) *4) )&0xf];
803 	oseq[l_seq]=0;
804 	if(flag & 16) reverse_read(oseq,l_seq, GENE_SPACE_BASE);
805 	parallel_gzip_writer_add_text(outR2fp, oseq, l_seq,thread_no);
806 	parallel_gzip_writer_add_text(outR2fp,"\n+\n",3,thread_no);
807 
808 	seqbase = 36+l_read_name+n_cigar_ops*4 + ( l_seq+1 )/2;
809 	for(x1=0; x1<l_seq;x1++)oseq[x1]=33+bambin[ seqbase+x1];
810 	if(flag & 16)reverse_quality(oseq, l_seq);
811 	oseq[l_seq]=0;
812 	parallel_gzip_writer_add_text(outR2fp, oseq, l_seq,thread_no);
813 	parallel_gzip_writer_add_text(outR2fp,"\n",1,thread_no);
814 	return 0;
815 }
816 
817