1 /*===========================================================================
2 *
3 * PUBLIC DOMAIN NOTICE
4 * National Center for Biotechnology Information
5 *
6 * This software/database is a "United States Government Work" under the
7 * terms of the United States Copyright Act. It was written as part of
8 * the author's official duties as a United States Government employee and
9 * thus cannot be copyrighted. This software/database is freely available
10 * to the public for use. The National Library of Medicine and the U.S.
11 * Government have not placed any restriction on its use or reproduction.
12 *
13 * Although all reasonable efforts have been taken to ensure the accuracy
14 * and reliability of the software and data, the NLM and the U.S.
15 * Government do not and cannot warrant the performance or results that
16 * may be obtained by using this software or data. The NLM and the U.S.
17 * Government disclaim all warranties, express or implied, including
18 * warranties of performance, merchantability or fitness for any particular
19 * purpose.
20 *
21 * Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 #include "merge_sorter.h"
27 #include "lookup_reader.h"
28 #include "lookup_writer.h"
29 #include "index.h"
30 #include "helper.h"
31
32 #include <klib/out.h>
33 #include <klib/text.h>
34 #include <klib/status.h>
35 #include <klib/printf.h>
36 #include <klib/progressbar.h>
37 #include <klib/time.h>
38
39 #include <kproc/thread.h>
40 #include <kproc/queue.h>
41 #include <kproc/timeout.h>
42
43 typedef struct merge_src
44 {
45 struct lookup_reader * reader;
46 uint64_t key;
47 SBuffer packed_bases;
48 rc_t rc;
49 } merge_src;
50
51
/* Pick the source whose current key is the smallest, considering only sources
   whose last read succeeded ( rc == 0 ).
   Returns NULL when no source has a current entry any more.
   On equal keys the source with the lowest index is chosen. */
static merge_src * get_min_merge_src( merge_src * src, uint32_t count )
{
    merge_src * best = NULL;
    uint32_t idx;
    for ( idx = 0; idx < count; idx++ )
    {
        merge_src * candidate = src + idx;
        if ( 0 != candidate -> rc )
        {
            continue;   /* nothing left in this source */
        }
        if ( NULL == best || candidate -> key < best -> key )
        {
            best = candidate;
        }
    }
    return best;
}
75
76 /* ================================================================================= */
77
78 typedef struct merge_sorter
79 {
80
81 struct lookup_writer * dst; /* lookup_writer.h */
82 struct index_writer * idx; /* index.h */
83 merge_src * src; /* vector of input-files to be merged */
84 struct bg_update * gap; /* indicator of running merge */
85 uint64_t total_size, total_entries;
86 uint32_t num_src;
87 } merge_sorter;
88
89
init_merge_sorter(merge_sorter * self,KDirectory * dir,const char * output,const char * index,VNamelist * files,size_t buf_size,uint32_t num_src,struct bg_update * gap)90 static rc_t init_merge_sorter( merge_sorter * self,
91 KDirectory * dir,
92 const char * output,
93 const char * index,
94 VNamelist * files,
95 size_t buf_size,
96 uint32_t num_src,
97 struct bg_update * gap )
98 {
99 rc_t rc = 0;
100 uint32_t i;
101
102 if ( NULL != index )
103 {
104 rc = make_index_writer( dir, &( self -> idx ), buf_size,
105 DFLT_INDEX_FREQUENCY, "%s", index ); /* index.h */
106 }
107 else
108 {
109 self -> idx = NULL;
110 }
111
112 self -> total_size = 0;
113 self -> total_entries = 0;
114 self -> num_src = num_src;
115 self -> gap = gap;
116
117 if ( 0 == rc )
118 {
119 rc = make_lookup_writer( dir, self -> idx, &( self -> dst ), buf_size, "%s", output ); /* lookup_writer.h */
120 }
121
122 if ( 0 == rc )
123 {
124 self -> src = calloc( self -> num_src, sizeof * self-> src );
125 if ( NULL == self -> src )
126 {
127 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
128 ErrMsg( "init_merge_sorter2.calloc( %d ) failed", ( ( sizeof * self -> src ) * self -> num_src ) );
129 }
130 }
131
132 for ( i = 0; 0 == rc && i < self -> num_src; ++i )
133 {
134 const char * filename;
135 rc = VNameListGet ( files, i, &filename );
136 if ( 0 == rc )
137 {
138 merge_src * s = &self -> src[ i ];
139 if ( 0 == rc )
140 {
141 rc = make_lookup_reader( dir, NULL, &s -> reader, buf_size, "%s", filename ); /* lookup_reader.h */
142 }
143 if ( 0 == rc )
144 {
145 rc = make_SBuffer( &s -> packed_bases, 4096 );
146 if ( 0 == rc )
147 {
148 s -> rc = lookup_reader_get( s -> reader, &s -> key, &s -> packed_bases ); /* lookup_reader.h */
149 }
150 }
151 }
152 }
153 return rc;
154 }
155
/* Release what a successful init_merge_sorter() acquired: the output writer,
   the optional index writer and, for every input, its reader and value buffer. */
static void release_merge_sorter( merge_sorter * self )
{
    merge_src * s;
    uint32_t n;

    release_lookup_writer( self -> dst );
    release_index_writer( self -> idx );

    if ( NULL == self -> src )
    {
        return;
    }
    for ( n = 0, s = self -> src; n < self -> num_src; ++n, ++s )
    {
        release_lookup_reader( s -> reader );
        release_SBuffer( &s -> packed_bases );
    }
    free( ( void * ) self -> src );
}
172
run_merge_sorter(merge_sorter * self)173 static rc_t run_merge_sorter( merge_sorter * self )
174 {
175 rc_t rc = 0;
176 uint64_t last_key = 0;
177 uint64_t loop_nr = 0;
178 merge_src * to_write = get_min_merge_src( self -> src, self -> num_src ); /* above */
179
180 while( 0 == rc && NULL != to_write )
181 {
182 rc = get_quitting(); /* helper.c */
183 if ( 0 == rc )
184 {
185 if ( last_key > to_write -> key )
186 {
187 rc = RC( rcVDB, rcNoTarg, rcWriting, rcFormat, rcInvalid );
188 ErrMsg( "run_merge_sorter() %lu -> %lu in loop #%lu", last_key, to_write -> key, loop_nr );
189 }
190 else
191 {
192 loop_nr ++;
193 last_key = to_write -> key;
194 rc = write_packed_to_lookup_writer( self -> dst,
195 to_write -> key,
196 &to_write -> packed_bases . S ); /* lookup_writer.h */
197
198 if ( 0 == rc )
199 {
200 to_write -> rc = lookup_reader_get( to_write -> reader,
201 &to_write -> key,
202 &to_write -> packed_bases ); /* lookup_reader.h */
203 }
204 to_write = get_min_merge_src( self -> src, self -> num_src ); /* above */
205 }
206 if ( 0 != rc )
207 {
208 set_quitting(); /* helper.c */
209 }
210 bg_update_update( self -> gap, 1 ); /* signal to gap-update */
211 }
212 }
213 self -> total_entries += loop_nr;
214 return rc;
215 }
216
217
/* =================================================================================
    The background-vector-merger consists of one background thread, which is the
    consumer of a job_q. The producer-pool in sorter.c puts KVector-instances into the queue.
    The background-vector-merger pops the jobs out of the queue until it has assembled
    a batch of jobs. It then processes this batch by merge-sorting the content of
    the KVectors into a temporary file. The entries are key-value pairs with a 64-bit
    key which is composed of the SEQID and one bit: first or second read in a spot.
    The value is the packed READ ( pack_4na() in helper.c ).
    The background-vector-merger terminates when its input-queue is sealed in perform_fastdump()
    in fastdump.c after all sorter-threads ( producers ) have been joined.
    The output of the background-vector-merger is a list of temporary files produced
    in the temp-directory; each of them is handed to the background-file-merger ( if any ).
   ================================================================================= */
231
232 typedef struct background_vector_merger
233 {
234 KDirectory * dir; /* needed to perform the merge-sort */
235 const struct temp_dir * temp_dir; /* needed to create temp. files */
236 KQueue * job_q; /* the KVector objects arrive here from the lookup-producer */
237 KThread * thread; /* the thread that performs the merge-sort */
238 struct background_file_merger * file_merger; /* below */
239 struct KFastDumpCleanupTask * cleanup_task; /* add the produced temp_files here too */
240 uint32_t product_id; /* increased by one for each batch-run, used in temp-file-name */
241 uint32_t batch_size; /* how many KVectors have to arrive to run a batch */
242 uint32_t q_wait_time; /* timeout in milliseconds to get something out of in_q */
243 size_t buf_size; /* needed to perform the merge-sort */
244 struct bg_update * gap; /* visualize the gap after the producer finished */
245 uint64_t total; /* how many entries have been merged... */
246 uint64_t total_rowcount_prod; /* updated by the producer, informs the vector-merger about the
247 rowcount to be processed */
248 } background_vector_merger;
249
250
wait_for_background_vector_merger(background_vector_merger * self)251 static rc_t wait_for_background_vector_merger( background_vector_merger * self )
252 {
253 rc_t rc_status;
254 rc_t rc = KThreadWait ( self -> thread, &rc_status );
255 if ( 0 == rc )
256 {
257 rc = rc_status;
258 }
259 return rc;
260 }
261
/* Free the vector-merger: its job-queue, its KThread object and the struct
   itself. The thread function works on *self, so this must not be called
   while the thread is still active. NULL is ignored. */
static void release_background_vector_merger( background_vector_merger * self )
{
    if ( NULL == self )
    {
        return;
    }
    if ( NULL != self -> job_q )
    {
        KQueueRelease ( self -> job_q );
    }
    if ( NULL != self -> thread )
    {
        /* KThreadWait() only joins; the KThread created by helper_make_thread()
           is not released anywhere else, so it would leak without this */
        KThreadRelease ( self -> thread );
    }
    free( self );
}
270
wait_for_and_release_background_vector_merger(background_vector_merger * self)271 rc_t wait_for_and_release_background_vector_merger( background_vector_merger * self )
272 {
273 rc_t rc = 0;
274 if ( NULL == self )
275 {
276 rc = RC( rcVDB, rcNoTarg, rcReleasing, rcSelf, rcNull );
277 }
278 else
279 {
280 /* while we are waiting on the background-vector-merger to finish,
281 show a progress-indicator... */
282 rc = wait_for_background_vector_merger( self );
283
284 /* now we can signal to the file-merger that nothing will be pushed into the
285 queue any more... */
286
287 if ( 0 == rc )
288 {
289 rc = seal_background_file_merger( self -> file_merger );
290 }
291
292 if ( 0 == rc && ( self -> total != self -> total_rowcount_prod ) )
293 {
294 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcSize, rcInvalid );
295 ErrMsg( "merge_sorter.c wait_for_and_release_background_vector_merger() : processed lookup rows: %lu of %lu",
296 self -> total, self -> total_rowcount_prod );
297 }
298 release_background_vector_merger( self );
299 }
300
301 if ( 0 != rc )
302 {
303 ErrMsg( "merge_sorter.c wait_for_and_release_background_vector_merger()", rc );
304 }
305 return rc;
306 }
307
308 typedef struct bg_vec_merge_src
309 {
310 KVector * store;
311 uint64_t key;
312 const String * bases;
313 rc_t rc;
314 } bg_vec_merge_src;
315
316
init_bg_vec_merge_src(bg_vec_merge_src * src,KVector * store)317 static rc_t init_bg_vec_merge_src( bg_vec_merge_src * src, KVector * store )
318 {
319 src -> store = store;
320 src -> rc = KVectorGetFirstPtr ( src -> store, &( src -> key ), ( void ** )&( src -> bases ) );
321 return src -> rc;
322 }
323
/* Release the KVector of one cursor; cursors that never received a store are skipped. */
static void release_bg_vec_merge_src( bg_vec_merge_src * src )
{
    KVector * store = src -> store;
    if ( NULL == store )
    {
        return;
    }
    KVectorRelease( store );
}
331
/* Return the cursor with the smallest current key among the cursors that
   still have an entry ( rc == 0 ), or NULL if all are exhausted.
   On equal keys the cursor with the lowest index wins. */
static bg_vec_merge_src * get_min_bg_vec_merge_src( bg_vec_merge_src * batch, uint32_t count )
{
    bg_vec_merge_src * winner = NULL;
    uint32_t pos;
    for ( pos = 0; pos < count; ++pos )
    {
        bg_vec_merge_src * cand = &batch[ pos ];
        bool usable = ( 0 == cand -> rc );
        if ( usable && ( NULL == winner || cand -> key < winner -> key ) )
        {
            winner = cand;
        }
    }
    return winner;
}
353
write_bg_vec_merge_src(bg_vec_merge_src * src,struct lookup_writer * writer)354 static rc_t write_bg_vec_merge_src( bg_vec_merge_src * src, struct lookup_writer * writer )
355 {
356 rc_t rc = src -> rc;
357 if ( 0 == rc )
358 {
359 rc = write_packed_to_lookup_writer( writer, src -> key, src -> bases ); /* lookup_writer.c */
360 StringWhack ( src -> bases );
361 src -> bases = NULL;
362 }
363 if ( 0 == rc )
364 {
365 uint64_t next_key;
366 src -> rc = KVectorGetNextPtr ( src -> store, &next_key, src -> key, ( void ** )&( src -> bases ) );
367 src -> key = next_key;
368 }
369 return rc;
370 }
371
background_vector_merger_collect_batch(background_vector_merger * self,bg_vec_merge_src ** batch,uint32_t * count)372 static rc_t background_vector_merger_collect_batch( background_vector_merger * self,
373 bg_vec_merge_src ** batch,
374 uint32_t * count )
375 {
376 rc_t rc = 0;
377 bg_vec_merge_src * b = calloc( self -> batch_size, sizeof * b );
378 *batch = NULL;
379 *count = 0;
380
381 if ( NULL == b )
382 {
383 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
384 }
385 else
386 {
387 bool sealed = false;
388 while ( 0 == rc && *count < self -> batch_size && !sealed )
389 {
390 struct timeout_t tm;
391 rc = TimeoutInit ( &tm, self -> q_wait_time );
392 if ( 0 == rc )
393 {
394 KVector * store = NULL;
395 rc = KQueuePop ( self -> job_q, ( void ** )&store, &tm );
396 if ( 0 == rc )
397 {
398 /* we pulled out a store from the Q */
399 STATUS ( STAT_USR, "KQueuePop() : store = %p", store );
400 rc = init_bg_vec_merge_src( &( b[ *count ] ), store );
401 if ( 0 == rc )
402 {
403 *count += 1;
404 }
405 }
406 else
407 {
408 STATUS ( STAT_USR, "KQueuePop() : %R, store = %p", rc, store );
409 if ( rcDone == GetRCState( rc ) && ( enum RCObject )rcData == GetRCObject( rc ) )
410 {
411 /* the other side has sealed the Q */
412 sealed = true;
413 rc = 0;
414 }
415 else if ( rcExhausted == GetRCState( rc ) && ( enum RCObject )rcTimeout == GetRCObject( rc ) )
416 {
417 /* we had a timeout while trying to get a store from the Q */
418 rc = 0;
419 }
420 }
421 }
422 }
423 }
424 if ( 0 != *count )
425 {
426 *batch = b;
427 rc = 0;
428 }
429 else
430 {
431 free( b );
432 }
433 return rc;
434 }
435
/* true if at least one cursor of the batch has a current entry ( rc == 0 ) */
static bool batch_valid( bg_vec_merge_src * batch, uint32_t count )
{
    uint32_t idx = 0;
    while ( idx < count && 0 != batch[ idx ] . rc )
    {
        ++idx;
    }
    return idx < count;
}
449
background_vector_merger_process_batch(background_vector_merger * self,bg_vec_merge_src * batch,uint32_t count)450 static rc_t background_vector_merger_process_batch( background_vector_merger * self,
451 bg_vec_merge_src * batch,
452 uint32_t count )
453 {
454 char buffer[ 4096 ];
455 rc_t rc = generate_bg_sub_filename( self -> temp_dir, buffer, sizeof buffer, self -> product_id );
456 if ( 0 != rc )
457 {
458 ErrMsg( "merge_sorter.c background_vector_merger_process_batch() -> %R", rc );
459 }
460 else
461 {
462 STATUS ( STAT_USR, "batch output filename is : %s", buffer );
463 rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, buffer );
464
465 if ( 0 == rc )
466 {
467 struct lookup_writer * writer; /* lookup_writer.h */
468 rc = make_lookup_writer( self -> dir, NULL, &writer,
469 self -> buf_size, "%s", buffer ); /* lookup_writer.c */
470 if ( 0 == rc )
471 {
472 self -> product_id += 1;
473 }
474 if ( 0 == rc )
475 {
476 bg_vec_merge_src * to_write = get_min_bg_vec_merge_src( batch, count ); /* above */
477 while( 0 == rc && NULL != to_write )
478 {
479 rc = get_quitting(); /* helper.c */
480 if ( 0 == rc )
481 {
482 rc = write_bg_vec_merge_src( to_write, writer ); /* above */
483 if ( 0 == rc )
484 {
485 self -> total++;
486 to_write = get_min_bg_vec_merge_src( batch, count ); /* above */
487 }
488 else
489 {
490 to_write = NULL;
491 }
492 bg_update_update( self -> gap, 1 );
493 if ( 0 != rc )
494 {
495 set_quitting(); /* helper.c */
496 }
497 }
498 }
499 release_lookup_writer( writer ); /* lookup_writer.c */
500 }
501 }
502
503 /*
504 if ( rc == 0 && self -> file_merger != NULL )
505 rc = lookup_check_file( self -> dir, self -> buf_size, buffer );
506 */
507
508 if ( 0 == rc && NULL != self -> file_merger )
509 {
510 rc = push_to_background_file_merger( self -> file_merger, buffer ); /* below */
511 }
512 }
513 return rc;
514 }
515
background_vector_merger_thread_func(const KThread * thread,void * data)516 static rc_t CC background_vector_merger_thread_func( const KThread * thread, void *data )
517 {
518 rc_t rc = 0;
519 background_vector_merger * self = data;
520 bool done = false;
521
522 STATUS ( STAT_USR, "starting background thread loop" );
523 while( 0 == rc && !done )
524 {
525 bg_vec_merge_src * batch = NULL;
526 uint32_t count = 0;
527
528 /* Step 1 : get n = batch_size KVector's out of the in_q */
529 STATUS ( STAT_USR, "collecting batch" );
530 rc = background_vector_merger_collect_batch( self, &batch, &count );
531 STATUS ( STAT_USR, "done collectin batch: rc = %R, count = %u", rc, count );
532 if ( 0 == rc )
533 {
534 done = ( 0 == count );
535 if ( !done )
536 {
537 if ( batch_valid( batch, count ) )
538 {
539 /* Step 2 : process the batch */
540 STATUS ( STAT_USR, "processing batch of %u vectors", count );
541 rc = background_vector_merger_process_batch( self, batch, count );
542 STATUS ( STAT_USR, "finished processing: rc = %R", rc );
543 }
544 else
545 {
546 STATUS ( STAT_USR, "we have an invalid batch!" );
547 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcParam, rcInvalid );
548 }
549 }
550 }
551 if ( NULL != batch )
552 {
553 uint32_t i;
554 for ( i = 0; i < count; ++i )
555 {
556 release_bg_vec_merge_src( &( batch[ i ] ) );
557 }
558 free( batch );
559 }
560 }
561 STATUS ( STAT_USR, "exiting background thread loop" );
562 return rc;
563 }
564
make_background_vector_merger(struct background_vector_merger ** merger,KDirectory * dir,const struct temp_dir * temp_dir,struct KFastDumpCleanupTask * cleanup_task,struct background_file_merger * file_merger,uint32_t batch_size,uint32_t q_wait_time,size_t buf_size,struct bg_update * gap)565 rc_t make_background_vector_merger( struct background_vector_merger ** merger,
566 KDirectory * dir,
567 const struct temp_dir * temp_dir,
568 struct KFastDumpCleanupTask * cleanup_task,
569 struct background_file_merger * file_merger,
570 uint32_t batch_size,
571 uint32_t q_wait_time,
572 size_t buf_size,
573 struct bg_update * gap )
574 {
575 rc_t rc = 0;
576 background_vector_merger * b = calloc( 1, sizeof * b );
577 *merger = NULL;
578 if ( NULL == b )
579 {
580 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
581 }
582 else
583 {
584 b -> dir = dir;
585 b -> temp_dir = temp_dir;
586 b -> batch_size = batch_size;
587 b -> q_wait_time = q_wait_time;
588 b -> buf_size = buf_size;
589 b -> file_merger = file_merger;
590 b -> cleanup_task = cleanup_task;
591 b -> gap = gap;
592 b -> total = 0;
593 b -> total_rowcount_prod = 0;
594
595 rc = KQueueMake ( &( b -> job_q ), batch_size );
596 if ( 0 == rc )
597 {
598 rc = helper_make_thread( &( b -> thread ), background_vector_merger_thread_func,
599 b, THREAD_DFLT_STACK_SIZE );
600 if ( 0 != rc )
601 {
602 ErrMsg( "merge_sorter.c helper_make_thread( vector-merger ) -> %R", rc );
603 }
604 }
605
606 if ( 0 == rc )
607 {
608 *merger = b;
609 }
610 else
611 {
612 release_background_vector_merger( b );
613 }
614 }
615 return rc;
616 }
617
/* Announce the number of rows the producer generated. The value is forwarded
   to the file-merger and checked when the mergers are released. NULL is ignored. */
void tell_total_rowcount_to_vector_merger( background_vector_merger * self, uint64_t value )
{
    if ( NULL == self )
    {
        return;
    }
    self -> total_rowcount_prod = value;
    tell_total_rowcount_to_file_merger( self -> file_merger, value );
}
626
seal_background_vector_merger(background_vector_merger * self)627 rc_t seal_background_vector_merger( background_vector_merger * self )
628 {
629 rc_t rc = KQueueSeal ( self -> job_q );
630 if ( 0 != rc )
631 {
632 ErrMsg( "merge_sorter.c seal_background_vector_merger() -> %R", rc );
633 }
634 return rc;
635 }
636
push_to_background_vector_merger(background_vector_merger * self,KVector * store)637 rc_t push_to_background_vector_merger( background_vector_merger * self, KVector * store )
638 {
639 rc_t rc;
640 bool running = true;
641 while ( running )
642 {
643 struct timeout_t tm;
644 rc = TimeoutInit ( &tm, self -> q_wait_time );
645 if ( 0 != rc )
646 {
647 ErrMsg( "merge_sorter.c push_to_background_vector_merger().TimeoutInit( %u ) -> %R", self -> q_wait_time, rc );
648 running = false;
649 }
650 else
651 {
652 rc = KQueuePush ( self -> job_q, store, &tm );
653 if ( 0 == rc )
654 {
655 running = false;
656 }
657 else
658 {
659 bool timed_out = ( GetRCState( rc ) == rcExhausted && GetRCObject( rc ) == ( enum RCObject )rcTimeout );
660 if ( timed_out )
661 {
662 KSleepMs( self -> q_wait_time );
663 }
664 else
665 {
666 ErrMsg( "merge_sorter.c push_to_background_vector_merger().KQueuePush() -> %R", rc );
667 running = false;
668 }
669 }
670 }
671 }
672 return rc;
673 }
674
/* =================================================================================
    The background-file-merger consists of one background thread. The
    background_vector_merger above pushes the names of its temporary files into a
    locked file-list. Whenever at least batch_size files are waiting, the
    background-file-merger merge-sorts one batch of them into a new temporary file,
    which is put back into the list. The file-entries are key-value pairs with a 64-bit
    key which is composed of the SEQID and one bit: first or second read in a spot.
    The value is the packed READ ( pack_4na() in helper.c ).
    The list is sealed after the background-vector-merger has been joined. Once it is
    sealed and no more than batch_size files remain, these are merged into the final
    lookup file ( and its index ), after which the background-file-merger terminates.
   ================================================================================= */
688 typedef struct background_file_merger
689 {
690 KDirectory * dir; /* needed to perform the merge-sort */
691 const struct temp_dir * temp_dir; /* needed to create temp. files */
692 const char * lookup_filename;
693 const char * index_filename;
694 locked_file_list files; /* a locked file-list */
695 locked_value sealed; /* flag to signal if the input is sealed */
696 struct KFastDumpCleanupTask * cleanup_task; /* add the produced temp_files here too */
697 KThread * thread; /* the thread that performs the merge-sort */
698 uint32_t product_id; /* increased by one for each batch-run, used in temp-file-name */
699 uint32_t batch_size; /* how many KVectors have to arrive to run a batch */
700 uint32_t wait_time; /* time in milliseconds to sleep if waiting for files to process */
701 size_t buf_size; /* needed to perform the merge-sort */
702 struct bg_update * gap; /* visualize the gap after the producer finished */
703 uint64_t total_rows; /* how many rows have we processed */
704 uint64_t total_rowcount_prod; /* updated by the producer, informs the file-merger about the
705 rowcount to be processed */
706 } background_file_merger;
707
708
/* Free the file-merger: its file-list, its sealed-flag, its KThread object and
   the struct itself. The thread function works on *self, so this must not be
   called while the thread is still active. NULL is ignored. */
static void release_background_file_merger( background_file_merger * self )
{
    if ( NULL != self )
    {
        locked_file_list_release( &( self -> files ), self -> dir );
        locked_value_release( &( self -> sealed ) );
        if ( NULL != self -> thread )
        {
            /* KThreadWait() only joins; the KThread created by helper_make_thread()
               is not released anywhere else, so it would leak without this */
            KThreadRelease ( self -> thread );
        }
        free( self );
    }
}
718
wait_for_background_file_merger(background_file_merger * self)719 static rc_t wait_for_background_file_merger( background_file_merger * self )
720 {
721 rc_t rc_status;
722 rc_t rc = KThreadWait ( self -> thread, &rc_status );
723 if ( 0 == rc )
724 {
725 rc = rc_status;
726 }
727 return rc;
728 }
729
wait_for_and_release_background_file_merger(background_file_merger * self)730 rc_t wait_for_and_release_background_file_merger( background_file_merger * self )
731 {
732 rc_t rc = 0;
733 if ( NULL == self )
734 {
735 rc = RC( rcVDB, rcNoTarg, rcReleasing, rcSelf, rcNull );
736 }
737 else
738 {
739 /* while we are waiting on the background-file-merger to finish,
740 show a progress-indicator... */
741 rc = wait_for_background_file_merger( self );
742 if ( 0 == rc && ( self -> total_rows != self -> total_rowcount_prod ) )
743 {
744 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcSize, rcInvalid );
745 ErrMsg( "merge_sorter.c wait_for_and_release_background_file_merger() %lu of %lu",
746 self -> total_rows, self -> total_rowcount_prod );
747 }
748 release_background_file_merger( self );
749 }
750 return rc;
751 }
752
753 /* called from the background-thread */
process_background_file_merger(background_file_merger * self)754 static rc_t process_background_file_merger( background_file_merger * self )
755 {
756 char tmp_filename[ 4096 ];
757
758 rc_t rc = generate_bg_merge_filename( self -> temp_dir, tmp_filename, sizeof tmp_filename,
759 self -> product_id );
760
761 if ( 0 == rc )
762 {
763 rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, tmp_filename );
764 }
765
766 if ( 0 == rc )
767 {
768 uint32_t num_src = 0;
769 VNamelist * batch_files;
770 rc = VNamelistMake ( &batch_files, self -> batch_size );
771 if ( 0 == rc )
772 {
773 uint32_t i;
774 rc_t rc1 = 0;
775 for ( i = 0; 0 == rc && 0 == rc1 && i < self -> batch_size; ++i )
776 {
777 const String * filename = NULL;
778 rc1 = locked_file_list_pop( &( self -> files ), &filename );
779 if ( 0 == rc1 && NULL != filename )
780 {
781 rc = VNamelistAppendString ( batch_files, filename );
782 if ( 0 == rc )
783 {
784 num_src++;
785 }
786 }
787 }
788
789 if ( 0 == rc )
790 {
791 merge_sorter sorter;
792 rc = init_merge_sorter( &sorter,
793 self -> dir,
794 tmp_filename, /* the output file */
795 NULL, /* opt. index_filename */
796 batch_files, /* the input files */
797 self -> buf_size,
798 num_src,
799 self -> gap );
800 if ( 0 == rc )
801 {
802 rc = run_merge_sorter( &sorter );
803 release_merge_sorter( &sorter );
804 }
805 }
806
807 if ( 0 == rc )
808 {
809 rc = delete_files( self -> dir, batch_files );
810 }
811
812 VNamelistRelease( batch_files );
813 }
814 }
815
816 if ( 0 == rc )
817 {
818 rc = locked_file_list_append( &( self -> files ), tmp_filename );
819 if ( 0 == rc )
820 {
821 self -> product_id += 1;
822 }
823 }
824 return rc;
825 }
826
process_final_background_file_merger(background_file_merger * self,uint32_t count)827 static rc_t process_final_background_file_merger( background_file_merger * self, uint32_t count )
828 {
829 VNamelist * batch_files;
830 rc_t rc = VNamelistMake ( &batch_files, count );
831 if ( 0 == rc )
832 {
833 uint32_t i;
834 uint32_t num_src = 0;
835 rc_t rc1 = 0;
836 for ( i = 0; 0 == rc && 0 == rc1 && i < count; ++i )
837 {
838 const String * filename = NULL;
839 rc1 = locked_file_list_pop( &( self -> files ), &filename );
840 if ( 0 == rc1 && filename != NULL )
841 {
842 rc = VNamelistAppendString ( batch_files, filename );
843 if ( 0 == rc )
844 {
845 num_src++;
846 }
847 }
848 }
849
850 if ( 0 == rc )
851 {
852 rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, self -> lookup_filename );
853 }
854 if ( 0 == rc )
855 {
856 rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, self -> index_filename );
857 }
858 if ( 0 == rc )
859 {
860 merge_sorter sorter;
861 rc = init_merge_sorter( &sorter,
862 self -> dir,
863 self -> lookup_filename, /* the output file */
864 self -> index_filename, /* opt. index_filename */
865 batch_files, /* the input files */
866 self -> buf_size,
867 num_src,
868 self -> gap );
869 if ( 0 == rc )
870 {
871 rc = run_merge_sorter( &sorter );
872 if ( 0 == rc )
873 {
874 self -> total_rows += sorter . total_entries;
875 }
876 release_merge_sorter( &sorter );
877 }
878 }
879
880 if ( 0 == rc )
881 {
882 rc = delete_files( self -> dir, batch_files );
883 }
884 VNamelistRelease( batch_files );
885 }
886 return rc;
887 }
888
background_file_merger_thread_func(const KThread * thread,void * data)889 static rc_t CC background_file_merger_thread_func( const KThread * thread, void *data )
890 {
891 rc_t rc = 0;
892 background_file_merger * self = data;
893 bool done = false;
894 while( 0 == rc && !done )
895 {
896 uint64_t sealed;
897 rc = locked_value_get( &( self -> sealed ), &sealed );
898 if ( 0 == rc )
899 {
900 uint32_t count;
901 rc = locked_file_list_count( &( self -> files ), &count );
902 if ( 0 == rc )
903 {
904 if ( sealed > 0 )
905 {
906 /* we are sealed... */
907 if ( 0 == count )
908 {
909 /* this should not happen, but for the sake of completeness */
910 done = true;
911 }
912 else if ( count > ( self -> batch_size ) )
913 {
914 /* we still have more than we can open, do one batch */
915 rc = process_background_file_merger( self );
916 }
917 else
918 {
919 /* we can do the final batch */
920 rc = process_final_background_file_merger( self, count );
921 done = true;
922 }
923 }
924 else
925 {
926 /* we are not sealed... */
927 if ( count < ( self -> batch_size ) )
928 {
929 /* let us take a little nap, until we get enough files, or get sealed */
930 KSleepMs( self -> wait_time );
931 }
932 else
933 {
934 /* we have enough files to process one batch */
935 rc = process_background_file_merger( self );
936 }
937 }
938 }
939 }
940 }
941 return rc;
942 }
943
make_background_file_merger(background_file_merger ** merger,KDirectory * dir,const struct temp_dir * temp_dir,struct KFastDumpCleanupTask * cleanup_task,const char * lookup_filename,const char * index_filename,uint32_t batch_size,uint32_t wait_time,size_t buf_size,struct bg_update * gap)944 rc_t make_background_file_merger( background_file_merger ** merger,
945 KDirectory * dir,
946 const struct temp_dir * temp_dir,
947 struct KFastDumpCleanupTask * cleanup_task,
948 const char * lookup_filename,
949 const char * index_filename,
950 uint32_t batch_size,
951 uint32_t wait_time,
952 size_t buf_size,
953 struct bg_update * gap )
954 {
955 rc_t rc = 0;
956 background_file_merger * b = calloc( 1, sizeof * b );
957 *merger = NULL;
958 if ( NULL == b )
959 {
960 rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
961 }
962 else
963 {
964 b -> dir = dir;
965 b -> temp_dir = temp_dir;
966 b -> lookup_filename = lookup_filename;
967 b -> index_filename = index_filename;
968 b -> batch_size = batch_size;
969 b -> wait_time = wait_time;
970 b -> buf_size = buf_size;
971 b -> cleanup_task = cleanup_task;
972 b -> gap = gap;
973 b -> total_rows = 0;
974 b -> total_rowcount_prod = 0;
975
976 rc = locked_file_list_init( &( b -> files ), 25 );
977 if ( 0 == rc )
978 {
979 rc = locked_value_init( &( b -> sealed ), 0 );
980 }
981 if ( 0 == rc )
982 {
983 rc = helper_make_thread( &( b -> thread ), background_file_merger_thread_func,
984 b, THREAD_DFLT_STACK_SIZE );
985 if ( 0 != rc )
986 {
987 ErrMsg( "merge_sorter.c helper_make_thread( file-mergerr ) -> %R", rc );
988 }
989 }
990
991 if ( 0 == rc )
992 {
993 *merger = b;
994 }
995 else
996 {
997 release_background_file_merger( b );
998 }
999 }
1000 return rc;
1001 }
1002
/* Store the number of rows announced by the producer; it is compared with the
   rows written into the final lookup file on release. NULL is ignored. */
void tell_total_rowcount_to_file_merger( background_file_merger * self, uint64_t value )
{
    if ( NULL == self )
    {
        return;
    }
    self -> total_rowcount_prod = value;
}
1010
push_to_background_file_merger(background_file_merger * self,const char * filename)1011 rc_t push_to_background_file_merger( background_file_merger * self, const char * filename )
1012 {
1013 return locked_file_list_append( &( self -> files ), filename );
1014 }
1015
seal_background_file_merger(background_file_merger * self)1016 rc_t seal_background_file_merger( background_file_merger * self )
1017 {
1018 return locked_value_set( &( self -> sealed ), 1 );
1019 }
1020