1 /*===========================================================================
2 *
3 *                            PUBLIC DOMAIN NOTICE
4 *               National Center for Biotechnology Information
5 *
6 *  This software/database is a "United States Government Work" under the
7 *  terms of the United States Copyright Act.  It was written as part of
8 *  the author's official duties as a United States Government employee and
9 *  thus cannot be copyrighted.  This software/database is freely available
10 *  to the public for use. The National Library of Medicine and the U.S.
11 *  Government have not placed any restriction on its use or reproduction.
12 *
13 *  Although all reasonable efforts have been taken to ensure the accuracy
14 *  and reliability of the software and data, the NLM and the U.S.
15 *  Government do not and cannot warrant the performance or results that
16 *  may be obtained by using this software or data. The NLM and the U.S.
17 *  Government disclaim all warranties, express or implied, including
18 *  warranties of performance, merchantability or fitness for any particular
19 *  purpose.
20 *
21 *  Please cite the author in any work or product based on this material.
22 *
23 * ===========================================================================
24 *
25 */
26 #include "merge_sorter.h"
27 #include "lookup_reader.h"
28 #include "lookup_writer.h"
29 #include "index.h"
30 #include "helper.h"
31 
32 #include <klib/out.h>
33 #include <klib/text.h>
34 #include <klib/status.h>
35 #include <klib/printf.h>
36 #include <klib/progressbar.h>
37 #include <klib/time.h>
38 
39 #include <kproc/thread.h>
40 #include <kproc/queue.h>
41 #include <kproc/timeout.h>
42 
43 typedef struct merge_src
44 {
45     struct lookup_reader * reader;
46     uint64_t key;
47     SBuffer packed_bases;
48     rc_t rc;
49 } merge_src;
50 
51 
/* Returns the source with the smallest key among those sources whose
   rc is 0. If several sources share the smallest key, the one with the
   lowest index wins. Returns NULL if no source has rc == 0 ( or count is 0 ). */
static merge_src * get_min_merge_src( merge_src * src, uint32_t count )
{
    merge_src * smallest = NULL;
    uint32_t idx = 0;

    while ( idx < count )
    {
        merge_src * candidate = &src[ idx++ ];
        if ( 0 != candidate -> rc )
        {
            /* this source has nothing ( more ) to offer */
            continue;
        }
        if ( NULL == smallest || candidate -> key < smallest -> key )
        {
            smallest = candidate;
        }
    }
    return smallest;
}
75 
76 /* ================================================================================= */
77 
78 typedef struct merge_sorter
79 {
80 
81     struct lookup_writer * dst; /* lookup_writer.h */
82     struct index_writer * idx;  /* index.h */
83     merge_src * src;            /* vector of input-files to be merged */
84     struct bg_update * gap;     /* indicator of running merge */
85     uint64_t total_size, total_entries;
86     uint32_t num_src;
87 } merge_sorter;
88 
89 
init_merge_sorter(merge_sorter * self,KDirectory * dir,const char * output,const char * index,VNamelist * files,size_t buf_size,uint32_t num_src,struct bg_update * gap)90 static rc_t init_merge_sorter( merge_sorter * self,
91                                KDirectory * dir,
92                                const char * output,
93                                const char * index,
94                                VNamelist * files,
95                                size_t buf_size,
96                                uint32_t num_src,
97                                struct bg_update * gap )
98 {
99     rc_t rc = 0;
100     uint32_t i;
101 
102     if ( NULL != index )
103     {
104         rc = make_index_writer( dir, &( self -> idx ), buf_size,
105                         DFLT_INDEX_FREQUENCY, "%s", index ); /* index.h */
106     }
107     else
108     {
109         self -> idx = NULL;
110     }
111 
112     self -> total_size = 0;
113     self -> total_entries = 0;
114     self -> num_src = num_src;
115     self -> gap = gap;
116 
117     if ( 0 == rc )
118     {
119         rc = make_lookup_writer( dir, self -> idx, &( self -> dst ), buf_size, "%s", output ); /* lookup_writer.h */
120     }
121 
122     if ( 0 == rc )
123     {
124         self -> src = calloc( self -> num_src, sizeof * self-> src );
125         if ( NULL == self -> src )
126         {
127             rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
128             ErrMsg( "init_merge_sorter2.calloc( %d ) failed", ( ( sizeof * self -> src ) * self -> num_src ) );
129         }
130     }
131 
132     for ( i = 0; 0 == rc && i < self -> num_src; ++i )
133     {
134         const char * filename;
135         rc = VNameListGet ( files, i, &filename );
136         if ( 0 == rc )
137         {
138             merge_src * s = &self -> src[ i ];
139             if ( 0 == rc )
140             {
141                 rc = make_lookup_reader( dir, NULL, &s -> reader, buf_size, "%s", filename ); /* lookup_reader.h */
142             }
143             if ( 0 == rc )
144             {
145                 rc = make_SBuffer( &s -> packed_bases, 4096 );
146                 if ( 0 == rc )
147                 {
148                     s -> rc = lookup_reader_get( s -> reader, &s -> key, &s -> packed_bases ); /* lookup_reader.h */
149                 }
150             }
151         }
152     }
153     return rc;
154 }
155 
/* Releases everything a successfully initialized merge_sorter owns:
   the lookup-writer, the ( optional ) index-writer, and for each source
   its reader and buffer. The merge_sorter itself is not freed. */
static void release_merge_sorter( merge_sorter * self )
{
    merge_src * sources = self -> src;
    uint32_t idx;

    release_lookup_writer( self -> dst );
    release_index_writer( self -> idx );

    if ( NULL == sources )
    {
        return;
    }

    for ( idx = 0; idx < self -> num_src; ++idx )
    {
        release_lookup_reader( sources[ idx ] . reader );
        release_SBuffer( &( sources[ idx ] . packed_bases ) );
    }
    free( sources );
}
172 
run_merge_sorter(merge_sorter * self)173 static rc_t run_merge_sorter( merge_sorter * self )
174 {
175     rc_t rc = 0;
176     uint64_t last_key = 0;
177     uint64_t loop_nr = 0;
178     merge_src * to_write = get_min_merge_src( self -> src, self -> num_src ); /* above */
179 
180     while( 0 == rc && NULL != to_write )
181     {
182         rc = get_quitting();    /* helper.c */
183         if ( 0 == rc )
184         {
185             if ( last_key > to_write -> key )
186             {
187                 rc = RC( rcVDB, rcNoTarg, rcWriting, rcFormat, rcInvalid );
188                 ErrMsg( "run_merge_sorter() %lu -> %lu in loop #%lu", last_key, to_write -> key, loop_nr );
189             }
190             else
191             {
192                 loop_nr ++;
193                 last_key = to_write -> key;
194                 rc = write_packed_to_lookup_writer( self -> dst,
195                                                     to_write -> key,
196                                                     &to_write -> packed_bases . S ); /* lookup_writer.h */
197 
198                 if ( 0 == rc )
199                 {
200                     to_write -> rc = lookup_reader_get( to_write -> reader,
201                                                         &to_write -> key,
202                                                         &to_write -> packed_bases ); /* lookup_reader.h */
203                 }
204                 to_write = get_min_merge_src( self -> src, self -> num_src ); /* above */
205             }
206             if ( 0 != rc )
207             {
208                 set_quitting();     /* helper.c */
209             }
210             bg_update_update( self -> gap, 1 ); /* signal to gap-update */
211         }
212     }
213     self -> total_entries += loop_nr;
214     return rc;
215 }
216 
217 
/* =================================================================================
    The background-vector-merger is composed of 1 background-thread, which is the
    consumer of a job_q. The producer-pool in sorter.c puts KVector-instances into
    the queue. The background-vector-merger pops the jobs out of the queue until it
    has assembled a batch of jobs. It then processes this batch by merge-sorting the
    content of the KVectors into a temporary file. The entries are key-value pairs
    with a 64-bit key which is composed of the SEQID and one bit: first or second
    read in a spot. The value is the packed READ ( pack_4na() in helper.c ).
    The background-vector-merger terminates when its input-queue is sealed in
    perform_fastdump() in fastdump.c after all sorter-threads ( producers ) have
    been joined. The final output of the background-vector-merger is a list of
    temporary files produced in the temp-directory.
   ================================================================================= */
231 
232 typedef struct background_vector_merger
233 {
234     KDirectory * dir;               /* needed to perform the merge-sort */
235     const struct temp_dir * temp_dir; /* needed to create temp. files */
236     KQueue * job_q;                 /* the KVector objects arrive here from the lookup-producer */
237     KThread * thread;               /* the thread that performs the merge-sort */
238     struct background_file_merger * file_merger;    /* below */
239     struct KFastDumpCleanupTask * cleanup_task;     /* add the produced temp_files here too */
240     uint32_t product_id;            /* increased by one for each batch-run, used in temp-file-name */
241     uint32_t batch_size;            /* how many KVectors have to arrive to run a batch */
242     uint32_t q_wait_time;           /* timeout in milliseconds to get something out of in_q */
243     size_t buf_size;                /* needed to perform the merge-sort */
244     struct bg_update * gap;         /* visualize the gap after the producer finished */
245     uint64_t total;                 /* how many entries have been merged... */
246     uint64_t total_rowcount_prod;   /* updated by the producer, informs the vector-merger about the
247                                        rowcount to be processed */
248 } background_vector_merger;
249 
250 
wait_for_background_vector_merger(background_vector_merger * self)251 static rc_t wait_for_background_vector_merger( background_vector_merger * self )
252 {
253     rc_t rc_status;
254     rc_t rc = KThreadWait ( self -> thread, &rc_status );
255     if ( 0 == rc )
256     {
257         rc = rc_status;
258     }
259     return rc;
260 }
261 
/* Releases the job-queue ( if one was made ) and frees the merger itself.
   self must not be NULL. */
static void release_background_vector_merger( background_vector_merger * self )
{
    KQueue * queue = self -> job_q;
    if ( NULL != queue )
    {
        KQueueRelease ( queue );
    }
    free( self );
}
270 
wait_for_and_release_background_vector_merger(background_vector_merger * self)271 rc_t wait_for_and_release_background_vector_merger( background_vector_merger * self )
272 {
273     rc_t rc = 0;
274     if ( NULL == self )
275     {
276         rc = RC( rcVDB, rcNoTarg, rcReleasing, rcSelf, rcNull );
277     }
278     else
279     {
280         /* while we are waiting on the background-vector-merger to finish,
281            show a progress-indicator... */
282         rc = wait_for_background_vector_merger( self );
283 
284         /* now we can signal to the file-merger that nothing will be pushed into the
285            queue any more... */
286 
287         if ( 0 == rc )
288         {
289             rc = seal_background_file_merger( self -> file_merger );
290         }
291 
292         if ( 0 == rc && ( self -> total != self -> total_rowcount_prod ) )
293         {
294             rc = RC( rcVDB, rcNoTarg, rcConstructing, rcSize, rcInvalid );
295             ErrMsg( "merge_sorter.c wait_for_and_release_background_vector_merger() : processed lookup rows: %lu of %lu",
296                     self -> total, self -> total_rowcount_prod );
297         }
298         release_background_vector_merger( self );
299     }
300 
301     if ( 0 != rc )
302     {
303         ErrMsg( "merge_sorter.c wait_for_and_release_background_vector_merger()", rc );
304     }
305     return rc;
306 }
307 
308 typedef struct bg_vec_merge_src
309 {
310     KVector * store;
311     uint64_t key;
312     const String * bases;
313     rc_t rc;
314 } bg_vec_merge_src;
315 
316 
init_bg_vec_merge_src(bg_vec_merge_src * src,KVector * store)317 static rc_t init_bg_vec_merge_src( bg_vec_merge_src * src, KVector * store )
318 {
319     src -> store = store;
320     src -> rc = KVectorGetFirstPtr ( src -> store, &( src -> key ), ( void ** )&( src -> bases ) );
321     return src -> rc;
322 }
323 
/* Releases the KVector of this source, if it has one. */
static void release_bg_vec_merge_src( bg_vec_merge_src * src )
{
    if ( NULL == src -> store )
    {
        return;
    }
    KVectorRelease( src -> store );
}
331 
/* Returns the source with the smallest key among those sources of the batch
   whose rc is 0. On equal keys the source with the lowest index wins.
   Returns NULL if no source has rc == 0 ( or count is 0 ). */
static bg_vec_merge_src * get_min_bg_vec_merge_src( bg_vec_merge_src * batch, uint32_t count )
{
    bg_vec_merge_src * smallest = NULL;
    uint32_t idx = 0;

    while ( idx < count )
    {
        bg_vec_merge_src * candidate = &batch[ idx++ ];
        if ( 0 != candidate -> rc )
        {
            /* exhausted or failed source */
            continue;
        }
        if ( NULL == smallest || candidate -> key < smallest -> key )
        {
            smallest = candidate;
        }
    }
    return smallest;
}
353 
write_bg_vec_merge_src(bg_vec_merge_src * src,struct lookup_writer * writer)354 static rc_t write_bg_vec_merge_src( bg_vec_merge_src * src, struct lookup_writer * writer )
355 {
356     rc_t rc = src -> rc;
357     if ( 0 == rc )
358     {
359         rc = write_packed_to_lookup_writer( writer, src -> key, src -> bases ); /* lookup_writer.c */
360         StringWhack ( src -> bases );
361         src -> bases = NULL;
362     }
363     if ( 0 == rc )
364     {
365         uint64_t next_key;
366         src -> rc = KVectorGetNextPtr ( src -> store, &next_key, src -> key, ( void ** )&( src -> bases ) );
367         src -> key = next_key;
368     }
369     return rc;
370 }
371 
background_vector_merger_collect_batch(background_vector_merger * self,bg_vec_merge_src ** batch,uint32_t * count)372 static rc_t background_vector_merger_collect_batch( background_vector_merger * self,
373                                                     bg_vec_merge_src ** batch,
374                                                     uint32_t * count )
375 {
376     rc_t rc = 0;
377     bg_vec_merge_src * b = calloc( self -> batch_size, sizeof * b );
378     *batch = NULL;
379     *count = 0;
380 
381     if ( NULL == b )
382     {
383         rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
384     }
385     else
386     {
387         bool sealed = false;
388         while ( 0 == rc && *count < self -> batch_size && !sealed )
389         {
390             struct timeout_t tm;
391             rc = TimeoutInit ( &tm, self -> q_wait_time );
392             if ( 0 == rc )
393             {
394                 KVector * store = NULL;
395                 rc = KQueuePop ( self -> job_q, ( void ** )&store, &tm );
396                 if ( 0 == rc )
397                 {
398                     /* we pulled out a store from the Q */
399                     STATUS ( STAT_USR, "KQueuePop() : store = %p", store );
400                     rc = init_bg_vec_merge_src( &( b[ *count ] ), store );
401                     if ( 0 == rc )
402                     {
403                         *count += 1;
404                     }
405                 }
406                 else
407                 {
408                     STATUS ( STAT_USR, "KQueuePop() : %R, store = %p", rc, store );
409                     if ( rcDone == GetRCState( rc ) && ( enum RCObject )rcData == GetRCObject( rc ) )
410                     {
411                         /* the other side has sealed the Q */
412                         sealed = true;
413                         rc = 0;
414                     }
415                     else if ( rcExhausted == GetRCState( rc ) && ( enum RCObject )rcTimeout == GetRCObject( rc ) )
416                     {
417                         /* we had a timeout while trying to get a store from the Q */
418                         rc = 0;
419                     }
420                 }
421             }
422         }
423     }
424     if ( 0 != *count )
425     {
426         *batch = b;
427         rc = 0;
428     }
429     else
430     {
431         free( b );
432     }
433     return rc;
434 }
435 
/* A batch is valid if at least one of its sources has rc == 0. */
static bool batch_valid( bg_vec_merge_src * batch, uint32_t count )
{
    uint32_t idx;
    for ( idx = 0; idx < count; ++idx )
    {
        if ( 0 == batch[ idx ] . rc )
        {
            return true;
        }
    }
    return false;
}
449 
background_vector_merger_process_batch(background_vector_merger * self,bg_vec_merge_src * batch,uint32_t count)450 static rc_t background_vector_merger_process_batch( background_vector_merger * self,
451                                                     bg_vec_merge_src * batch,
452                                                     uint32_t count )
453 {
454     char buffer[ 4096 ];
455     rc_t rc = generate_bg_sub_filename( self -> temp_dir, buffer, sizeof buffer, self -> product_id );
456     if ( 0 != rc )
457     {
458         ErrMsg( "merge_sorter.c background_vector_merger_process_batch() -> %R", rc );
459     }
460     else
461     {
462         STATUS ( STAT_USR, "batch output filename is : %s", buffer );
463         rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, buffer );
464 
465         if ( 0 == rc )
466         {
467             struct lookup_writer * writer; /* lookup_writer.h */
468             rc = make_lookup_writer( self -> dir, NULL, &writer,
469                                      self -> buf_size, "%s", buffer ); /* lookup_writer.c */
470             if ( 0 == rc )
471             {
472                 self -> product_id += 1;
473             }
474             if ( 0 == rc )
475             {
476                 bg_vec_merge_src * to_write = get_min_bg_vec_merge_src( batch, count ); /* above */
477                 while( 0 == rc && NULL != to_write )
478                 {
479                     rc = get_quitting();    /* helper.c */
480                     if ( 0 == rc )
481                     {
482                         rc = write_bg_vec_merge_src( to_write, writer ); /* above */
483                         if ( 0 == rc )
484                         {
485                             self -> total++;
486                             to_write = get_min_bg_vec_merge_src( batch, count ); /* above */
487                         }
488                         else
489                         {
490                             to_write = NULL;
491                         }
492                         bg_update_update( self -> gap, 1 );
493                         if ( 0 != rc )
494                         {
495                             set_quitting();     /* helper.c */
496                         }
497                     }
498                 }
499                 release_lookup_writer( writer ); /* lookup_writer.c */
500             }
501         }
502 
503         /*
504         if ( rc == 0 && self -> file_merger != NULL )
505             rc = lookup_check_file( self -> dir, self -> buf_size, buffer );
506         */
507 
508         if ( 0 == rc && NULL != self -> file_merger )
509         {
510             rc = push_to_background_file_merger( self -> file_merger, buffer ); /* below */
511         }
512     }
513     return rc;
514 }
515 
background_vector_merger_thread_func(const KThread * thread,void * data)516 static rc_t CC background_vector_merger_thread_func( const KThread * thread, void *data )
517 {
518     rc_t rc = 0;
519     background_vector_merger * self = data;
520     bool done = false;
521 
522     STATUS ( STAT_USR, "starting background thread loop" );
523     while( 0 == rc && !done )
524     {
525         bg_vec_merge_src * batch = NULL;
526         uint32_t count = 0;
527 
528         /* Step 1 : get n = batch_size KVector's out of the in_q */
529         STATUS ( STAT_USR, "collecting batch" );
530         rc = background_vector_merger_collect_batch( self, &batch, &count );
531         STATUS ( STAT_USR, "done collectin batch: rc = %R, count = %u", rc, count );
532         if ( 0 == rc )
533         {
534             done = ( 0 == count );
535             if ( !done )
536             {
537                 if ( batch_valid( batch, count ) )
538                 {
539                     /* Step 2 : process the batch */
540                     STATUS ( STAT_USR, "processing batch of %u vectors", count );
541                     rc = background_vector_merger_process_batch( self, batch, count );
542                     STATUS ( STAT_USR, "finished processing: rc = %R", rc );
543                 }
544                 else
545                 {
546                     STATUS ( STAT_USR, "we have an invalid batch!" );
547                     rc = RC( rcVDB, rcNoTarg, rcConstructing, rcParam, rcInvalid );
548                 }
549             }
550         }
551         if ( NULL != batch )
552         {
553             uint32_t i;
554             for ( i = 0; i < count; ++i )
555             {
556                 release_bg_vec_merge_src( &( batch[ i ] ) );
557             }
558             free( batch );
559         }
560     }
561     STATUS ( STAT_USR, "exiting background thread loop" );
562     return rc;
563 }
564 
make_background_vector_merger(struct background_vector_merger ** merger,KDirectory * dir,const struct temp_dir * temp_dir,struct KFastDumpCleanupTask * cleanup_task,struct background_file_merger * file_merger,uint32_t batch_size,uint32_t q_wait_time,size_t buf_size,struct bg_update * gap)565 rc_t make_background_vector_merger( struct background_vector_merger ** merger,
566                              KDirectory * dir,
567                              const struct temp_dir * temp_dir,
568                              struct KFastDumpCleanupTask * cleanup_task,
569                              struct background_file_merger * file_merger,
570                              uint32_t batch_size,
571                              uint32_t q_wait_time,
572                              size_t buf_size,
573                              struct bg_update * gap )
574 {
575     rc_t rc = 0;
576     background_vector_merger * b = calloc( 1, sizeof * b );
577     *merger = NULL;
578     if ( NULL == b )
579     {
580         rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
581     }
582     else
583     {
584         b -> dir = dir;
585         b -> temp_dir = temp_dir;
586         b -> batch_size = batch_size;
587         b -> q_wait_time = q_wait_time;
588         b -> buf_size = buf_size;
589         b -> file_merger = file_merger;
590         b -> cleanup_task = cleanup_task;
591         b -> gap = gap;
592         b -> total = 0;
593         b -> total_rowcount_prod = 0;
594 
595         rc = KQueueMake ( &( b -> job_q ), batch_size );
596         if ( 0 == rc )
597         {
598             rc = helper_make_thread( &( b -> thread ), background_vector_merger_thread_func,
599                                      b, THREAD_DFLT_STACK_SIZE );
600             if ( 0 != rc )
601             {
602                 ErrMsg( "merge_sorter.c helper_make_thread( vector-merger ) -> %R", rc );
603             }
604         }
605 
606         if ( 0 == rc )
607         {
608             *merger = b;
609         }
610         else
611         {
612             release_background_vector_merger( b );
613         }
614     }
615     return rc;
616 }
617 
/* Stores the rowcount the producer is going to deliver and forwards it to
   the file-merger. Does nothing if self is NULL. */
void tell_total_rowcount_to_vector_merger( background_vector_merger * self, uint64_t value )
{
    if ( NULL == self )
    {
        return;
    }
    self -> total_rowcount_prod = value;
    tell_total_rowcount_to_file_merger( self -> file_merger, value );
}
626 
seal_background_vector_merger(background_vector_merger * self)627 rc_t seal_background_vector_merger( background_vector_merger * self )
628 {
629     rc_t rc = KQueueSeal ( self -> job_q );
630     if ( 0 != rc )
631     {
632         ErrMsg( "merge_sorter.c seal_background_vector_merger() -> %R", rc );
633     }
634     return rc;
635 }
636 
push_to_background_vector_merger(background_vector_merger * self,KVector * store)637 rc_t push_to_background_vector_merger( background_vector_merger * self, KVector * store )
638 {
639     rc_t rc;
640     bool running = true;
641     while ( running )
642     {
643         struct timeout_t tm;
644         rc = TimeoutInit ( &tm, self -> q_wait_time );
645         if ( 0 != rc )
646         {
647             ErrMsg( "merge_sorter.c push_to_background_vector_merger().TimeoutInit( %u ) -> %R", self -> q_wait_time, rc );
648             running = false;
649         }
650         else
651         {
652             rc = KQueuePush ( self -> job_q, store, &tm );
653             if ( 0 == rc )
654             {
655                 running = false;
656             }
657             else
658             {
659                 bool timed_out = ( GetRCState( rc ) == rcExhausted && GetRCObject( rc ) == ( enum RCObject )rcTimeout );
660                 if ( timed_out )
661                 {
662                     KSleepMs( self -> q_wait_time );
663                 }
664                 else
665                 {
666                     ErrMsg( "merge_sorter.c push_to_background_vector_merger().KQueuePush() -> %R", rc );
667                     running = false;
668                 }
669             }
670         }
671     }
672     return rc;
673 }
674 
/* =================================================================================
    The background-file-merger is composed of 1 background-thread, which is the
    consumer of a job_q. The background_vector_merger above puts strings into the
    queue. The background-file-merger pops the jobs out of the queue until it has
    assembled a batch of jobs. It then processes this batch by merge-sorting the
    content of the files into a temporary file. The file-entries are key-value
    pairs with a 64-bit key which is composed of the SEQID and one bit: first or
    second read in a spot. The value is the packed READ ( pack_4na() in helper.c ).
    The background-file-merger terminates when its input-queue is sealed in
    perform_fastdump() in fastdump.c after all background-vector-merger-threads
    ( producers ) have been joined. The final output of the background-file-merger
    is a list of temporary files produced in the temp-directory.
   ================================================================================= */
688 typedef struct background_file_merger
689 {
690     KDirectory * dir;               /* needed to perform the merge-sort */
691     const struct temp_dir * temp_dir;      /* needed to create temp. files */
692     const char * lookup_filename;
693     const char * index_filename;
694     locked_file_list files;         /* a locked file-list */
695     locked_value sealed;            /* flag to signal if the input is sealed */
696     struct KFastDumpCleanupTask * cleanup_task;     /* add the produced temp_files here too */
697     KThread * thread;               /* the thread that performs the merge-sort */
698     uint32_t product_id;            /* increased by one for each batch-run, used in temp-file-name */
699     uint32_t batch_size;            /* how many KVectors have to arrive to run a batch */
700     uint32_t wait_time;             /* time in milliseconds to sleep if waiting for files to process */
701     size_t buf_size;                /* needed to perform the merge-sort */
702     struct bg_update * gap;         /* visualize the gap after the producer finished */
703     uint64_t total_rows;            /* how many rows have we processed */
704     uint64_t total_rowcount_prod;   /* updated by the producer, informs the file-merger about the
705                                        rowcount to be processed */
706 } background_file_merger;
707 
708 
/* Releases the file-list and the sealed-flag of the file-merger and frees
   the merger itself. Does nothing if self is NULL. */
static void release_background_file_merger( background_file_merger * self )
{
    if ( NULL == self )
    {
        return;
    }
    locked_file_list_release( &( self -> files ), self -> dir );
    locked_value_release( &( self -> sealed ) );
    free( self );
}
718 
wait_for_background_file_merger(background_file_merger * self)719 static rc_t wait_for_background_file_merger( background_file_merger * self )
720 {
721     rc_t rc_status;
722     rc_t rc = KThreadWait ( self -> thread, &rc_status );
723     if ( 0 == rc )
724     {
725         rc = rc_status;
726     }
727     return rc;
728 }
729 
wait_for_and_release_background_file_merger(background_file_merger * self)730 rc_t wait_for_and_release_background_file_merger( background_file_merger * self )
731 {
732     rc_t rc = 0;
733     if ( NULL == self )
734     {
735         rc = RC( rcVDB, rcNoTarg, rcReleasing, rcSelf, rcNull );
736     }
737     else
738     {
739         /* while we are waiting on the background-file-merger to finish,
740            show a progress-indicator... */
741         rc = wait_for_background_file_merger( self );
742         if ( 0 == rc && ( self -> total_rows != self -> total_rowcount_prod ) )
743         {
744             rc = RC( rcVDB, rcNoTarg, rcConstructing, rcSize, rcInvalid );
745             ErrMsg( "merge_sorter.c wait_for_and_release_background_file_merger() %lu of %lu",
746                      self -> total_rows, self -> total_rowcount_prod );
747         }
748         release_background_file_merger( self );
749     }
750     return rc;
751 }
752 
753 /* called from the background-thread */
process_background_file_merger(background_file_merger * self)754 static rc_t process_background_file_merger( background_file_merger * self )
755 {
756     char tmp_filename[ 4096 ];
757 
758     rc_t rc = generate_bg_merge_filename( self -> temp_dir, tmp_filename, sizeof tmp_filename,
759                                           self -> product_id );
760 
761     if ( 0 == rc )
762     {
763         rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, tmp_filename );
764     }
765 
766     if ( 0 == rc )
767     {
768         uint32_t num_src = 0;
769         VNamelist * batch_files;
770         rc = VNamelistMake ( &batch_files, self -> batch_size );
771         if ( 0 == rc )
772         {
773             uint32_t i;
774             rc_t rc1 = 0;
775             for ( i = 0; 0 == rc && 0 == rc1 && i < self -> batch_size; ++i )
776             {
777                 const String * filename = NULL;
778                 rc1 = locked_file_list_pop( &( self -> files ), &filename );
779                 if ( 0 == rc1 && NULL != filename )
780                 {
781                     rc = VNamelistAppendString ( batch_files, filename );
782                     if ( 0 == rc )
783                     {
784                         num_src++;
785                     }
786                 }
787             }
788 
789             if ( 0 == rc )
790             {
791                 merge_sorter sorter;
792                 rc = init_merge_sorter( &sorter,
793                                         self -> dir,
794                                         tmp_filename,   /* the output file */
795                                         NULL,           /* opt. index_filename */
796                                         batch_files,    /* the input files */
797                                         self -> buf_size,
798                                         num_src,
799                                         self -> gap );
800                 if ( 0 == rc )
801                 {
802                     rc = run_merge_sorter( &sorter );
803                     release_merge_sorter( &sorter );
804                 }
805             }
806 
807             if ( 0 == rc )
808             {
809                 rc = delete_files( self -> dir, batch_files );
810             }
811 
812             VNamelistRelease( batch_files );
813         }
814     }
815 
816     if ( 0 == rc )
817     {
818         rc = locked_file_list_append( &( self -> files ), tmp_filename );
819         if ( 0 == rc )
820         {
821             self -> product_id += 1;
822         }
823     }
824     return rc;
825 }
826 
process_final_background_file_merger(background_file_merger * self,uint32_t count)827 static rc_t process_final_background_file_merger( background_file_merger * self, uint32_t count )
828 {
829     VNamelist * batch_files;
830     rc_t rc = VNamelistMake ( &batch_files, count );
831     if ( 0 == rc )
832     {
833         uint32_t i;
834         uint32_t num_src = 0;
835         rc_t rc1 = 0;
836         for ( i = 0; 0 == rc && 0 == rc1 && i < count; ++i )
837         {
838             const String * filename = NULL;
839             rc1 = locked_file_list_pop( &( self -> files ), &filename );
840             if ( 0 == rc1 && filename != NULL )
841             {
842                 rc = VNamelistAppendString ( batch_files, filename );
843                 if ( 0 == rc )
844                 {
845                     num_src++;
846                 }
847             }
848         }
849 
850         if ( 0 == rc )
851         {
852             rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, self -> lookup_filename );
853         }
854         if ( 0 == rc )
855         {
856             rc = Add_File_to_Cleanup_Task ( self -> cleanup_task, self -> index_filename );
857         }
858         if ( 0 == rc )
859         {
860             merge_sorter sorter;
861             rc = init_merge_sorter( &sorter,
862                                     self -> dir,
863                                     self -> lookup_filename,   /* the output file */
864                                     self -> index_filename,    /* opt. index_filename */
865                                     batch_files,               /* the input files */
866                                     self -> buf_size,
867                                     num_src,
868                                     self -> gap );
869             if ( 0 == rc )
870             {
871                 rc = run_merge_sorter( &sorter );
872                 if ( 0 == rc )
873                 {
874                     self -> total_rows += sorter . total_entries;
875                 }
876                 release_merge_sorter( &sorter );
877             }
878         }
879 
880         if ( 0 == rc )
881         {
882             rc = delete_files( self -> dir, batch_files );
883         }
884         VNamelistRelease( batch_files );
885     }
886     return rc;
887 }
888 
background_file_merger_thread_func(const KThread * thread,void * data)889 static rc_t CC background_file_merger_thread_func( const KThread * thread, void *data )
890 {
891     rc_t rc = 0;
892     background_file_merger * self = data;
893     bool done = false;
894     while( 0 == rc && !done )
895     {
896         uint64_t sealed;
897         rc = locked_value_get( &( self -> sealed ), &sealed );
898         if ( 0 == rc )
899         {
900             uint32_t count;
901             rc = locked_file_list_count( &( self -> files ), &count );
902             if ( 0 == rc )
903             {
904                 if ( sealed > 0 )
905                 {
906                     /* we are sealed... */
907                     if ( 0 == count )
908                     {
909                         /* this should not happen, but for the sake of completeness */
910                         done = true;
911                     }
912                     else if ( count > ( self -> batch_size ) )
913                     {
914                         /* we still have more than we can open, do one batch */
915                         rc = process_background_file_merger( self );
916                     }
917                     else
918                     {
919                         /* we can do the final batch */
920                         rc = process_final_background_file_merger( self, count );
921                         done = true;
922                     }
923                 }
924                 else
925                 {
926                     /* we are not sealed... */
927                     if ( count < ( self -> batch_size ) )
928                     {
929                         /* let us take a little nap, until we get enough files, or get sealed */
930                         KSleepMs( self -> wait_time );
931                     }
932                     else
933                     {
934                         /* we have enough files to process one batch */
935                         rc = process_background_file_merger( self );
936                     }
937                 }
938             }
939         }
940     }
941     return rc;
942 }
943 
make_background_file_merger(background_file_merger ** merger,KDirectory * dir,const struct temp_dir * temp_dir,struct KFastDumpCleanupTask * cleanup_task,const char * lookup_filename,const char * index_filename,uint32_t batch_size,uint32_t wait_time,size_t buf_size,struct bg_update * gap)944 rc_t make_background_file_merger( background_file_merger ** merger,
945                                 KDirectory * dir,
946                                 const struct temp_dir * temp_dir,
947                                 struct KFastDumpCleanupTask * cleanup_task,
948                                 const char * lookup_filename,
949                                 const char * index_filename,
950                                 uint32_t batch_size,
951                                 uint32_t wait_time,
952                                 size_t buf_size,
953                                 struct bg_update * gap )
954 {
955     rc_t rc = 0;
956     background_file_merger * b = calloc( 1, sizeof * b );
957     *merger = NULL;
958     if ( NULL == b )
959     {
960         rc = RC( rcVDB, rcNoTarg, rcConstructing, rcMemory, rcExhausted );
961     }
962     else
963     {
964         b -> dir = dir;
965         b -> temp_dir = temp_dir;
966         b -> lookup_filename = lookup_filename;
967         b -> index_filename = index_filename;
968         b -> batch_size = batch_size;
969         b -> wait_time = wait_time;
970         b -> buf_size = buf_size;
971         b -> cleanup_task = cleanup_task;
972         b -> gap = gap;
973         b -> total_rows = 0;
974         b -> total_rowcount_prod = 0;
975 
976         rc = locked_file_list_init( &( b -> files ), 25  );
977         if ( 0 == rc )
978         {
979             rc = locked_value_init( &( b -> sealed ), 0 );
980         }
981         if ( 0 == rc )
982         {
983             rc = helper_make_thread( &( b -> thread ), background_file_merger_thread_func,
984                                      b, THREAD_DFLT_STACK_SIZE );
985             if ( 0 != rc )
986             {
987                 ErrMsg( "merge_sorter.c helper_make_thread( file-mergerr ) -> %R", rc );
988             }
989         }
990 
991         if ( 0 == rc )
992         {
993             *merger = b;
994         }
995         else
996         {
997             release_background_file_merger( b );
998         }
999     }
1000     return rc;
1001 }
1002 
/* Stores 'value' in self -> total_rowcount_prod.
 * A NULL self is tolerated, the call is a no-op then.
 */
void tell_total_rowcount_to_file_merger( background_file_merger * self, uint64_t value )
{
    if ( NULL == self )
    {
        return;
    }
    self -> total_rowcount_prod = value;
}
1010 
push_to_background_file_merger(background_file_merger * self,const char * filename)1011 rc_t push_to_background_file_merger( background_file_merger * self, const char * filename )
1012 {
1013     return locked_file_list_append( &( self -> files ), filename );
1014 }
1015 
seal_background_file_merger(background_file_merger * self)1016 rc_t seal_background_file_merger( background_file_merger * self )
1017 {
1018     return locked_value_set( &( self -> sealed ), 1 );
1019 }
1020