1 #include <assert.h>
2 #include "core.h"
3 #include "seek-zlib.h"
4 #include "input-files.h"
5 #include "gene-algorithms.h"
6
7 #define SEEKGZ_INIT_TEXT_SIZE (1024*1024)
8 #define SEEKGZ_BINBUFF_SIZE (1*1024*1024)
9
seekgz_ftello(seekable_zfile_t * fp)10 unsigned long long seekgz_ftello(seekable_zfile_t * fp){
11 unsigned long long ret = ftello(fp -> gz_fp);
12 ret -= fp -> stem.avail_in;
13 return ret;
14 }
15
crc_pos(char * bin,int len)16 unsigned int crc_pos(char * bin, int len){
17 unsigned int crc0 = 0;//crc32(0, NULL, 0);
18 unsigned int CRC32 = crc32(crc0, (unsigned char *) bin, len);
19 return CRC32;
20 }
21
seekgz_try_read_some_zipped_data(seekable_zfile_t * fp,int * is_eof)22 void seekgz_try_read_some_zipped_data(seekable_zfile_t * fp, int * is_eof){
23 if(feof(fp->gz_fp)){
24 if(is_eof) (*is_eof)=1;
25 return;
26 }
27
28 assert(fp -> stem.avail_in >= 0);
29 if(fp -> stem.avail_in < SEEKGZ_BINBUFF_SIZE / 2 ) {
30 if(fp -> in_zipped_buff_read_ptr > 0 && fp -> stem.avail_in > 0){
31 int i;
32 for(i = 0 ; i < fp -> stem.avail_in ; i ++){
33 fp -> in_zipped_buffer[i] = fp -> in_zipped_buffer[i + fp -> in_zipped_buff_read_ptr];
34 }
35 }
36 fp -> in_zipped_buff_read_ptr = 0;
37
38 int readlen = fread(fp -> in_zipped_buffer + fp -> stem.avail_in, 1 , SEEKGZ_BINBUFF_SIZE - fp -> stem.avail_in , fp -> gz_fp);
39 if(readlen>0)
40 fp -> stem.avail_in += readlen;
41 fp -> stem.next_in = (unsigned char *)fp -> in_zipped_buffer;
42 }
43 }
44
seekgz_get_next_zipped_char(seekable_zfile_t * fp)45 int seekgz_get_next_zipped_char(seekable_zfile_t * fp){
46 seekgz_try_read_some_zipped_data(fp, NULL);
47 int ret = -1;
48
49 if(fp -> stem.avail_in > 0)
50 {
51 ret = fp -> in_zipped_buffer [ fp -> in_zipped_buff_read_ptr ++];
52 fp -> stem.next_in = (unsigned char *)(fp -> in_zipped_buffer + fp -> in_zipped_buff_read_ptr);
53 fp -> stem.avail_in --;
54 if(ret<0) ret=256+ret;
55 }
56 return ret;
57
58 }
59
seekgz_skip_gzfile_header(seekable_zfile_t * fp,int tail_size)60 int seekgz_skip_gzfile_header(seekable_zfile_t * fp, int tail_size){
61 int id1, id2;
62
63 if(tail_size){
64 for(id1=0; id1<tail_size; id1++)
65 seekgz_get_next_zipped_char(fp);
66 }
67 id1 = seekgz_get_next_zipped_char(fp);
68 id2 = seekgz_get_next_zipped_char(fp);
69
70 if(id1 != 31 || id2 != 139){
71 //SUBREADprintf("WRONG header:%d,%d\n", id1, id2);
72 return 1;
73 }
74
75 seekgz_get_next_zipped_char(fp); // CM
76 int FLG= seekgz_get_next_zipped_char(fp); // FLG
77 seekgz_get_next_zipped_char(fp);
78 seekgz_get_next_zipped_char(fp);
79 seekgz_get_next_zipped_char(fp);
80 seekgz_get_next_zipped_char(fp);
81 seekgz_get_next_zipped_char(fp); // XFL
82 seekgz_get_next_zipped_char(fp); // OS
83
84 //fprintf(stderr, "FLG=%d, XFL=%d\n" , FLG, XFL);
85
86 if(FLG & (1<<2)){ // FEXT
87 unsigned short XLEN=0;
88 XLEN = seekgz_get_next_zipped_char(fp);
89 XLEN += seekgz_get_next_zipped_char(fp)*256;
90 for(; XLEN>0; XLEN--)
91 seekgz_get_next_zipped_char(fp);
92 }
93
94 for(id1 = 3; id1 <=4; id1++){
95 if(FLG & (1<<id1)){ // FNAME or FCOMMENT
96 while(1){
97 int namec = seekgz_get_next_zipped_char(fp);
98 if(0==namec) break;
99 }
100 }
101 }
102 if(FLG & (1<<1)){ // FCRC
103 seekgz_get_next_zipped_char(fp);
104 seekgz_get_next_zipped_char(fp);
105 }
106
107 fp -> next_block_file_offset = seekgz_ftello(fp);
108 fp -> next_block_file_bits = 0;
109 fp -> rolling_dict_window_used = 0;
110 return 0;
111 }
112 int seekgz_load_more_blocks( seekable_zfile_t * fp , int bytes_ahead, subread_lock_t * read_lock );
113
seekgz_open(const char * fname,seekable_zfile_t * fp,FILE * old_fp)114 int seekgz_open(const char * fname, seekable_zfile_t * fp, FILE * old_fp){
115 memset(fp, 0, sizeof(seekable_zfile_t));
116 if(old_fp) fp -> gz_fp = old_fp;
117 else fp -> gz_fp = f_subr_open(fname, "rb");
118 if(NULL==fp -> gz_fp)return -1;
119
120 fp -> in_zipped_buffer = malloc(SEEKGZ_BINBUFF_SIZE);
121 subread_init_lock(&fp -> write_lock);
122 //fp -> txt_buffer_size = SEEKGZ_INIT_TEXT_SIZE;
123
124 fp -> blocks_in_chain = 0;
125 fp -> block_chain_current_no = 0; // no blocks in the chain
126 fp -> rolling_dict_window_used = 0;
127 fp -> current_block_txt_read_ptr = 0;
128
129 fp -> stem.zalloc = Z_NULL;
130 fp -> stem.zfree = Z_NULL;
131 fp -> stem.opaque = Z_NULL;
132 fp -> stem.avail_in = 0;
133 fp -> stem.next_in = Z_NULL;
134 if( old_fp ){
135 fp -> stem.avail_in = 2;
136 fp -> in_zipped_buffer[0] = 0x1f;
137 fp -> in_zipped_buffer[1] = 0x8b;
138 }
139
140 int ret = seekgz_skip_gzfile_header(fp,0);
141 if(ret) return 1;
142 ret = inflateInit2(&(fp -> stem), -15);
143 if(ret) return 1;
144
145 fp -> next_block_file_offset = seekgz_ftello( fp );
146 fp -> next_block_file_bits = 0;
147 ret = seekgz_load_more_blocks(fp, 300000, NULL);
148 //SUBREADprintf("Gzip : opened %s of %p\n", fname, fp);
149 return ret;
150 }
151
seekgz_tell(seekable_zfile_t * fp,seekable_position_t * pos)152 void seekgz_tell(seekable_zfile_t * fp, seekable_position_t * pos){
153 if(fp -> blocks_in_chain>0){
154 seekable_decompressed_block_t * curr_block = fp -> block_rolling_chain+fp -> block_chain_current_no;
155
156 pos -> block_gzfile_offset = curr_block -> block_start_in_file_offset;
157 pos -> block_gzfile_bits = curr_block -> block_start_in_file_bits;
158 memcpy(pos -> dict_window, curr_block -> block_dict_window, curr_block -> block_dict_window_size);
159 pos -> block_dict_window_size = curr_block -> block_dict_window_size;
160 pos -> in_block_text_offset = fp -> current_block_txt_read_ptr;
161 }else{
162 pos -> block_gzfile_offset = 0xffffffffffffffffllu;
163 }
164 }
165
166 int seekgz_load_1_block( seekable_zfile_t * fp , int empty_block_no);
seekgz_seek(seekable_zfile_t * fp,seekable_position_t * pos)167 void seekgz_seek(seekable_zfile_t * fp, seekable_position_t * pos){
168 //SUBREADprintf("SEEK:gzip -> fpos=%llu, block_offset=%d\n", pos -> block_gzfile_offset, pos -> in_block_text_offset);
169 //
170 if(pos -> block_gzfile_offset== 0xffffffffffffffffllu){
171 fp -> block_chain_current_no = 0;
172 fp -> blocks_in_chain = 0;
173 fp -> stem.avail_in = 0;
174 fseeko(fp->gz_fp, 0, SEEK_END);
175 fgetc(fp->gz_fp);
176 return ;
177 }
178 fseeko(fp->gz_fp, pos -> block_gzfile_offset - (pos -> block_gzfile_bits?1:0), SEEK_SET);
179
180 if(Z_OK!=inflateReset(&fp->stem))
181 SUBREADprintf("FATAL: UNABLE TO INIT STREAM.\n\n\n");
182 if(pos -> block_dict_window_size>0){
183 if(pos -> block_gzfile_bits){
184 char nch = fgetc(fp->gz_fp);
185 //fprintf(stderr, "SEEK 2 FPPOS:%llu, NCH=%d\n", ftello(fp->gz_fp) , nch);
186 inflatePrime(&fp->stem, pos -> block_gzfile_bits, nch>>(8-pos -> block_gzfile_bits));
187 }
188 if(Z_OK != inflateSetDictionary(&fp->stem, (unsigned char *)pos -> dict_window, pos -> block_dict_window_size))
189 SUBREADprintf("FATAL: UNABLE TO RESET STREAM.\n\n\n");
190 }
191
192 fp -> stem.avail_in = 0;
193
194 int ii;
195 for(ii = 0; ii < fp -> blocks_in_chain ; ii++){
196 int iv = ii + fp -> block_chain_current_no;
197 if(iv >=SEEKGZ_CHAIN_BLOCKS_NO) iv-=SEEKGZ_CHAIN_BLOCKS_NO;
198 free(fp -> block_rolling_chain[iv].block_txt);
199 free(fp -> block_rolling_chain[iv].linebreak_positions);
200 }
201
202 fp -> block_chain_current_no = 0;
203 fp -> blocks_in_chain = 0;
204 fp -> in_zipped_buff_read_ptr = 0;
205 fp -> current_block_txt_read_ptr = 0;
206
207 memcpy(fp -> rolling_dict_window, pos -> dict_window, pos -> block_dict_window_size);
208 fp -> rolling_dict_window_used= pos -> block_dict_window_size;
209 fp -> next_block_file_offset = pos -> block_gzfile_offset;
210 fp -> next_block_file_bits = pos -> block_gzfile_bits;
211 seekgz_load_more_blocks(fp, 30000, NULL);
212 fp -> current_block_txt_read_ptr = pos -> in_block_text_offset;
213
214 //SUBREADprintf("SEK: %d WIN=%d BLK (%d / %d), %d AVI\n", fp -> blocks_in_chain, pos -> block_dict_window_size, fp -> current_block_txt_read_ptr , fp -> block_rolling_chain[0].block_txt_size, fp -> stem.avail_in);
215 }
216
217
seekgz_find_linebreaks(seekable_zfile_t * fp,int empty_block_no)218 void seekgz_find_linebreaks(seekable_zfile_t * fp, int empty_block_no){
219 int lbks= 0, arrsize = 5000;
220 unsigned int * arr = malloc(sizeof(int) * arrsize);
221
222 int ii;
223 for(ii = 0; ii < fp -> block_rolling_chain[empty_block_no].block_txt_size ; ii++){
224 char nch = fp-> block_rolling_chain[empty_block_no].block_txt[ii];
225 if(nch == '\n'){
226 if(lbks >= arrsize){
227 arrsize*=2;
228 arr = realloc(arr, sizeof(int) * arrsize);
229 }
230
231 arr[lbks++] = ii;
232 }
233 }
234 fp -> block_rolling_chain[empty_block_no].linebreak_positions = arr;
235 fp -> block_rolling_chain[empty_block_no].linebreaks = lbks;
236 }
237
seekgz_update_current_window(seekable_zfile_t * fp,char * txt,int tlen)238 void seekgz_update_current_window(seekable_zfile_t * fp , char * txt, int tlen){
239 int fp_start=0, new_start=0, cplen=0;
240
241 int moveback = 0;
242 if(tlen + fp -> rolling_dict_window_used > SEEKGZ_ZLIB_WINDOW_SIZE)
243 moveback = tlen + fp -> rolling_dict_window_used - SEEKGZ_ZLIB_WINDOW_SIZE;
244 if(moveback>0 && moveback < fp -> rolling_dict_window_used){
245 int ii;
246 for(ii = 0; ii < fp -> rolling_dict_window_used - moveback; ii++){
247 fp -> rolling_dict_window[ii] = fp -> rolling_dict_window[ii + moveback];
248 }
249 }
250
251 fp -> rolling_dict_window_used = moveback < fp -> rolling_dict_window_used ? (fp -> rolling_dict_window_used - moveback):0;
252
253 if(tlen > SEEKGZ_ZLIB_WINDOW_SIZE){
254 fp_start =0;
255 new_start = tlen - SEEKGZ_ZLIB_WINDOW_SIZE;
256 cplen = SEEKGZ_ZLIB_WINDOW_SIZE;
257 }else{
258 new_start = 0;
259 fp_start = fp -> rolling_dict_window_used;
260 cplen = tlen;
261 }
262
263 memcpy(fp -> rolling_dict_window + fp_start, txt + new_start, cplen);
264 fp -> rolling_dict_window_used += cplen;
265 }
266
seekgz_eof(seekable_zfile_t * fp)267 int seekgz_eof(seekable_zfile_t * fp){
268 if(fp -> stem.avail_in < 1 && feof(fp->gz_fp))return 1;
269 return 0;
270 }
271
272
273 #define SEEKGZ_ONE_BLOCK_TEXT_INIT_SIZE (256*1024)
274 // 1block ret < 0 : bad error
275 // 1block ret > 0 : data is loaded
276 // 1block ret == 0 : no data is loaded
277
278 // A block MUST have at least 10000 bytes otherwise multiple blocks are concatinated
279
280 #define MINIMUM_GZIP_BLOCK_SIZE 10000
281
seekgz_load_1_block(seekable_zfile_t * fp,int empty_block_no)282 int seekgz_load_1_block( seekable_zfile_t * fp , int empty_block_no){
283 if(seekgz_eof(fp))return 0;
284
285 char * out_txt = malloc(SEEKGZ_ONE_BLOCK_TEXT_INIT_SIZE);
286 int out_txt_size = SEEKGZ_ONE_BLOCK_TEXT_INIT_SIZE;
287 int out_txt_used = 0;
288
289 memcpy(fp -> block_rolling_chain[empty_block_no].block_dict_window, fp -> rolling_dict_window, fp -> rolling_dict_window_used);
290 fp -> block_rolling_chain[empty_block_no].block_dict_window_size = fp -> rolling_dict_window_used;
291 //fp -> rolling_dict_window_used = 0;
292
293 fp -> block_rolling_chain[empty_block_no].block_start_in_file_offset = fp -> next_block_file_offset;
294 fp -> block_rolling_chain[empty_block_no].block_start_in_file_bits = fp -> next_block_file_bits;
295
296 int is_data_error = 0;
297 int is_eof = 0;
298 while(1){
299 int is_block_end = 0;
300 int is_gzip_unit_end = 0;
301 while(1){
302 seekgz_try_read_some_zipped_data(fp, &is_eof);
303 if(fp -> stem.avail_in < 1 && is_eof)break;
304
305 if(out_txt_size < out_txt_used *4/3){
306 out_txt_size *= 2;
307 out_txt = realloc(out_txt,out_txt_size);
308 }
309
310 int old_avail_in = fp -> stem.avail_in;
311 fp -> stem.avail_out = out_txt_size - out_txt_used;
312 fp -> stem.next_out = (unsigned char *)out_txt + out_txt_used;
313
314 void * old_next_in = fp -> stem.next_in;
315
316 //SUBREADprintf("i INFLATINHG: FILEPOS=%llu IN_AVAIL=%d OUT_AVAIL=%d\n", seekgz_ftello(fp), fp -> stem.avail_in, fp -> stem.avail_out);
317 int ret_ifl = inflate(&(fp -> stem), Z_BLOCK);
318 int have = (out_txt_size - out_txt_used) - fp -> stem.avail_out;
319 if(ret_ifl != Z_OK && ret_ifl != Z_STREAM_END){
320 SUBREADprintf("ERR INFLATINHG: RET=%d BY IN %d zipped bytes ==> %d PLAIN TXT\n", ret_ifl, old_avail_in - fp -> stem.avail_in, have);
321 SUBREADprintf("ERR INFLATINHG: BEND=%d\n", ( fp -> stem.data_type & 128 )&& !(fp -> stem.data_type & 64));
322 }
323
324 //if(ret_ifl != Z_STREAM_END){
325 // SUBREADprintf("INF: STREAM_END\n");
326 //}
327 if(ret_ifl != Z_OK && ret_ifl != Z_STREAM_END){
328 is_data_error = 1;
329 break;
330 }
331 int zipped_data_used = (void *)fp -> stem.next_in - old_next_in;
332
333 fp -> in_zipped_buff_read_ptr += zipped_data_used;
334 if( ( fp -> stem.data_type & 128 )&& !(fp -> stem.data_type & 64)) is_block_end = 1;
335 if(ret_ifl == Z_STREAM_END) is_gzip_unit_end = 1;
336
337 //if(is_block_end)
338 // fp -> rolling_dict_window_used = 0;
339 //else
340 seekgz_update_current_window(fp, out_txt + out_txt_used, have);
341
342 out_txt_used += have;
343
344 if(is_block_end){
345 fp -> next_block_file_offset = seekgz_ftello(fp);
346 fp -> next_block_file_bits = fp->stem.data_type & 7;
347 }
348
349 if(is_gzip_unit_end){
350 seekgz_skip_gzfile_header(fp, 8);
351 inflateReset(&fp->stem);
352 }
353
354 if(is_block_end || is_gzip_unit_end) break;
355 }
356 if(out_txt_used >= MINIMUM_GZIP_BLOCK_SIZE || is_data_error ) break;
357 if(fp -> stem.avail_in < 1 && is_eof)break;
358 }
359
360 if(is_data_error) return -1;
361
362 //SUBREADprintf("o END_1BLK: GOT %d PLAIN TXT i; first char: '%c' fp==%p\n\n", out_txt_used, out_txt[0], fp);
363 if(out_txt_used>0){
364 fp -> block_rolling_chain[empty_block_no].block_txt = out_txt;
365 fp -> block_rolling_chain[empty_block_no].block_txt_size = out_txt_used;
366 seekgz_find_linebreaks(fp, empty_block_no);
367 return out_txt_used;
368 }else{
369 free(out_txt);
370 return 0;
371 }
372 }
373
seekgz_load_more_blocks(seekable_zfile_t * fp,int bytes_ahead,subread_lock_t * read_lock)374 int seekgz_load_more_blocks( seekable_zfile_t * fp , int bytes_ahead, subread_lock_t * read_lock ){
375 int loaded_bytes=0, ret=0;
376
377 while(1){
378 int empty_block_no = -1;
379 subread_lock_occupy(&fp -> write_lock );
380 if(read_lock) subread_lock_occupy(read_lock);
381 if(fp -> blocks_in_chain < SEEKGZ_CHAIN_BLOCKS_NO){
382 empty_block_no = fp-> block_chain_current_no + fp -> blocks_in_chain;
383 if(empty_block_no >= SEEKGZ_CHAIN_BLOCKS_NO) empty_block_no -= SEEKGZ_CHAIN_BLOCKS_NO;
384 }
385 if(read_lock) subread_lock_release(read_lock);
386
387 if(empty_block_no < 0 || (bytes_ahead>=0 && loaded_bytes >= bytes_ahead)){
388 subread_lock_release(&fp -> write_lock );
389 break;
390 }
391 int oneblk_ret = seekgz_load_1_block(fp, empty_block_no);
392
393 //SUBREADprintf("LOAD_1BLK: RET=%d EMPTY_NO=%d\n", oneblk_ret, empty_block_no);
394 // 1block ret < 0 : bad error
395 // 1block ret > 0 : no data is loaded
396 // 1block ret == 0 : normal
397 if(oneblk_ret < 0){
398 subread_lock_release(&fp -> write_lock );
399 ret = oneblk_ret;
400 break;
401 }else if(0 < oneblk_ret){
402 if(read_lock) subread_lock_occupy(read_lock);
403 fp -> blocks_in_chain++;
404 if(read_lock) subread_lock_release(read_lock);
405 loaded_bytes+= oneblk_ret;
406 }else if(seekgz_eof(fp)){
407 subread_lock_release(&fp -> write_lock );
408 break;
409 }
410 subread_lock_release(&fp -> write_lock );
411 }
412 return ret;
413 }
414
415 int seqs = 0;
416
seekgz_preload_buffer(seekable_zfile_t * fp,subread_lock_t * read_lock)417 int seekgz_preload_buffer( seekable_zfile_t * fp , subread_lock_t * read_lock){
418 int do_preload = 0;
419 seqs+=1;// (unsigned int)(fp->rolling_dict_window[0]);
420 if(( read_lock || !fp -> has_multi_thread_accessed) && fp -> blocks_in_chain < 3 && !seekgz_eof(fp) ) do_preload =1;
421 else if(read_lock && fp -> blocks_in_chain < SEEKGZ_CHAIN_BLOCKS_NO ){
422 if(seqs >= 2000) {
423 do_preload = 1;
424 seqs = 0;
425 }
426 }
427 if(!fp -> has_multi_thread_accessed && read_lock) fp -> has_multi_thread_accessed =1;
428
429 if(do_preload){
430 int decompress_ret = seekgz_load_more_blocks(fp, -1, read_lock);
431 return decompress_ret;
432 }
433 return 0;
434 }
435
436 // return : > 0 : N bytes loaded
437 // == 0 : EOF
438 // < 0 : data error
seekgz_gets(seekable_zfile_t * fp,char * buff,int buff_len)439 int seekgz_gets(seekable_zfile_t * fp, char * buff, int buff_len){
440 //if(fp -> blocks_in_chain<3)SUBREADprintf("GTS: %d BLK, %d AVI\n", fp -> blocks_in_chain, fp -> stem.avail_in);
441 int line_write_ptr = 0, is_end_line = 0;
442 if(fp->blocks_in_chain < 1 && seekgz_eof(fp)){
443 return 0;
444 }
445 while(1){
446 int consumed_bytes;
447
448 assert(fp->blocks_in_chain>0);
449 seekable_decompressed_block_t *cblk = fp -> block_rolling_chain+fp -> block_chain_current_no;
450 if( cblk -> linebreaks >0 && fp-> current_block_txt_read_ptr <= cblk->linebreak_positions[cblk -> linebreaks-1] ){
451 if(fp-> current_block_txt_read_ptr <=cblk->linebreak_positions[0]){
452 consumed_bytes = cblk->linebreak_positions[0] - fp-> current_block_txt_read_ptr +1;
453 }else{
454 int high_idx = cblk -> linebreaks - 1 , low_idx = 0, idx = -1;
455 while(1){
456 if(high_idx <= low_idx+1) {
457 idx = min(low_idx, high_idx);
458 break;
459 }
460 int mid_idx = (high_idx+low_idx)/2;
461 if(cblk->linebreak_positions[mid_idx] > fp -> current_block_txt_read_ptr)
462 high_idx = mid_idx;
463 else if( cblk->linebreak_positions[mid_idx] < fp -> current_block_txt_read_ptr )
464 low_idx = mid_idx;
465 else{
466 idx = high_idx;
467 break;
468 }
469 }
470
471 if(idx>0)idx--;
472 while( cblk->linebreak_positions[idx+1] < fp -> current_block_txt_read_ptr) idx++;
473 assert( idx < cblk -> linebreaks-1 );
474 consumed_bytes = cblk->linebreak_positions[idx+1] - fp-> current_block_txt_read_ptr +1;
475 }
476 is_end_line = 1;
477 } else
478 consumed_bytes = cblk -> block_txt_size - fp-> current_block_txt_read_ptr ;
479
480 if(buff){
481 int cp_bytes = min(consumed_bytes, buff_len - line_write_ptr);
482 memcpy( buff + line_write_ptr , cblk-> block_txt + fp-> current_block_txt_read_ptr, cp_bytes );
483 line_write_ptr += cp_bytes;
484 if(buff)buff[line_write_ptr] = 0;
485 }
486
487 //SUBREADprintf("OUT_TXT in BLC %d / %d %p ENDL=%d CUNS_BYTES=%d / %d / %d : '''%s'''\n", fp -> block_chain_current_no, fp -> blocks_in_chain, fp, is_end_line, consumed_bytes, fp-> current_block_txt_read_ptr, cblk -> block_txt_size , buff);
488
489 fp-> current_block_txt_read_ptr += consumed_bytes;
490
491 if( fp-> current_block_txt_read_ptr >= cblk -> block_txt_size ){
492 //SUBREADprintf("CHAIN CONSUME_0 : %d ( - 1 ) %p\n", fp -> block_chain_current_no, cblk->block_txt);
493 free(cblk->block_txt);
494 //SUBREADprintf("CHAIN CONSUME_X : %d ( - 1 ) %p\n", fp -> block_chain_current_no, cblk->linebreak_positions);
495 free(cblk->linebreak_positions);
496 //SUBREADprintf("CHAIN CONSUME_Z : %d ( - 1 )\n", fp -> block_chain_current_no);
497 fp-> current_block_txt_read_ptr = 0;
498 fp -> block_chain_current_no++;
499 if(fp -> block_chain_current_no>= SEEKGZ_CHAIN_BLOCKS_NO) fp -> block_chain_current_no=0;
500 fp->blocks_in_chain --;
501 }
502
503 if(is_end_line) break;
504 }
505 return line_write_ptr;
506 }
507
seekgz_next_int8(seekable_zfile_t * fp)508 int seekgz_next_int8(seekable_zfile_t * fp){
509 if(fp -> blocks_in_chain<1){
510 seekgz_load_more_blocks(fp, -1, NULL);
511 if(fp -> blocks_in_chain<1) return -1;
512 }
513 seekable_decompressed_block_t *cblk = fp -> block_rolling_chain+fp -> block_chain_current_no;
514 int ret = cblk->block_txt[fp-> current_block_txt_read_ptr];
515 fp-> current_block_txt_read_ptr++;
516 if(fp-> current_block_txt_read_ptr == cblk -> block_txt_size){
517 free(cblk->block_txt);
518 free(cblk->linebreak_positions);
519 fp-> current_block_txt_read_ptr = 0;
520 fp -> block_chain_current_no++;
521 if(fp -> block_chain_current_no>= SEEKGZ_CHAIN_BLOCKS_NO) fp -> block_chain_current_no=0;
522 fp->blocks_in_chain --;
523 }
524 return ret<0?256+ret:ret;
525 }
526
527
seekgz_next_char(seekable_zfile_t * fp)528 int seekgz_next_char(seekable_zfile_t * fp){ // MUST BE PROTECTED BY read_lock
529 //SUBREADprintf("gen_next_char: block_in_chain=%d\n", fp -> blocks_in_chain);
530 //if(fp -> blocks_in_chain<3)SUBREADprintf("1CH: %d BLK, %d AVI\n", fp -> blocks_in_chain, fp -> stem.avail_in);
531 if(fp -> blocks_in_chain<1){
532 return EOF;
533 }
534 seekable_decompressed_block_t *cblk = fp -> block_rolling_chain+fp -> block_chain_current_no;
535 int ret = cblk->block_txt[fp-> current_block_txt_read_ptr];
536 fp-> current_block_txt_read_ptr++;
537 if(fp-> current_block_txt_read_ptr == cblk -> block_txt_size){
538 free(cblk->block_txt);
539 free(cblk->linebreak_positions);
540 fp-> current_block_txt_read_ptr = 0;
541 fp -> block_chain_current_no++;
542 if(fp -> block_chain_current_no>= SEEKGZ_CHAIN_BLOCKS_NO) fp -> block_chain_current_no=0;
543 fp->blocks_in_chain --;
544 }
545 return ret;
546 }
547
seekgz_close(seekable_zfile_t * fp)548 void seekgz_close(seekable_zfile_t * fp){
549 int ii;
550 fclose(fp -> gz_fp);
551 free(fp -> in_zipped_buffer);
552 for(ii = 0; ii < fp -> blocks_in_chain ; ii++){
553 int iv = ii + fp -> block_chain_current_no;
554 if(iv >=SEEKGZ_CHAIN_BLOCKS_NO) iv-=SEEKGZ_CHAIN_BLOCKS_NO;
555 free(fp -> block_rolling_chain[iv].block_txt);
556 free(fp -> block_rolling_chain[iv].linebreak_positions);
557 }
558
559 inflateEnd(&fp -> stem);
560 subread_destroy_lock(&fp -> write_lock);
561 }
562
autozip_open(const char * fname,autozip_fp * fp)563 int autozip_open(const char * fname, autozip_fp * fp){
564 int ret = -1;
565 memset(fp, 0, sizeof(autozip_fp));
566 strcpy(fp -> filename, fname);
567
568 FILE * tstfp = fopen(fname,"rb");
569 if(!tstfp) return ret;
570
571 int cc1, cc2;
572 cc1 = fgetc(tstfp);
573 cc2 = fgetc(tstfp);
574
575 if(cc2 == 0x8b && cc1 == 0x1f){
576 //fclose(tstfp);
577 fp -> is_plain = 0;
578 int iret = seekgz_open(fname, &fp -> gz_fp, tstfp);
579 if(iret >= 0) ret = 1;
580 else ret = -1;
581 }else{
582 if(cc1 != EOF && cc2 != EOF){
583 fp -> first_chars[0] = cc1;
584 fp -> first_chars[1] = cc2;
585 fp -> is_first_chars = 2;
586 }
587 fp -> plain_fp = tstfp;
588 fp -> is_plain = 1;
589 ret = 0;
590 }
591
592 return ret;
593 }
594
autozip_close(autozip_fp * fp)595 void autozip_close(autozip_fp * fp){
596 if(fp -> is_plain) fclose(fp -> plain_fp);
597 else
598 seekgz_close(&fp -> gz_fp);
599 }
600
autozip_gets(autozip_fp * fp,char * buf,int buf_size)601 int autozip_gets(autozip_fp * fp, char * buf, int buf_size){
602 int ret = 0;
603 if(fp -> is_plain) {
604 int base0 = 0;
605 if( fp -> is_first_chars ){
606 buf[0]=fp -> first_chars[0];
607 buf[1]=fp -> first_chars[1];
608 base0 = 2;
609 fp -> is_first_chars = 0;
610 }
611 buf[2]=0;
612
613 char * retc = fgets(buf + base0, buf_size, fp -> plain_fp);
614 if(retc == NULL && base0 == 0) ret = 0;
615 else ret = strlen(buf);
616 }else{
617 seekgz_preload_buffer(&fp->gz_fp, NULL);
618 ret = seekgz_gets(&fp -> gz_fp, buf, buf_size);
619 }
620 //SUBREADprintf("READBUF '%s'\n", buf);
621 return ret;
622 }
623
autozip_getch(autozip_fp * fp)624 int autozip_getch(autozip_fp * fp){
625 int ret = 0;
626 if(fp -> is_plain) {
627 if( fp -> is_first_chars ) return fp -> first_chars[2-( fp -> is_first_chars -- ) ];
628 ret = fgetc(fp -> plain_fp);
629 if(ret==EOF)ret = -1;
630 }else{
631 ret = seekgz_next_int8(&fp -> gz_fp);
632 }
633 return ret;
634 }
635
autozip_rewind(autozip_fp * fp)636 void autozip_rewind(autozip_fp * fp){
637 char fname [MAX_FILE_NAME_LENGTH+1];
638 strcpy(fname, fp -> filename);
639
640 autozip_close(fp);
641 autozip_open(fname, fp);
642 }
643
parallel_gzip_writer_init(parallel_gzip_writer_t * pzwtr,char * output_filename,int total_threads)644 void parallel_gzip_writer_init(parallel_gzip_writer_t * pzwtr, char * output_filename, int total_threads){
645 memset(pzwtr, 0, sizeof(parallel_gzip_writer_t));
646 pzwtr -> threads = total_threads;
647 pzwtr -> thread_objs = calloc(sizeof(parallel_gzip_writer_thread_t), total_threads);
648
649 pzwtr -> os_file = f_subr_open(output_filename, "wb");
650 fputc(31, pzwtr -> os_file);
651 fputc(139, pzwtr -> os_file);
652 fputc(8, pzwtr -> os_file);
653 fputc(0, pzwtr -> os_file);
654 fputc(0, pzwtr -> os_file); // MTIME x 4
655 fputc(0, pzwtr -> os_file);
656 fputc(0, pzwtr -> os_file);
657 fputc(0, pzwtr -> os_file);
658 fputc(4, pzwtr -> os_file); // XFL : fastest
659 fputc(255, pzwtr -> os_file); // OS=unknown
660 int x1;
661 for(x1=0; x1<total_threads; x1++){
662 pzwtr -> thread_objs[x1].thread_no = x1;
663 deflateInit2(&pzwtr -> thread_objs[x1].zipper, SAMBAM_COMPRESS_LEVEL_NORMAL, Z_DEFLATED, SAMBAM_GZIP_WINDOW_BITS, 8, Z_DEFAULT_STRATEGY);
664 }
665 pzwtr -> CRC32 = crc32(0, NULL, 0);
666 }
667
parallel_gzip_writer_add_text(parallel_gzip_writer_t * pzwtr,char * text,int tlen,int thread_no)668 void parallel_gzip_writer_add_text(parallel_gzip_writer_t * pzwtr, char * text, int tlen, int thread_no){
669 parallel_gzip_writer_thread_t *tho = pzwtr -> thread_objs + thread_no;
670 if(tlen + tho -> in_buffer_used >= PARALLEL_GZIP_TXT_BUFFER_SIZE){
671 SUBREADprintf("Insufficient gzip buffer.\n");
672 return;
673 }
674 memcpy(tho -> in_buffer + tho -> in_buffer_used, text, tlen);
675 tho -> in_buffer_used += tlen;
676 }
677
parallel_gzip_zip_texts(parallel_gzip_writer_t * pzwtr,int thread_no,int eof_marker)678 void parallel_gzip_zip_texts(parallel_gzip_writer_t * pzwtr, int thread_no, int eof_marker){
679 parallel_gzip_writer_thread_t *tho = pzwtr -> thread_objs + thread_no;
680 int write_txt_ptr = 0;
681
682 tho -> out_buffer_used = 0;
683 tho -> CRC32 = crc_pos(tho -> in_buffer, tho -> in_buffer_used);
684
685 while(tho -> in_buffer_used - write_txt_ptr > 0 || eof_marker){
686 tho -> zipper . next_in = (unsigned char*)tho -> in_buffer + write_txt_ptr;
687 tho -> zipper . avail_in = tho -> in_buffer_used - write_txt_ptr;
688 tho -> zipper . next_out = (unsigned char*)tho -> out_buffer + tho -> out_buffer_used;
689 tho -> zipper . avail_out = PARALLEL_GZIP_ZIPPED_BUFFER_SIZE - tho -> out_buffer_used;
690 int defret = deflate(&tho -> zipper, eof_marker?Z_FINISH:Z_FULL_FLUSH);
691 int consumed_input = tho -> in_buffer_used - write_txt_ptr - tho -> zipper . avail_in;
692 int generated_output = PARALLEL_GZIP_ZIPPED_BUFFER_SIZE - tho -> out_buffer_used - tho -> zipper.avail_out;
693
694 if(defret == Z_OK || defret == Z_STREAM_END ){
695 write_txt_ptr += consumed_input;
696 tho -> out_buffer_used += generated_output;
697 }else{
698 SUBREADprintf("Cannot compress the zipped output: %d with in_len=%d, consumed=%d and out_aval=%d\n", defret, tho -> in_buffer_used, consumed_input, tho -> zipper.avail_out);
699 break;
700 }
701 if(eof_marker)break;
702 }
703
704 tho -> plain_length = tho -> in_buffer_used;
705 tho -> in_buffer_used =0;
706 }
707
708 // because we have to keep sync between three fastq files, the flush function has to be manually called three times at the same time point.
709 // otherwise R1, I2 and R2 files will have inconsistent read orders.
710 // the outer program has to check if any of the three in_buffers is full.
parallel_gzip_writer_flush(parallel_gzip_writer_t * pzwtr,int thread_no)711 void parallel_gzip_writer_flush(parallel_gzip_writer_t * pzwtr, int thread_no){
712 parallel_gzip_writer_thread_t *tho = pzwtr -> thread_objs + thread_no;
713
714 if(tho -> out_buffer_used>0){
715 int fret = fwrite(tho -> out_buffer, 1, tho -> out_buffer_used, pzwtr -> os_file);
716 if(fret != tho ->out_buffer_used) SUBREADprintf("Cannot write the zipped output: %d\n", fret);
717
718 if(tho -> plain_length){
719 unsigned int CRCnew = crc32_combine( pzwtr -> CRC32, tho -> CRC32, tho -> plain_length );
720 pzwtr -> plain_length += tho -> plain_length;
721 pzwtr -> CRC32 = CRCnew;
722 }
723 }
724 tho -> out_buffer_used =0;
725 tho -> plain_length = 0;
726 }
727
plgz_finish_in_buffers(parallel_gzip_writer_t * pzwtr)728 void plgz_finish_in_buffers(parallel_gzip_writer_t * pzwtr){
729 int x1;
730 for(x1=0; x1<pzwtr -> threads; x1++){
731 if(pzwtr -> thread_objs[x1].in_buffer_used<1) continue;
732 parallel_gzip_zip_texts(pzwtr, x1, 0);
733 parallel_gzip_writer_flush(pzwtr, x1);
734 }
735 }
736
parallel_gzip_writer_close(parallel_gzip_writer_t * pzwtr)737 void parallel_gzip_writer_close(parallel_gzip_writer_t * pzwtr){
738 int x1;
739 plgz_finish_in_buffers(pzwtr);
740
741 // write the last "Z_FINISH" 0-len block to tell gzip that the zipped chunk end
742 pzwtr -> thread_objs[0].in_buffer_used = 0;
743 parallel_gzip_zip_texts(pzwtr, 0, 1);
744 parallel_gzip_writer_flush(pzwtr, 0);
745
746 for(x1=0; x1<pzwtr -> threads; x1++)
747 deflateEnd(&pzwtr -> thread_objs[x1].zipper);
748
749 fwrite(&pzwtr -> CRC32, 4, 1, pzwtr -> os_file);
750 fwrite(&pzwtr -> plain_length, 4, 1, pzwtr -> os_file); // write the first (lowest) 4 bytes in a 64-bit integer -- as gzip file format required original (uncompressed) input data modulo 2^32.
751 fclose(pzwtr -> os_file);
752 free(pzwtr -> thread_objs);
753 }
754
755
parallel_gzip_writer_add_read_fqs_scRNA(parallel_gzip_writer_t ** outfps,char * bambin,int thread_no)756 int parallel_gzip_writer_add_read_fqs_scRNA(parallel_gzip_writer_t**outfps, char * bambin, int thread_no){
757 int reclen=0;
758 parallel_gzip_writer_t * outR1fp = outfps[0];
759 parallel_gzip_writer_t * outI1fp = outfps[1];
760 parallel_gzip_writer_t * outR2fp = outfps[2];
761
762 memcpy(&reclen, bambin,4);
763 int flag = 0, l_seq = 0, l_read_name = 0, n_cigar_ops = 0;
764 memcpy(&l_read_name, bambin+12,1);
765 memcpy(&n_cigar_ops, bambin+16,2);
766 memcpy(&flag, bambin+18,2);
767 memcpy(&l_seq, bambin+20,4);
768
769 int x1=0;
770
771 parallel_gzip_writer_add_text(outR2fp,"@",1,thread_no);
772 parallel_gzip_writer_add_text(outR1fp,"@",1,thread_no);
773 parallel_gzip_writer_add_text(outI1fp,"@",1,thread_no);
774 char * readname = bambin+36;
775 parallel_gzip_writer_add_text(outR1fp,readname, 12,thread_no);
776 parallel_gzip_writer_add_text(outR2fp,readname, 12,thread_no);
777 parallel_gzip_writer_add_text(outI1fp,readname, 12,thread_no);
778 parallel_gzip_writer_add_text(outR1fp,"\n",1,thread_no);
779 parallel_gzip_writer_add_text(outR2fp,"\n",1,thread_no);
780 parallel_gzip_writer_add_text(outI1fp,"\n",1,thread_no);
781
782 char * R1seq = bambin+36+13;
783 int R1len = 0;
784 for(R1len=0; R1seq[R1len] && R1seq[R1len]!='|' ;R1len++);
785 char * R1qual = R1seq + R1len + 1;
786 parallel_gzip_writer_add_text(outR1fp,R1seq, R1len,thread_no);
787 parallel_gzip_writer_add_text(outR1fp,"\n+\n",3,thread_no);
788 parallel_gzip_writer_add_text(outR1fp,R1qual, R1len,thread_no);
789 parallel_gzip_writer_add_text(outR1fp,"\n",1,thread_no);
790
791 char * I1seq = R1qual + R1len + 1;
792 int I1len = 0;
793 for(I1len=0; I1seq[I1len] && I1seq[I1len]!='|' ;I1len++);
794 char * I1qual = I1seq + I1len + 1;
795 parallel_gzip_writer_add_text(outI1fp,I1seq, I1len,thread_no);
796 parallel_gzip_writer_add_text(outI1fp,"\n+\n",3,thread_no);
797 parallel_gzip_writer_add_text(outI1fp,I1qual, I1len,thread_no);
798 parallel_gzip_writer_add_text(outI1fp,"\n",1,thread_no);
799
800 char oseq[l_seq+1];
801 int seqbase = 36+l_read_name+n_cigar_ops*4;
802 for(x1=0; x1<l_seq;x1++)oseq[x1]="=ACMGRSVTWYHKDBN"[(bambin[ seqbase+(x1/2)] >> (((x1%2)?0:1) *4) )&0xf];
803 oseq[l_seq]=0;
804 if(flag & 16) reverse_read(oseq,l_seq, GENE_SPACE_BASE);
805 parallel_gzip_writer_add_text(outR2fp, oseq, l_seq,thread_no);
806 parallel_gzip_writer_add_text(outR2fp,"\n+\n",3,thread_no);
807
808 seqbase = 36+l_read_name+n_cigar_ops*4 + ( l_seq+1 )/2;
809 for(x1=0; x1<l_seq;x1++)oseq[x1]=33+bambin[ seqbase+x1];
810 if(flag & 16)reverse_quality(oseq, l_seq);
811 oseq[l_seq]=0;
812 parallel_gzip_writer_add_text(outR2fp, oseq, l_seq,thread_no);
813 parallel_gzip_writer_add_text(outR2fp,"\n",1,thread_no);
814 return 0;
815 }
816
817