1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <assert.h>
5 #include <unistd.h>
6 #ifndef __MINGW32__
7 #include <sys/mman.h>
8 #endif
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <fcntl.h>
12 #include "core.h"
13 #include "core-bigtable.h"
14 #include "gene-algorithms.h"
15 
16 #define TABLE_PRELOAD_SIZE 24
17 
18 #define calc_offset(pair_NO)  ((pair_NO) * (1 + global_context -> input_reads.is_paired_end_reads) + is_second_read) * (sizeof(short) * 3 * global_context -> config.big_margin_record_size + (sizeof(mapping_result_t) + global_context -> config.do_breakpoint_detection*sizeof(subjunc_result_t)) * global_context -> config.multi_best_reads)
19 
20 
/* Take the global mutex that serialises access to the bigtable result cache. */
void bigtable_lock(global_context_t * global_context){
	subread_lock_occupy(&global_context -> bigtable_lock);
}
24 
/* Release the global mutex taken by bigtable_lock(). */
void bigtable_unlock(global_context_t * global_context){
	subread_lock_release(&global_context -> bigtable_lock);
}
28 
/* Return the index of the thread responsible for the given fragment.
 * The per-thread partitioning scheme (scanning start_read_number_blocks to
 * find the owning thread) has been disabled; every fragment is currently
 * handled by thread 0, so both parameters are effectively unused. */
unsigned int get_handling_thread_number(global_context_t * global_context , subread_read_number_t pair_number){
	return 0;
}
40 
41 #define get_inner_pair(a,b) (b)
42 
43 bigtable_cached_result_t * bigtable_retrieve_cache(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int is_second_read, int load_more);
44 
/* Copy one stored mapping result (and/or one subjunc result) for a fragment
 * into caller-provided buffers, without taking ownership of a cache slot.
 *
 * global_context / thread_context : engine state (thread_context may be NULL).
 * pair_number    : fragment number whose result is wanted.
 * result_number  : which of the multi_best_reads results to read.
 * is_second_read : 0 = first mate, 1 = second mate (paired-end mode).
 * return_ptr / return_junction_ptr : output buffers; either may be NULL to
 *                  skip that part of the record.
 */
void bigtable_readonly_result(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int result_number, int is_second_read, mapping_result_t * return_ptr, subjunc_result_t * return_junction_ptr){
	int rlen = -1;

	// File-backed path. NOTE(review): elsewhere in this file the FILE* is
	// asserted to be NULL (see init_bigtable_results / finalise), so this
	// branch appears to be legacy/dead — confirm before removing.
	if(global_context -> bigtable_cache_file_fp){
		int loadjunc;
		long long inner_pair_number = get_inner_pair(global_context, pair_number);

		// Flush any chunk a thread still has loaded before random-access reads.
		if(global_context -> bigtable_cache_file_loaded_fragments_begin >=0){
			bigtable_write_thread_cache(global_context);
			global_context -> bigtable_cache_file_loaded_fragments_begin = -1;
		}

		// Pass 0 reads the mapping result, pass 1 reads the subjunc result.
		for(loadjunc = 0; loadjunc < 2; loadjunc++){
			if(loadjunc){ if(!return_junction_ptr) continue; }
			else{ if(!return_ptr) continue; }

			// Record layout per read: [big_margin | mapping_result_t x N | subjunc_result_t x N]
			unsigned long long offset = calc_offset(inner_pair_number);
			offset += sizeof(short) * 3 * global_context -> config.big_margin_record_size ;
			if(loadjunc) offset += sizeof(mapping_result_t) * global_context -> config.multi_best_reads + sizeof(subjunc_result_t) * result_number;
			else	offset += sizeof(mapping_result_t) * result_number;
			fseeko(global_context -> bigtable_cache_file_fp , offset , SEEK_SET);

			void * write_ptr = return_ptr;
			if(loadjunc) write_ptr = return_junction_ptr;

			rlen = fread(write_ptr, loadjunc?sizeof(subjunc_result_t):sizeof(mapping_result_t), 1,  global_context -> bigtable_cache_file_fp);
			if(rlen <1)
				SUBREADprintf("UNABLE TO READ RESULT\n");
		}
	}else{
		// Memory-buffer path: copy straight out of the in-memory cache slot.
		bigtable_cached_result_t * rett = bigtable_retrieve_cache(global_context , thread_context , pair_number, is_second_read,0);

		int best_offset = result_number;
		if(return_ptr)memcpy(return_ptr, rett -> alignment_res + best_offset, sizeof(mapping_result_t));
		if(return_junction_ptr)memcpy(return_junction_ptr, rett -> subjunc_res + best_offset, sizeof(subjunc_result_t));
	}
}
82 
83 
/* Allocate (or, when is_rewinding, re-zero) the in-memory result cache.
 *
 * Cache size is one slot per single read per chunk; each slot owns
 * multi_best_reads mapping results and (when breakpoint detection is on)
 * multi_best_reads subjunc results, carved out of two big backing arrays.
 * Returns 0 on success; aborts via assert() on allocation failure. */
int init_bigtable_results(global_context_t * global_context, int is_rewinding)
{

	if(global_context -> config.use_memory_buffer) {
		global_context -> bigtable_chunked_fragments = global_context -> config.reads_per_chunk+1;
		global_context -> bigtable_cache_size = global_context -> bigtable_chunked_fragments * (1+global_context -> input_reads.is_paired_end_reads);
	} else {
		// NOTE(review): dead in practice — the assert() at the bottom of this
		// function requires use_memory_buffer to be set.
		global_context -> bigtable_chunked_fragments = 110llu*1024*1024;
		global_context -> bigtable_cache_size = (1+0*global_context -> config.all_threads) * global_context -> bigtable_chunked_fragments * (1+global_context -> input_reads.is_paired_end_reads);
	}

	if(is_rewinding){
		// Re-use the existing backing arrays; just wipe their contents.
		memset( global_context -> bigtable_cache_malloc_ptr, 0, sizeof(mapping_result_t) *  global_context -> config.multi_best_reads * global_context -> bigtable_cache_size );
		if(global_context -> config.do_breakpoint_detection)
			memset(global_context -> bigtable_cache_malloc_ptr_junc , 0, sizeof(subjunc_result_t) * global_context -> config.multi_best_reads *  global_context -> bigtable_cache_size);
	}else{
		global_context -> bigtable_cache = malloc(sizeof(bigtable_cached_result_t) * global_context -> bigtable_cache_size);
		global_context -> bigtable_cache_malloc_ptr = calloc( sizeof(mapping_result_t) ,  global_context -> config.multi_best_reads * global_context -> bigtable_cache_size );
		if(global_context -> config.do_breakpoint_detection)
			global_context -> bigtable_cache_malloc_ptr_junc = calloc(sizeof(subjunc_result_t)  , global_context -> config.multi_best_reads *  global_context -> bigtable_cache_size);
		// FIX: check every allocation, not only bigtable_cache_malloc_ptr —
		// the other two were dereferenced below without a NULL check.
		assert(NULL != global_context -> bigtable_cache );
		assert(NULL != global_context -> bigtable_cache_malloc_ptr );
		if(global_context -> config.do_breakpoint_detection)
			assert(NULL != global_context -> bigtable_cache_malloc_ptr_junc );
	}

	// Point each slot at its multi_best_reads-sized stripe of the backing arrays.
	int xk1;
	if(!is_rewinding) for(xk1 = 0; xk1 < global_context -> bigtable_cache_size; xk1++){
		global_context -> bigtable_cache [xk1].alignment_res = (mapping_result_t*) global_context -> bigtable_cache_malloc_ptr + global_context -> config.multi_best_reads * xk1;

		if(global_context -> config.do_breakpoint_detection)
			global_context -> bigtable_cache [xk1].subjunc_res = (subjunc_result_t*)global_context -> bigtable_cache_malloc_ptr_junc + global_context -> config.multi_best_reads * xk1;
	}
	if(global_context->config.do_big_margin_filtering_for_junctions)for(xk1 = 0; xk1 < global_context -> bigtable_cache_size; xk1++)
		memset(global_context -> bigtable_cache [xk1].big_margin_data, 0, sizeof(global_context -> bigtable_cache [xk1].big_margin_data));

	subread_init_lock(&global_context -> bigtable_lock);

	// Only the memory-buffer mode is supported; the file-backed cache is gone.
	assert(global_context -> config.use_memory_buffer);
	global_context -> bigtable_cache_file_fp = NULL;
	return 0;
}
126 
_global_retrieve_alignment_ptr(global_context_t * global_context,subread_read_number_t pair_number,int is_second_read,int best_read_id)127 mapping_result_t * _global_retrieve_alignment_ptr(global_context_t * global_context, subread_read_number_t pair_number, int is_second_read, int best_read_id){
128 	mapping_result_t * ret;
129 	bigtable_retrieve_result(global_context, NULL, pair_number, best_read_id, is_second_read, &ret, NULL);
130 	return ret;
131 }
132 
_global_retrieve_subjunc_ptr(global_context_t * global_context,subread_read_number_t pair_number,int is_second_read,int best_read_id)133 subjunc_result_t * _global_retrieve_subjunc_ptr(global_context_t * global_context, subread_read_number_t pair_number, int is_second_read, int best_read_id){
134 	subjunc_result_t * ret;
135 	bigtable_retrieve_result(global_context, NULL, pair_number, best_read_id, is_second_read, NULL, &ret);
136 	return ret;
137 }
138 
139 
140 #define calc_file_location(pair_no) (pair_no)* (1 + global_context -> input_reads.is_paired_end_reads) *  (sizeof(short) * 3 * global_context -> config.big_margin_record_size + (sizeof(mapping_result_t) + global_context -> config.do_breakpoint_detection*sizeof(subjunc_result_t)) * global_context -> config.multi_best_reads)
141 
142 
/* Flush a thread's loaded chunk back to the cache file.
 * The file-backed cache has been removed, so this is now a no-op that merely
 * asserts no cache file is open. */
void bigtable_write_thread_cache(global_context_t * global_context){
	assert(global_context -> bigtable_cache_file_fp == NULL);
}
147 
148 
/* Spin until either the loaded-fragments window has advanced past old_begin,
 * or no cache slot in the current window is marked OCCUPIED by a thread.
 * NOTE: pure busy-wait — no sleep/yield between scans. */
void wait_occupied(global_context_t * global_context , unsigned long long old_begin){
	while(old_begin == global_context -> bigtable_cache_file_loaded_fragments_begin){
		int slot_no, any_occupied = 0;
		for(slot_no = 0; slot_no < global_context -> bigtable_chunked_fragments ; slot_no++){
			if(CACHE_STATUS_OCCUPIED == global_context -> bigtable_cache[slot_no].status){
				any_occupied = 1;
				break;
			}
		}
		if(!any_occupied) break;
	}
}
160 
bigtable_retrieve_cache(global_context_t * global_context,thread_context_t * thread_context,subread_read_number_t pair_number,int is_second_read,int load_more)161 bigtable_cached_result_t * bigtable_retrieve_cache(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int is_second_read, int load_more)
162 {
163 	long long inner_pair_number = get_inner_pair(global_context, pair_number);
164 	long long load_start_pair_no = inner_pair_number - inner_pair_number % global_context -> bigtable_chunked_fragments;
165 	bigtable_cached_result_t * ret_cache = global_context -> bigtable_cache + (inner_pair_number - load_start_pair_no)* (1+global_context -> input_reads.is_paired_end_reads) + is_second_read;
166 	return ret_cache;
167 }
168 
/* Hand out pointers (no copies) to the result_number-th mapping / subjunc
 * result of one read. Either output pointer may be NULL to skip it.
 * Always returns 0. */
int bigtable_retrieve_result(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int result_number, int is_second_read, mapping_result_t ** return_ptr, subjunc_result_t ** return_junction_ptr)
{
	bigtable_cached_result_t * slot = bigtable_retrieve_cache(global_context, thread_context, pair_number, is_second_read, 0);

	if(return_ptr != NULL)
		*return_ptr = slot -> alignment_res + result_number;
	if(return_junction_ptr != NULL)
		*return_junction_ptr = slot -> subjunc_res + result_number;

	return 0;
}
180 
181 
/* Pointer to one read's big-margin vote array inside its cache slot. */
unsigned short * _global_retrieve_big_margin_ptr(global_context_t * global_context, subread_read_number_t pair_number, subread_read_number_t is_second_read){
	bigtable_cached_result_t * slot = bigtable_retrieve_cache(global_context, NULL,  pair_number, is_second_read, 0);
	return slot -> big_margin_data;
}
186 
187 
/* Deliberate trap: the file-backed result cache (and its slot release
 * protocol) was removed, so any call here is a programming error. The assert
 * compares against a string literal (always non-NULL) and therefore always
 * fires, printing the message as part of the assertion text. Note this
 * compiles away under NDEBUG. */
void bigtable_release_result(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int commit_change){
	assert(NULL == "File result cache is no longer used. Do not call this function.");
}
191 
192 
/* Free the bigtable cache: the two backing arrays and the slot array.
 * The junc backing array exists only when breakpoint detection was enabled.
 * Always returns 0. */
int finalise_bigtable_results(global_context_t * global_context){
	// The file-backed cache must never have been opened.
	assert(global_context -> bigtable_cache_file_fp == NULL);

	free(global_context -> bigtable_cache_malloc_ptr);
	if(global_context -> config.do_breakpoint_detection)
		free(global_context -> bigtable_cache_malloc_ptr_junc);
	free(global_context -> bigtable_cache);

	return 0;
}
200 
201 
202 
203 
204 
205 
206 
207 
208 
209 
210 
211 
212 
213 
214 
215 
216 
217 
218 
219 
220 
221 
222 
223 
224 
/* HashTable key deallocator: keys are heap-allocated "chro:pos" strings. */
void bktable_key_deallocator(void * key){
	free(key);
}
228 
bktable_bucket_deallocator(void * vbuk)229 void bktable_bucket_deallocator(void * vbuk){
230 	bucketed_table_bucket_t * buk = vbuk;
231 	free(buk -> positions);
232 	free(buk -> details);
233 	free(buk);
234 }
235 
/* Initialise a bucketed interval table keyed by "chro:bucket_start" strings.
 * maximum_interval_length is both the bucket width and the longest interval
 * a single-bucket lookup can serve; expected_items sizes the hash table. */
void bktable_init(bucketed_table_t * tab, unsigned int maximum_interval_length, unsigned int expected_items){
	memset(tab, 0, sizeof(*tab));

	tab -> maximum_interval_length = maximum_interval_length;
	tab -> expected_items = expected_items;
	tab -> entry_table = HashTableCreate(expected_items / 3);

	HashTableSetKeyComparisonFunction(tab->entry_table, fc_strcmp_chro);
	HashTableSetHashFunction(tab->entry_table, HashTableStringHashFunction);
	HashTableSetDeallocationFunctions(tab->entry_table, bktable_key_deallocator, bktable_bucket_deallocator);
}
246 
/* Tear down the table; the hash table's deallocators free keys and buckets.
 * Call bktable_free_ptrs over the table first if the details must be freed. */
void bktable_destroy(bucketed_table_t * tab){
	HashTableDestroy(tab -> entry_table);
}
250 
/* Insert one (position, detail) record. The record is registered in the
 * bucket containing pos and, when one exists, in the preceding bucket, so a
 * later lookup over an interval no longer than maximum_interval_length only
 * has to probe a single bucket. The table takes ownership of `detail`
 * (released via bktable_free_ptrs, by the bucket whose keyed range owns the
 * position). NULL details are silently ignored. */
void bktable_append(bucketed_table_t * tab, char * chro, unsigned int pos, void * detail){
	unsigned int twokeys[2], keyi;
	if(detail == NULL) return;

	// Bucket key: position rounded down to a multiple of the bucket width.
	twokeys[0] = pos - pos % tab -> maximum_interval_length;
	if(twokeys[0] > tab -> maximum_interval_length)
		twokeys[1] = twokeys[0] - tab -> maximum_interval_length;
	else	twokeys[1] = 0xffffffff;	// sentinel: no preceding bucket

	for(keyi = 0; keyi < 2 ; keyi++)
	{
		char static_key [20 + MAX_CHROMOSOME_NAME_LEN];
		unsigned int curr_key_pos = twokeys[keyi];
		if(curr_key_pos == 0xffffffff) continue;

		// FIX: bound the key write (was sprintf, which could overrun on an
		// over-long chromosome name).
		snprintf(static_key, sizeof(static_key), "%s:%u", chro, curr_key_pos);

		bucketed_table_bucket_t * had_items = HashTableGet(tab -> entry_table, static_key);

		if(NULL == had_items)
		{
			// First record in this bucket: allocate and register it.
			had_items = malloc(sizeof(bucketed_table_bucket_t));
			assert(had_items != NULL);
			memset(had_items, 0, sizeof(bucketed_table_bucket_t));

			had_items -> capacity = BUCKETED_TABLE_INIT_ITEMS;
			had_items -> positions = malloc(sizeof(int) * BUCKETED_TABLE_INIT_ITEMS);
			had_items -> details  = malloc(sizeof(void *) *BUCKETED_TABLE_INIT_ITEMS);
			assert(had_items -> positions != NULL && had_items -> details != NULL);
			had_items -> keyed_bucket = curr_key_pos;
			had_items -> maximum_interval_length = tab -> maximum_interval_length;

			// The hash table owns its key; hand it a heap copy.
			char * dynamic_key = malloc(strlen(static_key) + 1);
			assert(dynamic_key != NULL);
			strcpy(dynamic_key, static_key);
			HashTablePut(tab->entry_table, dynamic_key, had_items);
		}
		if(had_items -> capacity <= had_items -> items){
			// Grow by ~30% (at least 5 slots).
			had_items -> capacity = max(had_items -> capacity + 5, had_items -> capacity * 1.3);
			// FIX: keep realloc() results in temporaries — overwriting the
			// only copy of the pointer would leak the old block and then
			// dereference NULL below if the reallocation failed.
			void * new_positions = realloc(had_items -> positions, had_items -> capacity * sizeof(int));
			void * new_details = realloc(had_items -> details, had_items -> capacity * sizeof(void *));
			assert(new_positions != NULL && new_details != NULL);
			had_items -> positions = new_positions;
			had_items -> details = new_details;
		}

		had_items -> positions[ had_items -> items ] = pos;
		had_items -> details[ had_items -> items ] = detail;
		had_items -> items++;
	}

	tab -> fragments ++;
}
300 
301 
/* HashTable for-each callback: free the detail payloads of one bucket.
 * Because bktable_append registers each detail in up to two buckets, a detail
 * is freed only by the bucket whose keyed range actually contains its
 * position — this guarantees each payload is freed exactly once. */
void bktable_free_ptrs(void * bukey, void * buckv, HashTable * tab){
	bucketed_table_bucket_t * bucket = buckv;
	int idx;
	for(idx = 0; idx < bucket -> items; idx++){
		unsigned int owner_bucket = bucket->positions[idx] - bucket->positions[idx] % bucket -> maximum_interval_length;
		if(owner_bucket == bucket -> keyed_bucket)
			free(bucket->details[idx]);
	}
}
314 
/* Collect records with position in [start_pos, start_pos + interval_length)
 * on chromosome `chro`. Only the bucket containing start_pos is probed;
 * bktable_append's double registration makes this sufficient as long as
 * interval_length does not exceed tab->maximum_interval_length.
 * Hits are written to hit_pos_list / hit_ptr_list (capacity max_hits).
 * Returns the number of hits stored. */
int bktable_lookup(bucketed_table_t * tab, char * chro, unsigned int start_pos, unsigned int interval_length, unsigned int * hit_pos_list, void ** hit_ptr_list, int max_hits){
	unsigned int my_key_pos;
	my_key_pos = start_pos - start_pos % tab -> maximum_interval_length;

	char static_key [20 + MAX_CHROMOSOME_NAME_LEN];
	// FIX: bounded key formatting (was sprintf) — defends against an
	// over-long chromosome name overrunning static_key.
	snprintf(static_key, sizeof(static_key), "%s:%u", chro, my_key_pos);

	bucketed_table_bucket_t * had_items = HashTableGet(tab -> entry_table, static_key);

	if(!had_items) // no bucket at all.
		return 0;

	int item_i, ret = 0;
	for(item_i = 0 ; item_i < had_items->items; item_i++){
		unsigned int potential_pos = had_items -> positions[item_i];
		if(potential_pos >= start_pos && potential_pos < start_pos + interval_length)
		{
			hit_pos_list[ret] = potential_pos;
			hit_ptr_list[ret] = had_items -> details[item_i];
			ret ++;
			if(ret >= max_hits) break;
		}
	}

	return ret;
}
348 
349 
/* Initialise an empty, growable list of fragment numbers with
 * FRAGMENT_LIST_INIT_ITEMS of initial capacity. */
void fraglist_init(fragment_list_t * list){
	memset(list, 0, sizeof(*list));
	list -> capacity = FRAGMENT_LIST_INIT_ITEMS;
	list -> fragment_numbers = malloc(list -> capacity * sizeof(subread_read_number_t));
}
355 
/* Release the storage owned by a fragment list (the struct itself is not freed). */
void fraglist_destroy(fragment_list_t * list){
	free(list -> fragment_numbers);
}
359 
/* Append one fragment number, growing the backing array by ~30% (at least 5
 * slots) when full. Aborts via assert() if the reallocation fails. */
void fraglist_append(fragment_list_t * list, subread_read_number_t fragment_number){
	if(list -> fragments >= list -> capacity){
		list -> capacity = max(list -> capacity + 5, list -> capacity * 1.3);
		// FIX: keep the realloc() result in a temporary — the old code wrote
		// it straight back into list->fragment_numbers, which on failure
		// leaks the old block and dereferences NULL below.
		void * new_block = realloc(list -> fragment_numbers, sizeof(subread_read_number_t) * list -> capacity);
		assert(new_block != NULL);
		list -> fragment_numbers = new_block;
	}

	list -> fragment_numbers[ list -> fragments ++ ] = fragment_number;
}
371 
372