1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <assert.h>
5 #include <unistd.h>
6 #ifndef __MINGW32__
7 #include <sys/mman.h>
8 #endif
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <fcntl.h>
12 #include "core.h"
13 #include "core-bigtable.h"
14 #include "gene-algorithms.h"
15
16 #define TABLE_PRELOAD_SIZE 24
17
18 #define calc_offset(pair_NO) ((pair_NO) * (1 + global_context -> input_reads.is_paired_end_reads) + is_second_read) * (sizeof(short) * 3 * global_context -> config.big_margin_record_size + (sizeof(mapping_result_t) + global_context -> config.do_breakpoint_detection*sizeof(subjunc_result_t)) * global_context -> config.multi_best_reads)
19
20
/* Acquire the global lock that serializes access to the bigtable cache. */
void bigtable_lock(global_context_t * global_context)
{
	subread_lock_occupy(&global_context -> bigtable_lock);
}
24
/* Release the global bigtable lock taken by bigtable_lock(). */
void bigtable_unlock(global_context_t * global_context)
{
	subread_lock_release(&global_context -> bigtable_lock);
}
28
/* Map a fragment number to the worker thread responsible for it.
 * Per-thread ownership is currently disabled: thread 0 handles everything.
 * The old block-based dispatch is kept below, commented out, for reference. */
unsigned int get_handling_thread_number(global_context_t * global_context , subread_read_number_t pair_number)
{
	/*
	if(global_context -> config.all_threads<2) return 0;

	for(ret = 0; ret < global_context -> config.all_threads; ret++){
		if(global_context -> input_reads.start_read_number_blocks[ret] > pair_number) return ret - 1;
	}
	return global_context -> config.all_threads - 1;
	*/
	return 0;
}
40
41 #define get_inner_pair(a,b) (b)
42
43 bigtable_cached_result_t * bigtable_retrieve_cache(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int is_second_read, int load_more);
44
/* Copy one stored result for a read into caller-owned buffers.
 *
 * pair_number / is_second_read select the read; result_number selects which
 * of the config.multi_best_reads candidate results to fetch.  Either of
 * return_ptr / return_junction_ptr may be NULL when the caller only wants
 * one of the two record types.
 *
 * Two back-ends exist: a legacy on-disk cache file (bigtable_cache_file_fp)
 * and the in-memory cache.  init_bigtable_results() now always sets
 * bigtable_cache_file_fp to NULL, so the in-memory memcpy branch is the one
 * taken in practice.
 */
void bigtable_readonly_result(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int result_number, int is_second_read, mapping_result_t * return_ptr, subjunc_result_t * return_junction_ptr){
	int rlen = -1;

	if(global_context -> bigtable_cache_file_fp){
		int loadjunc;
		long long inner_pair_number = get_inner_pair(global_context, pair_number);

		// Flush any pending thread cache before reading back from the file.
		if(global_context -> bigtable_cache_file_loaded_fragments_begin >=0){
			bigtable_write_thread_cache(global_context);
			global_context -> bigtable_cache_file_loaded_fragments_begin = -1;
		}

		// Pass 0 reads the mapping result; pass 1 reads the junction result.
		for(loadjunc = 0; loadjunc < 2; loadjunc++){
			if(loadjunc){ if(!return_junction_ptr) continue; }
			else{ if(!return_ptr) continue; }

			// Per-read file layout (see calc_offset): big-margin block, then
			// multi_best_reads mapping_result_t records, then (when breakpoint
			// detection is on) multi_best_reads subjunc_result_t records.
			unsigned long long offset = calc_offset(inner_pair_number);
			offset += sizeof(short) * 3 * global_context -> config.big_margin_record_size ;
			if(loadjunc) offset += sizeof(mapping_result_t) * global_context -> config.multi_best_reads + sizeof(subjunc_result_t) * result_number;
			else offset += sizeof(mapping_result_t) * result_number;
			fseeko(global_context -> bigtable_cache_file_fp , offset , SEEK_SET);

			void * write_ptr = return_ptr;
			if(loadjunc) write_ptr = return_junction_ptr;

			rlen = fread(write_ptr, loadjunc?sizeof(subjunc_result_t):sizeof(mapping_result_t), 1, global_context -> bigtable_cache_file_fp);
			if(rlen <1)
				SUBREADprintf("UNABLE TO READ RESULT\n");
		}
	}else{
		// In-memory path: copy from this read's cached slot.
		bigtable_cached_result_t * rett = bigtable_retrieve_cache(global_context , thread_context , pair_number, is_second_read,0);

		int best_offset = result_number;
		if(return_ptr)memcpy(return_ptr, rett -> alignment_res + best_offset, sizeof(mapping_result_t));
		if(return_junction_ptr)memcpy(return_junction_ptr, rett -> subjunc_res + best_offset, sizeof(subjunc_result_t));
	}
}
82
83
/* Allocate (or, when is_rewinding is set, reset) the in-memory result cache.
 *
 * One cache slot exists per single read in a chunk; each slot points into two
 * large shared arrays holding multi_best_reads mapping results and (when
 * breakpoint detection is enabled) subjunc results.  Rewinding reuses the
 * previously allocated buffers and just zeroes the result records.
 *
 * Returns 0.  Only the in-memory back-end is supported (asserted below);
 * bigtable_cache_file_fp is always left NULL.
 */
int init_bigtable_results(global_context_t * global_context, int is_rewinding)
{
	if(global_context -> config.use_memory_buffer) {
		// One fragment of slack beyond reads_per_chunk.
		global_context -> bigtable_chunked_fragments = global_context -> config.reads_per_chunk+1;
		global_context -> bigtable_cache_size = global_context -> bigtable_chunked_fragments * (1+global_context -> input_reads.is_paired_end_reads);
	} else {
		// Legacy file-backed sizing; unreachable in practice (see assert below).
		global_context -> bigtable_chunked_fragments = 110llu*1024*1024;
		global_context -> bigtable_cache_size = (1+0*global_context -> config.all_threads) * global_context -> bigtable_chunked_fragments * (1+global_context -> input_reads.is_paired_end_reads);
	}

	if(is_rewinding){
		// Buffers already exist: zero the result records for the next chunk.
		memset( global_context -> bigtable_cache_malloc_ptr, 0, sizeof(mapping_result_t) * global_context -> config.multi_best_reads * global_context -> bigtable_cache_size );
		if(global_context -> config.do_breakpoint_detection)
			memset(global_context -> bigtable_cache_malloc_ptr_junc , 0, sizeof(subjunc_result_t) * global_context -> config.multi_best_reads * global_context -> bigtable_cache_size);
	}else{
		global_context -> bigtable_cache = malloc(sizeof(bigtable_cached_result_t) * global_context -> bigtable_cache_size);
		global_context -> bigtable_cache_malloc_ptr = calloc( sizeof(mapping_result_t) , global_context -> config.multi_best_reads * global_context -> bigtable_cache_size );
		if(global_context -> config.do_breakpoint_detection)
			global_context -> bigtable_cache_malloc_ptr_junc = calloc(sizeof(subjunc_result_t) , global_context -> config.multi_best_reads * global_context -> bigtable_cache_size);
		// Check every allocation, not just the mapping-result array.
		assert(NULL != global_context -> bigtable_cache );
		assert(NULL != global_context -> bigtable_cache_malloc_ptr );
		if(global_context -> config.do_breakpoint_detection)
			assert(NULL != global_context -> bigtable_cache_malloc_ptr_junc );
	}

	int xk1;
	// Point each slot at its slice of the shared result arrays.
	if(!is_rewinding) for(xk1 = 0; xk1 < global_context -> bigtable_cache_size; xk1++){
		global_context -> bigtable_cache [xk1].alignment_res = (mapping_result_t*) global_context -> bigtable_cache_malloc_ptr + global_context -> config.multi_best_reads * xk1;

		if(global_context -> config.do_breakpoint_detection)
			global_context -> bigtable_cache [xk1].subjunc_res = (subjunc_result_t*)global_context -> bigtable_cache_malloc_ptr_junc + global_context -> config.multi_best_reads * xk1;
	}
	if(global_context->config.do_big_margin_filtering_for_junctions)for(xk1 = 0; xk1 < global_context -> bigtable_cache_size; xk1++)
		memset(global_context -> bigtable_cache [xk1].big_margin_data, 0, sizeof(global_context -> bigtable_cache [xk1].big_margin_data));

	subread_init_lock(&global_context -> bigtable_lock);

	// The file-backed cache has been retired; only the memory buffer remains.
	assert(global_context -> config.use_memory_buffer);
	global_context -> bigtable_cache_file_fp = NULL;
	return 0;
}
126
_global_retrieve_alignment_ptr(global_context_t * global_context,subread_read_number_t pair_number,int is_second_read,int best_read_id)127 mapping_result_t * _global_retrieve_alignment_ptr(global_context_t * global_context, subread_read_number_t pair_number, int is_second_read, int best_read_id){
128 mapping_result_t * ret;
129 bigtable_retrieve_result(global_context, NULL, pair_number, best_read_id, is_second_read, &ret, NULL);
130 return ret;
131 }
132
_global_retrieve_subjunc_ptr(global_context_t * global_context,subread_read_number_t pair_number,int is_second_read,int best_read_id)133 subjunc_result_t * _global_retrieve_subjunc_ptr(global_context_t * global_context, subread_read_number_t pair_number, int is_second_read, int best_read_id){
134 subjunc_result_t * ret;
135 bigtable_retrieve_result(global_context, NULL, pair_number, best_read_id, is_second_read, NULL, &ret);
136 return ret;
137 }
138
139
140 #define calc_file_location(pair_no) (pair_no)* (1 + global_context -> input_reads.is_paired_end_reads) * (sizeof(short) * 3 * global_context -> config.big_margin_record_size + (sizeof(mapping_result_t) + global_context -> config.do_breakpoint_detection*sizeof(subjunc_result_t)) * global_context -> config.multi_best_reads)
141
142
/* Legacy hook that used to flush the per-thread cache to the cache file.
 * The file back-end is gone, so this must only ever be reached with no cache
 * file open; it is now a no-op. */
void bigtable_write_thread_cache(global_context_t * global_context)
{
	assert(global_context -> bigtable_cache_file_fp == NULL);
}
147
148
/* Busy-wait until either the loaded fragment window has moved on from
 * old_begin, or no cache slot in the current window is still marked as
 * CACHE_STATUS_OCCUPIED. */
void wait_occupied(global_context_t * global_context , unsigned long long old_begin){
	while(old_begin == global_context -> bigtable_cache_file_loaded_fragments_begin){
		int slot_no, any_occupied = 0;
		for(slot_no = 0; slot_no < global_context -> bigtable_chunked_fragments; slot_no++){
			if(global_context -> bigtable_cache[slot_no].status == CACHE_STATUS_OCCUPIED)
				any_occupied = 1;
		}
		if(!any_occupied) break;
	}
}
160
bigtable_retrieve_cache(global_context_t * global_context,thread_context_t * thread_context,subread_read_number_t pair_number,int is_second_read,int load_more)161 bigtable_cached_result_t * bigtable_retrieve_cache(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int is_second_read, int load_more)
162 {
163 long long inner_pair_number = get_inner_pair(global_context, pair_number);
164 long long load_start_pair_no = inner_pair_number - inner_pair_number % global_context -> bigtable_chunked_fragments;
165 bigtable_cached_result_t * ret_cache = global_context -> bigtable_cache + (inner_pair_number - load_start_pair_no)* (1+global_context -> input_reads.is_paired_end_reads) + is_second_read;
166 return ret_cache;
167 }
168
/* Expose pointers into the cached results for one read.  Either output
 * pointer may be NULL if that record type is not wanted.  The returned
 * pointers reference cache-owned storage; callers must not free them.
 * Always returns 0. */
int bigtable_retrieve_result(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int result_number, int is_second_read, mapping_result_t ** return_ptr, subjunc_result_t ** return_junction_ptr)
{
	bigtable_cached_result_t * slot = bigtable_retrieve_cache(global_context, thread_context, pair_number, is_second_read, 0);

	if(return_ptr) *return_ptr = slot -> alignment_res + result_number;
	if(return_junction_ptr) *return_junction_ptr = slot -> subjunc_res + result_number;

	return 0;
}
180
181
/* Return a pointer to the big-margin vote array cached for the given read. */
unsigned short * _global_retrieve_big_margin_ptr(global_context_t * global_context, subread_read_number_t pair_number, subread_read_number_t is_second_read){
	bigtable_cached_result_t * slot = bigtable_retrieve_cache(global_context, NULL, pair_number, is_second_read, 0);
	return slot -> big_margin_data;
}
186
187
/* Dead API from the file-backed cache era: reaching this function is a
 * programming error, so it aborts unconditionally via assert. */
void bigtable_release_result(global_context_t * global_context , thread_context_t * thread_context , subread_read_number_t pair_number, int commit_change)
{
	assert(NULL == "File result cache is no longer used. Do not call this function.");
}
191
192
/* Release the in-memory result cache created by init_bigtable_results().
 *
 * The freed pointers are reset to NULL so that a stray rewind or a repeated
 * finalise cannot dereference or double-free stale addresses.  Returns 0.
 */
int finalise_bigtable_results(global_context_t * global_context){
	// The file back-end was retired; no cache file may be open here.
	assert(global_context -> bigtable_cache_file_fp == NULL);
	free(global_context -> bigtable_cache_malloc_ptr);
	global_context -> bigtable_cache_malloc_ptr = NULL;
	if(global_context -> config.do_breakpoint_detection){
		free(global_context -> bigtable_cache_malloc_ptr_junc);
		global_context -> bigtable_cache_malloc_ptr_junc = NULL;
	}
	free(global_context -> bigtable_cache);
	global_context -> bigtable_cache = NULL;
	return 0;
}
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/* Hash-table callback: keys are heap-allocated strings owned by the table. */
void bktable_key_deallocator(void * key)
{
	free(key);
}
228
bktable_bucket_deallocator(void * vbuk)229 void bktable_bucket_deallocator(void * vbuk){
230 bucketed_table_bucket_t * buk = vbuk;
231 free(buk -> positions);
232 free(buk -> details);
233 free(buk);
234 }
235
/* Prepare an empty bucketed interval table.  maximum_interval_length is the
 * bucket width (and the largest lookup interval the table supports);
 * expected_items merely sizes the underlying hash table. */
void bktable_init(bucketed_table_t * tab, unsigned int maximum_interval_length, unsigned int expected_items)
{
	memset(tab, 0, sizeof(*tab));

	tab -> maximum_interval_length = maximum_interval_length;
	tab -> expected_items = expected_items;
	tab -> entry_table = HashTableCreate(expected_items / 3);

	HashTableSetDeallocationFunctions(tab->entry_table, bktable_key_deallocator, bktable_bucket_deallocator);
	HashTableSetKeyComparisonFunction(tab->entry_table, fc_strcmp_chro);
	HashTableSetHashFunction(tab->entry_table, HashTableStringHashFunction);
}
246
/* Tear down the table; the registered deallocators free keys and buckets. */
void bktable_destroy(bucketed_table_t * tab)
{
	HashTableDestroy(tab -> entry_table);
}
250
/* Register an item at (chro, pos) in the bucketed table.
 *
 * The item is appended to its own bucket and, when one exists, to the bucket
 * immediately before it.  This lets bktable_lookup() examine only the bucket
 * containing start_pos and still find items up to maximum_interval_length
 * downstream of it.  Ownership of 'detail' transfers to the table; it is
 * released by bktable_free_ptrs() from its primary bucket only.  NULL details
 * are silently ignored.
 */
void bktable_append(bucketed_table_t * tab, char * chro, unsigned int pos, void * detail){
	unsigned int twokeys[2], keyi;
	if(detail == NULL) return;
	twokeys[0] = pos - pos % tab -> maximum_interval_length;
	// BUGFIX: use ">=" rather than ">".  With ">", a position whose bucket
	// start equals maximum_interval_length (i.e. the second bucket) was never
	// registered in bucket 0, so lookups anchored in bucket 0 could miss it.
	if(twokeys[0] >= tab -> maximum_interval_length)
		twokeys[1] = twokeys[0] - tab -> maximum_interval_length;
	else twokeys[1] = 0xffffffff;	// no preceding bucket: sentinel.

	for(keyi = 0; keyi < 2 ; keyi++)
	{
		char static_key [20 + MAX_CHROMOSOME_NAME_LEN];
		unsigned int curr_key_pos = twokeys[keyi];
		if(curr_key_pos == 0xffffffff) continue;

		// Bucket keys are "chro:bucket_start" strings.
		sprintf(static_key, "%s:%u", chro, curr_key_pos);

		bucketed_table_bucket_t * had_items = HashTableGet(tab -> entry_table, static_key);

		if(NULL == had_items)
		{
			// First item in this bucket: create it with a small capacity.
			had_items = malloc(sizeof(bucketed_table_bucket_t));
			memset(had_items, 0, sizeof(bucketed_table_bucket_t));

			had_items -> capacity = BUCKETED_TABLE_INIT_ITEMS;
			had_items -> positions = malloc(sizeof(int) * BUCKETED_TABLE_INIT_ITEMS);
			had_items -> details = malloc(sizeof(void *) *BUCKETED_TABLE_INIT_ITEMS);
			had_items -> keyed_bucket = curr_key_pos;
			had_items -> maximum_interval_length = tab -> maximum_interval_length;

			// The hash table owns its key, so insert a heap copy.
			char * dynamic_key = malloc(strlen(static_key) + 1);
			strcpy(dynamic_key, static_key);
			HashTablePut(tab->entry_table, dynamic_key, had_items);
		}
		if(had_items -> capacity <= had_items -> items){
			// Grow ~1.3x (at least +5); use temporaries so the original
			// blocks are not leaked if realloc fails.
			had_items -> capacity = max(had_items -> capacity + 5, had_items -> capacity * 1.3);
			unsigned int * new_positions = realloc(had_items -> positions, had_items -> capacity * sizeof(int));
			void ** new_details = realloc(had_items -> details, had_items -> capacity * sizeof(void *));
			assert(NULL != new_positions && NULL != new_details);
			had_items -> positions = new_positions;
			had_items -> details = new_details;
		}

		had_items -> positions[ had_items -> items ] = pos;
		had_items -> details[ had_items -> items ] = detail;
		had_items -> items++;
	}

	tab -> fragments ++;
}
300
301
/* HashTable iteration callback: free the detail payloads stored in one
 * bucket.
 *
 * Each detail pointer can be registered in two buckets (its own and the one
 * before it -- see bktable_append), so to avoid a double free a payload is
 * released only from its "primary" bucket, i.e. the bucket whose key equals
 * the payload position rounded down to the bucket width.
 */
void bktable_free_ptrs(void * bukey, void * buckv, HashTable * tab){
	int x1;
	bucketed_table_bucket_t * buck = buckv;
	for(x1 = 0; x1 < buck -> items; x1++)
	{
		// pos - pos % width == bucket key  <=>  this bucket is the primary one.
		if(buck->positions[x1] - buck->positions[x1] % buck -> maximum_interval_length == buck -> keyed_bucket)
		{
			free(buck->details[x1]);
		}
	}
}
314
/* Collect items on chromosome 'chro' whose positions fall inside
 * [start_pos, start_pos + interval_length).  Matching positions and detail
 * pointers are written to hit_pos_list / hit_ptr_list (at most max_hits of
 * each); the number of hits found is returned.  Only the bucket containing
 * start_pos is examined, so items further than the table's
 * maximum_interval_length from a bucket start cannot be returned.
 */
int bktable_lookup(bucketed_table_t * tab, char * chro, unsigned int start_pos, unsigned int interval_length, unsigned int * hit_pos_list, void ** hit_ptr_list, int max_hits){
	char bucket_key [20 + MAX_CHROMOSOME_NAME_LEN];
	unsigned int bucket_start = start_pos - start_pos % tab -> maximum_interval_length;
	sprintf(bucket_key, "%s:%u", chro, bucket_start);

	bucketed_table_bucket_t * bucket = HashTableGet(tab -> entry_table, bucket_key);
	if(NULL == bucket)
		return 0;	// no bucket at all.

	int found = 0, item_no;
	for(item_no = 0 ; item_no < bucket -> items && found < max_hits ; item_no++){
		unsigned int cand_pos = bucket -> positions[item_no];
		if(cand_pos < start_pos || cand_pos >= start_pos + interval_length)
			continue;
		hit_pos_list[found] = cand_pos;
		hit_ptr_list[found] = bucket -> details[item_no];
		found ++;
	}

	return found;
}
348
349
/* Start an empty fragment-number list with the default initial capacity. */
void fraglist_init(fragment_list_t * list)
{
	memset(list, 0, sizeof(*list));
	list -> capacity = FRAGMENT_LIST_INIT_ITEMS;
	list -> fragment_numbers = malloc(sizeof(subread_read_number_t) * list -> capacity);
}
355
/* Release the storage owned by the list (the struct itself is not freed). */
void fraglist_destroy(fragment_list_t * list)
{
	free(list -> fragment_numbers);
}
359
/* Append one fragment number to the list, growing the storage ~1.3x
 * (at least +5 slots) when it is full.
 *
 * The realloc result is taken through a temporary and asserted, instead of
 * being assigned straight back: the old `p = realloc(p, ...)` pattern both
 * leaked the original block and dereferenced NULL on allocation failure.
 */
void fraglist_append(fragment_list_t * list, subread_read_number_t fragment_number){
	if(list -> fragments >= list -> capacity){
		list -> capacity = max(list -> capacity + 5, list -> capacity * 1.3);
		subread_read_number_t * new_block = realloc(list -> fragment_numbers, sizeof(subread_read_number_t) * list -> capacity);
		assert(NULL != new_block);
		list -> fragment_numbers = new_block;
	}

	list -> fragment_numbers[ list -> fragments ++ ] = fragment_number;
}
371
372