1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <my_global.h>
40 #include <toku_portability.h>
41 
42 #include <arpa/inet.h>
43 
44 #include <stdio.h>
45 #include <memory.h>
46 #include <errno.h>
47 #include <toku_assert.h>
48 #include <string.h>
49 #include <fcntl.h>
50 
51 #include "ft/ft.h"
52 #include "ft/ft-internal.h"
53 #include "ft/leafentry.h"
54 #include "ft/loader/loader-internal.h"
55 #include "ft/loader/pqueue.h"
56 #include "ft/loader/dbufio.h"
57 #include "ft/logger/log-internal.h"
58 #include "ft/node.h"
59 #include "ft/serialize/block_table.h"
60 #include "ft/serialize/ft-serialize.h"
61 #include "ft/serialize/ft_node-serialize.h"
62 #include "ft/serialize/sub_block.h"
63 
64 #include "util/x1764.h"
65 
66 toku_instr_key *loader_bl_mutex_key;
67 toku_instr_key *loader_fi_lock_mutex_key;
68 toku_instr_key *loader_out_mutex_key;
69 
70 toku_instr_key *extractor_thread_key;
71 toku_instr_key *fractal_thread_key;
72 
73 toku_instr_key *tokudb_file_tmp_key;
74 toku_instr_key *tokudb_file_load_key;
75 
76 // 1024 is the right size_factor for production.
77 // Different values for these sizes may be used for testing.
78 static uint32_t size_factor = 1024;
79 static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE;
80 static uint32_t default_loader_basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
81 
82 void
toku_ft_loader_set_size_factor(uint32_t factor)83 toku_ft_loader_set_size_factor(uint32_t factor) {
84 // For test purposes only
85     size_factor = factor;
86     default_loader_nodesize = (size_factor==1) ? (1<<15) : FT_DEFAULT_NODE_SIZE;
87 }
88 
89 uint64_t
toku_ft_loader_get_rowset_budget_for_testing(void)90 toku_ft_loader_get_rowset_budget_for_testing (void)
91 // For test purposes only.  In production, the rowset size is determined by negotiation with the cachetable for some memory.  (See #2613).
92 {
93     return 16ULL*size_factor*1024ULL;
94 }
95 
ft_loader_lock_init(FTLOADER bl)96 void ft_loader_lock_init(FTLOADER bl) {
97     invariant(!bl->mutex_init);
98     toku_mutex_init(*loader_bl_mutex_key, &bl->mutex, nullptr);
99     bl->mutex_init = true;
100 }
101 
ft_loader_lock_destroy(FTLOADER bl)102 void ft_loader_lock_destroy(FTLOADER bl) {
103     if (bl->mutex_init) {
104         toku_mutex_destroy(&bl->mutex);
105         bl->mutex_init = false;
106     }
107 }
108 
ft_loader_lock(FTLOADER bl)109 static void ft_loader_lock(FTLOADER bl) {
110     invariant(bl->mutex_init);
111     toku_mutex_lock(&bl->mutex);
112 }
113 
ft_loader_unlock(FTLOADER bl)114 static void ft_loader_unlock(FTLOADER bl) {
115     invariant(bl->mutex_init);
116     toku_mutex_unlock(&bl->mutex);
117 }
118 
add_big_buffer(struct file_info * file)119 static int add_big_buffer(struct file_info *file) {
120     int result = 0;
121     bool newbuffer = false;
122     if (file->buffer == NULL) {
123         file->buffer = toku_malloc(file->buffer_size);
124         if (file->buffer == NULL)
125             result = get_error_errno();
126         else
127             newbuffer = true;
128     }
129     if (result == 0) {
130         int r = setvbuf(file->file->file,
131                         static_cast<char *>(file->buffer),
132                         _IOFBF,
133                         file->buffer_size);
134         if (r != 0) {
135             result = get_error_errno();
136             if (newbuffer) {
137                 toku_free(file->buffer);
138                 file->buffer = NULL;
139             }
140         }
141     }
142     return result;
143 }
144 
cleanup_big_buffer(struct file_info * file)145 static void cleanup_big_buffer(struct file_info *file) {
146     if (file->buffer) {
147         toku_free(file->buffer);
148         file->buffer = NULL;
149     }
150 }
151 
ft_loader_init_file_infos(struct file_infos * fi)152 int ft_loader_init_file_infos(struct file_infos *fi) {
153     int result = 0;
154     toku_mutex_init(*loader_fi_lock_mutex_key, &fi->lock, nullptr);
155     fi->n_files = 0;
156     fi->n_files_limit = 1;
157     fi->n_files_open = 0;
158     fi->n_files_extant = 0;
159     MALLOC_N(fi->n_files_limit, fi->file_infos);
160     if (fi->file_infos == NULL)
161         result = get_error_errno();
162     return result;
163 }
164 
ft_loader_fi_destroy(struct file_infos * fi,bool is_error)165 void ft_loader_fi_destroy (struct file_infos *fi, bool is_error)
166 // Effect: Free the resources in the fi.
167 // If is_error then we close and unlink all the temp files.
168 // If !is_error then requires that all the temp files have been closed and destroyed
169 // No error codes are returned.  If anything goes wrong with closing and unlinking then it's only in an is_error case, so we don't care.
170 {
171     if (fi->file_infos == NULL) {
172         // ft_loader_init_file_infos guarantees this isn't null, so if it is, we know it hasn't been inited yet and we don't need to destroy it.
173         return;
174     }
175     toku_mutex_destroy(&fi->lock);
176     if (!is_error) {
177         invariant(fi->n_files_open==0);
178         invariant(fi->n_files_extant==0);
179     }
180     for (int i=0; i<fi->n_files; i++) {
181         if (fi->file_infos[i].is_open) {
182             invariant(is_error);
183             toku_os_fclose(fi->file_infos[i].file); // don't check for errors, since we are in an error case.
184         }
185         if (fi->file_infos[i].is_extant) {
186             invariant(is_error);
187             unlink(fi->file_infos[i].fname);
188             toku_free(fi->file_infos[i].fname);
189         }
190         cleanup_big_buffer(&fi->file_infos[i]);
191     }
192     toku_free(fi->file_infos);
193     fi->n_files=0;
194     fi->n_files_limit=0;
195     fi->file_infos = NULL;
196 }
197 
open_file_add(struct file_infos * fi,TOKU_FILE * file,char * fname,FIDX * idx)198 static int open_file_add(struct file_infos *fi,
199                          TOKU_FILE *file,
200                          char *fname,
201                          /* out */ FIDX *idx) {
202     int result = 0;
203     toku_mutex_lock(&fi->lock);
204     if (fi->n_files >= fi->n_files_limit) {
205         fi->n_files_limit *=2;
206         XREALLOC_N(fi->n_files_limit, fi->file_infos);
207     }
208     invariant(fi->n_files < fi->n_files_limit);
209     fi->file_infos[fi->n_files].is_open   = true;
210     fi->file_infos[fi->n_files].is_extant = true;
211     fi->file_infos[fi->n_files].fname     = fname;
212     fi->file_infos[fi->n_files].file      = file;
213     fi->file_infos[fi->n_files].n_rows    = 0;
214     fi->file_infos[fi->n_files].buffer_size = FILE_BUFFER_SIZE;
215     fi->file_infos[fi->n_files].buffer    = NULL;
216     result = add_big_buffer(&fi->file_infos[fi->n_files]);
217     if (result == 0) {
218         idx->idx = fi->n_files;
219         fi->n_files++;
220         fi->n_files_extant++;
221         fi->n_files_open++;
222     }
223    toku_mutex_unlock(&fi->lock);
224     return result;
225 }
226 
ft_loader_fi_reopen(struct file_infos * fi,FIDX idx,const char * mode)227 int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) {
228     int result = 0;
229     toku_mutex_lock(&fi->lock);
230     int i = idx.idx;
231     invariant(i >= 0 && i < fi->n_files);
232     invariant(!fi->file_infos[i].is_open);
233     invariant(fi->file_infos[i].is_extant);
234     fi->file_infos[i].file =
235         toku_os_fopen(fi->file_infos[i].fname, mode, *tokudb_file_load_key);
236     if (fi->file_infos[i].file == NULL) {
237         result = get_error_errno();
238     } else {
239         fi->file_infos[i].is_open = true;
240         // No longer need the big buffer for reopened files.  Don't allocate the space, we need it elsewhere.
241         //add_big_buffer(&fi->file_infos[i]);
242         fi->n_files_open++;
243     }
244     toku_mutex_unlock(&fi->lock);
245     return result;
246 }
247 
ft_loader_fi_close(struct file_infos * fi,FIDX idx,bool require_open)248 int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open)
249 {
250     int result = 0;
251     toku_mutex_lock(&fi->lock);
252     invariant(idx.idx >=0 && idx.idx < fi->n_files);
253     if (fi->file_infos[idx.idx].is_open) {
254         invariant(fi->n_files_open>0);   // loader-cleanup-test failure
255         fi->n_files_open--;
256         fi->file_infos[idx.idx].is_open = false;
257         int r = toku_os_fclose(fi->file_infos[idx.idx].file);
258         if (r)
259             result = get_error_errno();
260         cleanup_big_buffer(&fi->file_infos[idx.idx]);
261     } else if (require_open)
262         result = EINVAL;
263     toku_mutex_unlock(&fi->lock);
264     return result;
265 }
266 
ft_loader_fi_unlink(struct file_infos * fi,FIDX idx)267 int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx) {
268     int result = 0;
269     toku_mutex_lock(&fi->lock);
270     int id = idx.idx;
271     invariant(id >=0 && id < fi->n_files);
272     if (fi->file_infos[id].is_extant) { // must still exist
273         invariant(fi->n_files_extant>0);
274         fi->n_files_extant--;
275         invariant(!fi->file_infos[id].is_open); // must be closed before we unlink
276         fi->file_infos[id].is_extant = false;
277         int r = unlink(fi->file_infos[id].fname);
278         if (r != 0)
279             result = get_error_errno();
280         toku_free(fi->file_infos[id].fname);
281         fi->file_infos[id].fname = NULL;
282     } else
283         result = EINVAL;
284     toku_mutex_unlock(&fi->lock);
285     return result;
286 }
287 
288 int
ft_loader_fi_close_all(struct file_infos * fi)289 ft_loader_fi_close_all(struct file_infos *fi) {
290     int rval = 0;
291     for (int i = 0; i < fi->n_files; i++) {
292         int r;
293         FIDX idx = { i };
294         r = ft_loader_fi_close(fi, idx, false);  // ignore files that are already closed
295         if (rval == 0 && r)
296             rval = r;  // capture first error
297     }
298     return rval;
299 }
300 
ft_loader_open_temp_file(FTLOADER bl,FIDX * file_idx)301 int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx)
302 /* Effect: Open a temporary file in read-write mode.  Save enough information to close and delete the file later.
303  * Return value: 0 on success, an error number otherwise.
304  *  On error, *file_idx and *fnamep will be unmodified.
305  *  The open file will be saved in bl->file_infos so that even if errors happen we can free them all.
306  */
307 {
308     int result = 0;
309     if (result)  // debug hack
310         return result;
311     TOKU_FILE *f = NULL;
312     int fd = -1;
313     char *fname = toku_strdup(bl->temp_file_template);
314     if (fname == NULL)
315         result = get_error_errno();
316     else {
317         fd = mkstemp(fname);
318         if (fd < 0) {
319             result = get_error_errno();
320         } else {
321             f = toku_os_fdopen(fd, "r+", fname, *tokudb_file_tmp_key);
322             if (f->file == nullptr)
323                 result = get_error_errno();
324             else
325                 result = open_file_add(&bl->file_infos, f, fname, file_idx);
326         }
327     }
328     if (result != 0) {
329         if (fd >= 0) {
330             toku_os_close(fd);
331             unlink(fname);
332         }
333         if (f != NULL)
334             toku_os_fclose(f);  // don't check for error because we're already in an error case
335         if (fname != NULL)
336             toku_free(fname);
337     }
338     return result;
339 }
340 
toku_ft_loader_internal_destroy(FTLOADER bl,bool is_error)341 void toku_ft_loader_internal_destroy(FTLOADER bl, bool is_error) {
342     ft_loader_lock_destroy(bl);
343 
344     // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
345     toku_free(bl->dbs);
346     toku_free(bl->descriptors);
347     toku_free(bl->root_xids_that_created);
348     if (bl->new_fnames_in_env) {
349         for (int i = 0; i < bl->N; i++)
350             toku_free((char*)bl->new_fnames_in_env[i]);
351         toku_free(bl->new_fnames_in_env);
352     }
353     toku_free(bl->extracted_datasizes);
354     toku_free(bl->bt_compare_funs);
355     toku_free((char*)bl->temp_file_template);
356     ft_loader_fi_destroy(&bl->file_infos, is_error);
357 
358     for (int i = 0; i < bl->N; i++)
359         destroy_rowset(&bl->rows[i]);
360     toku_free(bl->rows);
361 
362     for (int i = 0; i < bl->N; i++)
363         destroy_merge_fileset(&bl->fs[i]);
364     toku_free(bl->fs);
365 
366     if (bl->last_key) {
367         for (int i=0; i < bl->N; i++) {
368             toku_free(bl->last_key[i].data);
369         }
370         toku_free(bl->last_key);
371         bl->last_key = NULL;
372     }
373 
374     destroy_rowset(&bl->primary_rowset);
375     if (bl->primary_rowset_queue) {
376         toku_queue_destroy(bl->primary_rowset_queue);
377         bl->primary_rowset_queue = nullptr;
378     }
379 
380     for (int i=0; i<bl->N; i++) {
381         if ( bl->fractal_queues ) {
382             invariant(bl->fractal_queues[i]==NULL);
383         }
384     }
385     toku_free(bl->fractal_threads);
386     toku_free(bl->fractal_queues);
387     toku_free(bl->fractal_threads_live);
388 
389     if (bl->did_reserve_memory) {
390         invariant(bl->cachetable);
391         toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory);
392     }
393 
394     ft_loader_destroy_error_callback(&bl->error_callback);
395     ft_loader_destroy_poll_callback(&bl->poll_callback);
396 
397     //printf("Progress=%d/%d\n", bl->progress, PROGRESS_MAX);
398 
399     toku_free(bl);
400 }
401 
// Entry point of the extractor thread.  It is declared static here and
// defined later in this translation unit; toku_ft_loader_open() starts it when
// puts are allowed.
static void *extractor_thread (void*);

// Larger of a and b.  This is a macro, so an argument may be evaluated more
// than once: only pass side-effect-free expressions.
#define MAX(a,b) (((a)<(b)) ? (b) : (a))
405 
memory_per_rowset_during_extract(FTLOADER bl)406 static uint64_t memory_per_rowset_during_extract (FTLOADER bl)
407 // Return how much memory can be allocated for each rowset.
408 {
409     if (size_factor==1) {
410         return 16*1024;
411     } else {
412         // There is a primary rowset being maintained by the foreground thread.
413         // There could be two more in the queue.
414         // There is one rowset for each index (bl->N) being filled in.
415         // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
416         int n_copies = (1 // primary rowset
417                         +EXTRACTOR_QUEUE_DEPTH  // the number of primaries in the queue
418                         +bl->N // the N rowsets being constructed by the extractor thread.
419                         +bl->N // the N sort buffers
420                         +1     // Give the extractor thread one more so that it can have temporary space for sorting.  This is overkill.
421                         );
422         int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE;  // for each index we are writing to a file at any given time.
423         int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
424         return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
425     }
426 }
427 
ft_loader_get_fractal_workers_count(FTLOADER bl)428 static unsigned ft_loader_get_fractal_workers_count(FTLOADER bl) {
429     unsigned w = 0;
430     while (1) {
431         ft_loader_lock(bl);
432         w = bl->fractal_workers;
433         ft_loader_unlock(bl);
434         if (w != 0)
435             break;
436         toku_pthread_yield();  // maybe use a cond var instead
437     }
438     return w;
439 }
440 
ft_loader_set_fractal_workers_count(FTLOADER bl)441 static void ft_loader_set_fractal_workers_count(FTLOADER bl) {
442     ft_loader_lock(bl);
443     if (bl->fractal_workers == 0)
444         bl->fractal_workers = 1;
445     ft_loader_unlock(bl);
446 }
447 
448 // To compute a merge, we have a certain amount of memory to work with.
449 // We perform only one fanin at a time.
450 // If the fanout is F then we are using
451 //   F merges.  Each merge uses
452 //   DBUFIO_DEPTH buffers for double buffering.  Each buffer is of size at least MERGE_BUF_SIZE
453 // so the memory is
454 //   F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
455 // We use some additional space to buffer the outputs.
456 //  That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
457 //  And we have FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE per queue
//  And if we are writing the fractal tree, each worker could have a fractal tree node that it's working on.
459 //
460 // DBUFIO_DEPTH*F*MERGE_BUF_SIZE + FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE + WORKERS*NODESIZE*2 <= RESERVED_MEMORY
461 
memory_avail_during_merge(FTLOADER bl,bool is_fractal_node)462 static int64_t memory_avail_during_merge(FTLOADER bl, bool is_fractal_node) {
463     // avail memory = reserved memory - WORKERS*NODESIZE*2 for the last merge stage only
464     int64_t avail_memory = bl->reserved_memory;
465     if (is_fractal_node) {
466         // reserve space for the fractal writer thread buffers
467         avail_memory -= (int64_t)ft_loader_get_fractal_workers_count(bl) * (int64_t)default_loader_nodesize * 2; // compressed and uncompressed buffers
468     }
469     return avail_memory;
470 }
471 
merge_fanin(FTLOADER bl,bool is_fractal_node)472 static int merge_fanin (FTLOADER bl, bool is_fractal_node) {
473     // return number of temp files to read in this pass
474     int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
475     int64_t nbuffers = memory_avail / (int64_t)TARGET_MERGE_BUF_SIZE;
476     if (is_fractal_node)
477         nbuffers -= FRACTAL_WRITER_ROWSETS;
478     return MAX(nbuffers / (int64_t)DBUFIO_DEPTH, (int)MIN_MERGE_FANIN);
479 }
480 
memory_per_rowset_during_merge(FTLOADER bl,int merge_factor,bool is_fractal_node)481 static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, bool is_fractal_node // if it is being sent to a q
482                                                 ) {
483     int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
484     int64_t nbuffers = DBUFIO_DEPTH * merge_factor;
485     if (is_fractal_node)
486         nbuffers += FRACTAL_WRITER_ROWSETS;
487     return MAX(memory_avail / nbuffers, (int64_t)MIN_MERGE_BUF_SIZE);
488 }
489 
// Allocate and fully initialize a FTLOADER without starting any threads.
// On success the new loader is stored in *blp and 0 is returned.  On any
// failure, everything allocated so far is released with
// toku_ft_loader_internal_destroy(bl, true) and a nonzero error code is
// returned; *blp is not written in that case.
// Entries of fts[] may be NULL; the root xid, DB and descriptor for such an
// index are then left zeroed.  new_fnames_in_env[] and temp_file_template
// are copied, so the caller keeps ownership of its strings.
int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                   CACHETABLE cachetable,
                                   generate_row_for_put_func g,
                                   DB *src_db,
                                   int N, FT_HANDLE fts[/*N*/], DB* dbs[/*N*/],
                                   const char *new_fnames_in_env[/*N*/],
                                   ft_compare_func bt_compare_functions[/*N*/],
                                   const char *temp_file_template,
                                   LSN load_lsn,
                                   TOKUTXN txn,
                                   bool reserve_memory,
                                   uint64_t reserve_memory_size,
                                   bool compress_intermediates,
                                   bool allow_puts)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
{
    FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
    if (!bl) return get_error_errno();

    bl->generate_row_for_put = g;
    bl->cachetable = cachetable;
    if (reserve_memory && bl->cachetable) {
        bl->did_reserve_memory = true;
        bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0, reserve_memory_size); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with).
    }
    else {
        // No reservation requested, or no cachetable to reserve from.
        bl->did_reserve_memory = false;
        bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
    }
    bl->compress_intermediates = compress_intermediates;
    bl->allow_puts = allow_puts;
    bl->src_db = src_db;
    bl->N = N;
    bl->load_lsn = load_lsn;
    if (txn) {
        // NOTE(review): parent_id64 is presumably the root transaction's id.
        bl->load_root_xid = txn->txnid.parent_id64;
    }
    else {
        bl->load_root_xid = TXNID_NONE;
    }

    // Set up the callbacks before anything that can fail: the error paths
    // below call toku_ft_loader_internal_destroy(), which destroys them
    // unconditionally.
    ft_loader_init_error_callback(&bl->error_callback);
    ft_loader_init_poll_callback(&bl->poll_callback);

    // Allocation helpers.  On failure they free the partially-built loader
    // and return the error from this function.
#define MY_CALLOC_N(n,v) CALLOC_N(n,v); if (!v) { int r = get_error_errno(); toku_ft_loader_internal_destroy(bl, true); return r; }
#define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = get_error_errno(); toku_ft_loader_internal_destroy(bl, true); return r; } lval = v; } while (0)

    // Per-index arrays; indexes whose FT handle is NULL are left zeroed.
    MY_CALLOC_N(N, bl->root_xids_that_created);
    for (int i=0; i<N; i++) if (fts[i]) bl->root_xids_that_created[i]=fts[i]->ft->h->root_xid_that_created;
    MY_CALLOC_N(N, bl->dbs);
    for (int i=0; i<N; i++) if (fts[i]) bl->dbs[i]=dbs[i];
    MY_CALLOC_N(N, bl->descriptors);
    for (int i=0; i<N; i++) if (fts[i]) bl->descriptors[i]=&fts[i]->ft->descriptor;
    MY_CALLOC_N(N, bl->new_fnames_in_env);
    for (int i=0; i<N; i++) SET_TO_MY_STRDUP(bl->new_fnames_in_env[i], new_fnames_in_env[i]);
    MY_CALLOC_N(N, bl->extracted_datasizes); // the calloc_n zeroed everything, which is what we want
    MY_CALLOC_N(N, bl->bt_compare_funs);
    for (int i=0; i<N; i++) bl->bt_compare_funs[i] = bt_compare_functions[i];

    // Fractal writer bookkeeping (queues and threads are created later).
    MY_CALLOC_N(N, bl->fractal_queues);
    for (int i=0; i<N; i++) bl->fractal_queues[i]=NULL;
    MY_CALLOC_N(N, bl->fractal_threads);
    MY_CALLOC_N(N, bl->fractal_threads_live);
    for (int i=0; i<N; i++) bl->fractal_threads_live[i] = false;

    {
        int r = ft_loader_init_file_infos(&bl->file_infos);
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }

    SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template);

    bl->n_rows   = 0;
    bl->progress = 0;
    bl->progress_callback_result = 0;

    // One rowset, merge fileset and last-key buffer per index.
    MY_CALLOC_N(N, bl->rows);
    MY_CALLOC_N(N, bl->fs);
    MY_CALLOC_N(N, bl->last_key);
    for(int i=0;i<N;i++) {
        {
            int r = init_rowset(&bl->rows[i], memory_per_rowset_during_extract(bl));
            if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
        }
        init_merge_fileset(&bl->fs[i]);
        bl->last_key[i].flags = DB_DBT_REALLOC; // don't really need this, but it's nice to maintain it.  We use ulen to keep track of the realloced space.
    }

    {
        int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }
    {   int r = toku_queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }
    {
        // Initialized last; ft_loader_lock_destroy() skips it on earlier
        // failures because bl->mutex_init is still false.
        ft_loader_lock_init(bl);
    }

    *blp = bl;

    return 0;
}
593 
toku_ft_loader_open(FTLOADER * blp,CACHETABLE cachetable,generate_row_for_put_func g,DB * src_db,int N,FT_HANDLE fts[],DB * dbs[],const char * new_fnames_in_env[],ft_compare_func bt_compare_functions[],const char * temp_file_template,LSN load_lsn,TOKUTXN txn,bool reserve_memory,uint64_t reserve_memory_size,bool compress_intermediates,bool allow_puts)594 int toku_ft_loader_open (FTLOADER *blp, /* out */
595                           CACHETABLE cachetable,
596                           generate_row_for_put_func g,
597                           DB *src_db,
598                           int N, FT_HANDLE fts[/*N*/], DB* dbs[/*N*/],
599                           const char *new_fnames_in_env[/*N*/],
600                           ft_compare_func bt_compare_functions[/*N*/],
601                           const char *temp_file_template,
602                           LSN load_lsn,
603                           TOKUTXN txn,
604                           bool reserve_memory,
605                           uint64_t reserve_memory_size,
606                           bool compress_intermediates,
607                           bool allow_puts) {
608 // Effect: called by DB_ENV->create_loader to create an ft loader.
609 // Arguments:
610 //   blp                  Return a ft loader ("bulk loader") here.
611 //   g                    The function for generating a row
612 //   src_db               The source database.  Needed by g.  May be NULL if that's ok with g.
613 //   N                    The number of dbs to create.
614 //   dbs                  An array of open databases.  Used by g.  The data will be put in these database.
615 //   new_fnames           The file names (these strings are owned by the caller: we make a copy for our own purposes).
616 //   temp_file_template   A template suitable for mkstemp()
617 //   reserve_memory       Cause the loader to reserve memory for its use from the cache table.
618 //   compress_intermediates  Cause the loader to compress intermediate loader files.
619 //   allow_puts           Prepare the loader for rows to insert.  When puts are disabled, the loader does not run the
620 //                        extractor or the fractal tree writer threads.
621 // Return value: 0 on success, an error number otherwise.
622     int result = 0;
623     {
624         int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db,
625                                               N, fts, dbs,
626                                               new_fnames_in_env,
627                                               bt_compare_functions,
628                                               temp_file_template,
629                                               load_lsn,
630                                               txn,
631                                               reserve_memory,
632                                               reserve_memory_size,
633                                               compress_intermediates,
634                                               allow_puts);
635         if (r!=0) result = r;
636     }
637     if (result == 0 && allow_puts) {
638         FTLOADER bl = *blp;
639         int r = toku_pthread_create(*extractor_thread_key,
640                                     &bl->extractor_thread,
641                                     nullptr,
642                                     extractor_thread,
643                                     static_cast<void *>(bl));
644         if (r == 0) {
645             bl->extractor_live = true;
646         } else {
647             result = r;
648             (void) toku_ft_loader_internal_destroy(bl, true);
649         }
650     }
651     return result;
652 }
653 
ft_loader_set_panic(FTLOADER bl,int error,bool callback,int which_db,DBT * key,DBT * val)654 static void ft_loader_set_panic(FTLOADER bl, int error, bool callback, int which_db, DBT *key, DBT *val) {
655     DB *db = nullptr;
656     if (bl && bl->dbs && which_db >= 0 && which_db < bl->N) {
657         db = bl->dbs[which_db];
658     }
659     int r = ft_loader_set_error(&bl->error_callback, error, db, which_db, key, val);
660     if (r == 0 && callback)
661         ft_loader_call_error_function(&bl->error_callback);
662 }
663 
664 // One of the tests uses this.
toku_bl_fidx2file(FTLOADER bl,FIDX i)665 TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i) {
666     toku_mutex_lock(&bl->file_infos.lock);
667     invariant(i.idx >= 0 && i.idx < bl->file_infos.n_files);
668     invariant(bl->file_infos.file_infos[i.idx].is_open);
669     TOKU_FILE *result = bl->file_infos.file_infos[i.idx].file;
670     toku_mutex_unlock(&bl->file_infos.lock);
671     return result;
672 }
673 
// Compress the wb->ndone bytes buffered in wb->buf and append them to
// `stream` as one record laid out as:
//   uint32 total_size     -- size of everything after this word
//   uint32 n_sub_blocks
//   n_sub_blocks x { uint32 compressed_size, uint32 uncompressed_size, uint32 xsum }
//   the compressed sub-block payloads
// Header words go through toku_htod32 (presumably host-to-disk byte order).
// wb->ndone is reset to 0 after compression, i.e. before the write, so it is
// also reset when the write fails.
// Returns ENOMEM if the output buffer cannot be allocated (wb untouched),
// otherwise whatever toku_os_fwrite returns.
static int bl_finish_compressed_write(TOKU_FILE *stream, struct wbuf *wb) {
    int r = 0;
    char *compressed_buf = NULL;
    const size_t data_size = wb->ndone;
    invariant(data_size > 0);
    invariant(data_size <= MAX_UNCOMPRESSED_BUF);

    int n_sub_blocks = 0;
    int sub_block_size = 0;

    // Split the data into at most max_sub_blocks sub blocks.
    r = choose_sub_block_size(wb->ndone, max_sub_blocks, &sub_block_size, &n_sub_blocks);
    invariant(r==0);
    invariant(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
    invariant(sub_block_size > 0);

    struct sub_block sub_block[max_sub_blocks];
    // set the initial sub block size for all of the sub blocks
    for (int i = 0; i < n_sub_blocks; i++) {
        sub_block_init(&sub_block[i]);
    }
    set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block);

    // Size the output buffer for the header plus the compressed-size bound.
    size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, TOKU_DEFAULT_COMPRESSION_METHOD);
    const size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
    const size_t other_overhead = sizeof(uint32_t); //total_size
    const size_t header_len = sub_block_header_len + other_overhead;
    MALLOC_N(header_len + compressed_len, compressed_buf);
    if (compressed_buf == nullptr) {
        return ENOMEM;
    }

    // compress all of the sub blocks
    // (get_num_cores()/get_ft_pool() suggest this may run in parallel — NOTE(review): confirm)
    char *uncompressed_ptr = (char*)wb->buf;
    char *compressed_ptr = compressed_buf + header_len;
    compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr,
                                             get_num_cores(), get_ft_pool(), TOKU_DEFAULT_COMPRESSION_METHOD);

    //total_size does NOT include itself
    uint32_t total_size = compressed_len + sub_block_header_len;
    // serialize the sub block header
    uint32_t *ptr = (uint32_t *)(compressed_buf);
    *ptr++ = toku_htod32(total_size);
    *ptr++ = toku_htod32(n_sub_blocks);
    for (int i=0; i<n_sub_blocks; i++) {
        ptr[0] = toku_htod32(sub_block[i].compressed_size);
        ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
        ptr[2] = toku_htod32(sub_block[i].xsum);
        ptr += 3;
    }
    // Mark as written
    wb->ndone = 0;

    size_t size_to_write = total_size + 4;  // Includes writing total_size

    r = toku_os_fwrite(compressed_buf, 1, size_to_write, stream);

    if (compressed_buf) {
        toku_free(compressed_buf);
    }
    return r;
}
735 
bl_compressed_write(void * ptr,size_t nbytes,TOKU_FILE * stream,struct wbuf * wb)736 static int bl_compressed_write(void *ptr,
737                                size_t nbytes,
738                                TOKU_FILE *stream,
739                                struct wbuf *wb) {
740     invariant(wb->size <= MAX_UNCOMPRESSED_BUF);
741     size_t bytes_left = nbytes;
742     char *buf = (char *)ptr;
743 
744     while (bytes_left > 0) {
745         size_t bytes_to_copy = bytes_left;
746         if (wb->ndone + bytes_to_copy > wb->size) {
747             bytes_to_copy = wb->size - wb->ndone;
748         }
749         wbuf_nocrc_literal_bytes(wb, buf, bytes_to_copy);
750         if (wb->ndone == wb->size) {
751             //Compress, write to disk, and empty out wb
752             int r = bl_finish_compressed_write(stream, wb);
753             if (r != 0) {
754                 errno = r;
755                 return -1;
756             }
757             wb->ndone = 0;
758         }
759         bytes_left -= bytes_to_copy;
760         buf += bytes_to_copy;
761     }
762     return 0;
763 }
764 
bl_fwrite(void * ptr,size_t size,size_t nmemb,TOKU_FILE * stream,struct wbuf * wb,FTLOADER bl)765 static int bl_fwrite(void *ptr,
766                      size_t size,
767                      size_t nmemb,
768                      TOKU_FILE *stream,
769                      struct wbuf *wb,
770                      FTLOADER bl)
771 /* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise
772  * returns an error number.
773  * Arguments:
774  *   ptr    the data to be writen.
775  *   size   the amount of data to be written.
776  *   nmemb  the number of units of size to be written.
777  *   stream write the data here.
778  *   wb     where to write uncompressed data (if we're compressing) or ignore if
779  * NULL
780  *   bl     passed so we can panic the ft_loader if something goes wrong
781  * (recording the error number).
782  * Return value: 0 on success, an error number otherwise.
783  */
784 {
785     if (!bl->compress_intermediates || !wb) {
786         return toku_os_fwrite(ptr, size, nmemb, stream);
787     } else {
788         size_t num_bytes = size * nmemb;
789         int r = bl_compressed_write(ptr, num_bytes, stream, wb);
790         if (r != 0) {
791             return r;
792         }
793     }
794     return 0;
795 }
796 
bl_fread(void * ptr,size_t size,size_t nmemb,TOKU_FILE * stream)797 static int bl_fread(void *ptr, size_t size, size_t nmemb, TOKU_FILE *stream)
798 /* Effect: this is a wrapper for fread that returns 0 on success, otherwise
799  * returns an error number.
800  * Arguments:
801  *  ptr      read data into here.
802  *  size     size of data element to be read.
803  *  nmemb    number of data elements to be read.
804  *  stream   where to read the data from.
805  * Return value: 0 on success, an error number otherwise.
806  */
807 {
808     return toku_os_fread(ptr, size, nmemb, stream);
809 }
810 
bl_write_dbt(DBT * dbt,TOKU_FILE * datafile,uint64_t * dataoff,struct wbuf * wb,FTLOADER bl)811 static int bl_write_dbt(DBT *dbt,
812                         TOKU_FILE *datafile,
813                         uint64_t *dataoff,
814                         struct wbuf *wb,
815                         FTLOADER bl) {
816     int r;
817     int dlen = dbt->size;
818     if ((r=bl_fwrite(&dlen,     sizeof(dlen), 1,    datafile, wb, bl))) return r;
819     if ((r=bl_fwrite(dbt->data, 1,            dlen, datafile, wb, bl))) return r;
820     if (dataoff)
821         *dataoff += dlen + sizeof(dlen);
822     return 0;
823 }
824 
bl_read_dbt(DBT * dbt,TOKU_FILE * stream)825 static int bl_read_dbt(/*in*/ DBT *dbt, TOKU_FILE *stream) {
826     int len;
827     {
828         int r;
829         if ((r = bl_fread(&len, sizeof(len), 1, stream))) return r;
830         invariant(len>=0);
831     }
832     if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
833     {
834         int r;
835         if ((r = bl_fread(dbt->data, 1, len, stream)))     return r;
836     }
837     dbt->size = len;
838     return 0;
839 }
840 
bl_read_dbt_from_dbufio(DBT * dbt,DBUFIO_FILESET bfs,int filenum)841 static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int filenum)
842 {
843     int result = 0;
844     uint32_t len;
845     {
846         size_t n_read;
847         int r = dbufio_fileset_read(bfs, filenum, &len, sizeof(len), &n_read);
848         if (r!=0) {
849             result = r;
850         } else if (n_read<sizeof(len)) {
851             result = TOKUDB_NO_DATA; // must have run out of data prematurely.  This is not EOF, it's a real error.
852         }
853     }
854     if (result==0) {
855         if (dbt->ulen<len) {
856             void * data = toku_realloc(dbt->data, len);
857             if (data==NULL) {
858                 result = get_error_errno();
859             } else {
860                 dbt->ulen=len;
861                 dbt->data=data;
862             }
863         }
864     }
865     if (result==0) {
866         size_t n_read;
867         int r = dbufio_fileset_read(bfs, filenum, dbt->data, len, &n_read);
868         if (r!=0) {
869             result = r;
870         } else if (n_read<len) {
871             result = TOKUDB_NO_DATA; // must have run out of data prematurely.  This is not EOF, it's a real error.
872         } else {
873             dbt->size = len;
874         }
875     }
876     return result;
877 }
878 
loader_write_row(DBT * key,DBT * val,FIDX data,TOKU_FILE * dataf,uint64_t * dataoff,struct wbuf * wb,FTLOADER bl)879 int loader_write_row(DBT *key,
880                      DBT *val,
881                      FIDX data,
882                      TOKU_FILE *dataf,
883                      uint64_t *dataoff,
884                      struct wbuf *wb,
885                      FTLOADER bl)
886 /* Effect: Given a key and a val (both DBTs), write them to a file.  Increment
887  * *dataoff so that it's up to date.
888  * Arguments:
889  *   key, val   write these.
890  *   data       the file to write them to
891  *   dataoff    a pointer to a counter that keeps track of the amount of data
892  * written so far.
893  *   wb         a pointer (possibly NULL) to buffer uncompressed output
894  *   bl         the ft_loader (passed so we can panic if needed).
895  * Return value: 0 on success, an error number otherwise.
896  */
897 {
898     //int klen = key->size;
899     //int vlen = val->size;
900     int r;
901     // we have a chance to handle the errors because when we close we can delete all the files.
902     if ((r=bl_write_dbt(key, dataf, dataoff, wb, bl))) return r;
903     if ((r=bl_write_dbt(val, dataf, dataoff, wb, bl))) return r;
904     toku_mutex_lock(&bl->file_infos.lock);
905     bl->file_infos.file_infos[data.idx].n_rows++;
906     toku_mutex_unlock(&bl->file_infos.lock);
907     return 0;
908 }
909 
loader_read_row(TOKU_FILE * f,DBT * key,DBT * val)910 int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val)
911 /* Effect: Read a key value pair from a file.  The DBTs must have DB_DBT_REALLOC
912  * set.
913  * Arguments:
914  *    f         where to read it from.
915  *    key, val  read it into these.
916  *    bl        passed so we can panic if needed.
917  * Return value: 0 on success, an error number otherwise.
918  * Requires:   The DBTs must have DB_DBT_REALLOC
919  */
920 {
921     {
922         int r = bl_read_dbt(key, f);
923         if (r!=0) return r;
924     }
925     {
926         int r = bl_read_dbt(val, f);
927         if (r!=0) return r;
928     }
929     return 0;
930 }
931 
loader_read_row_from_dbufio(DBUFIO_FILESET bfs,int filenum,DBT * key,DBT * val)932 static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *key, DBT *val)
933 /* Effect: Read a key value pair from a file.  The DBTs must have DB_DBT_REALLOC set.
934  * Arguments:
935  *    f         where to read it from.
936  *    key, val  read it into these.
937  *    bl        passed so we can panic if needed.
938  * Return value: 0 on success, an error number otherwise.
939  * Requires:   The DBTs must have DB_DBT_REALLOC
940  */
941 {
942     {
943         int r = bl_read_dbt_from_dbufio(key, bfs, filenum);
944         if (r!=0) return r;
945     }
946     {
947         int r = bl_read_dbt_from_dbufio(val, bfs, filenum);
948         if (r!=0) return r;
949     }
950     return 0;
951 }
952 
953 
init_rowset(struct rowset * rows,uint64_t memory_budget)954 int init_rowset (struct rowset *rows, uint64_t memory_budget)
955 /* Effect: Initialize a collection of rows to be empty. */
956 {
957     int result = 0;
958 
959     rows->memory_budget = memory_budget;
960 
961     rows->rows = NULL;
962     rows->data = NULL;
963 
964     rows->n_rows = 0;
965     rows->n_rows_limit = 100;
966     MALLOC_N(rows->n_rows_limit, rows->rows);
967     if (rows->rows == NULL)
968         result = get_error_errno();
969     rows->n_bytes = 0;
970     rows->n_bytes_limit = (size_factor==1) ? 1024*size_factor*16 : memory_budget;
971     //printf("%s:%d n_bytes_limit=%ld (size_factor based limit=%d)\n", __FILE__, __LINE__, rows->n_bytes_limit, 1024*size_factor*16);
972     rows->data = (char *) toku_malloc(rows->n_bytes_limit);
973     if (rows->rows==NULL || rows->data==NULL) {
974         if (result == 0)
975             result = get_error_errno();
976         toku_free(rows->rows);
977         toku_free(rows->data);
978         rows->rows = NULL;
979         rows->data = NULL;
980     }
981     return result;
982 }
983 
zero_rowset(struct rowset * rows)984 static void zero_rowset (struct rowset *rows) {
985     memset(rows, 0, sizeof(*rows));
986 }
987 
destroy_rowset(struct rowset * rows)988 void destroy_rowset (struct rowset *rows) {
989     if ( rows ) {
990         toku_free(rows->data);
991         toku_free(rows->rows);
992         zero_rowset(rows);
993     }
994 }
995 
row_wont_fit(struct rowset * rows,size_t size)996 static int row_wont_fit (struct rowset *rows, size_t size)
997 /* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */
998 {
999     // Account for the memory used by the data and also the row structures.
1000     size_t memory_in_use = (rows->n_rows*sizeof(struct row)
1001                             + rows->n_bytes);
1002     return (rows->memory_budget <  memory_in_use + size);
1003 }
1004 
add_row(struct rowset * rows,DBT * key,DBT * val)1005 int add_row (struct rowset *rows, DBT *key, DBT *val)
1006 /* Effect: add a row to a collection. */
1007 {
1008     int result = 0;
1009     if (rows->n_rows >= rows->n_rows_limit) {
1010         struct row *old_rows = rows->rows;
1011         size_t old_n_rows_limit = rows->n_rows_limit;
1012         rows->n_rows_limit *= 2;
1013         REALLOC_N(rows->n_rows_limit, rows->rows);
1014         if (rows->rows == NULL) {
1015             result = get_error_errno();
1016             rows->rows = old_rows;
1017             rows->n_rows_limit = old_n_rows_limit;
1018             return result;
1019         }
1020     }
1021     size_t off      = rows->n_bytes;
1022     size_t next_off = off + key->size + val->size;
1023 
1024     struct row newrow;
1025     memset(&newrow, 0, sizeof newrow); newrow.off = off; newrow.klen = key->size; newrow.vlen = val->size;
1026 
1027     rows->rows[rows->n_rows++] = newrow;
1028     if (next_off > rows->n_bytes_limit) {
1029         size_t old_n_bytes_limit = rows->n_bytes_limit;
1030         while (next_off > rows->n_bytes_limit) {
1031             rows->n_bytes_limit = rows->n_bytes_limit*2;
1032         }
1033         invariant(next_off <= rows->n_bytes_limit);
1034         char *old_data = rows->data;
1035         REALLOC_N(rows->n_bytes_limit, rows->data);
1036         if (rows->data == NULL) {
1037             result = get_error_errno();
1038             rows->data = old_data;
1039             rows->n_bytes_limit = old_n_bytes_limit;
1040             return result;
1041         }
1042     }
1043     memcpy(rows->data+off,           key->data, key->size);
1044     memcpy(rows->data+off+key->size, val->data, val->size);
1045     rows->n_bytes = next_off;
1046     return result;
1047 }
1048 
1049 static int process_primary_rows (FTLOADER bl, struct rowset *primary_rowset);
1050 
finish_primary_rows_internal(FTLOADER bl)1051 static int finish_primary_rows_internal (FTLOADER bl)
1052 // now we have been asked to finish up.
1053 // Be sure to destroy the rowsets.
1054 {
1055     int *MALLOC_N(bl->N, ra);
1056     if (ra==NULL) return get_error_errno();
1057 
1058     for (int i = 0; i < bl->N; i++) {
1059         //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
1060         ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]);
1061         zero_rowset(&bl->rows[i]);
1062     }
1063 
1064     // accept any of the error codes (in this case, the last one).
1065     int r = 0;
1066     for (int i = 0; i < bl->N; i++)
1067         if (ra[i] != 0)
1068             r = ra[i];
1069 
1070     toku_free(ra);
1071     return r;
1072 }
1073 
finish_primary_rows(FTLOADER bl)1074 static int finish_primary_rows (FTLOADER bl) {
1075     return           finish_primary_rows_internal (bl);
1076 }
1077 
extractor_thread(void * blv)1078 static void* extractor_thread (void *blv) {
1079     FTLOADER bl = (FTLOADER)blv;
1080     int r = 0;
1081     while (1) {
1082         void *item = nullptr;
1083         {
1084             int rq = toku_queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
1085             if (rq==EOF) break;
1086             invariant(rq==0); // other errors are arbitrarily bad.
1087         }
1088         struct rowset *primary_rowset = (struct rowset *)item;
1089 
1090         //printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows);
1091 
1092         // Now we have some rows to output
1093         {
1094             r = process_primary_rows(bl, primary_rowset);
1095             if (r)
1096                 ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
1097         }
1098     }
1099 
1100     //printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
1101     if (r == 0) {
1102         r = finish_primary_rows(bl);
1103         if (r)
1104             ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
1105     }
1106     toku_instr_delete_current_thread();
1107     return nullptr;
1108 }
1109 
enqueue_for_extraction(FTLOADER bl)1110 static void enqueue_for_extraction(FTLOADER bl) {
1111     //printf("%s:%d enqueing %ld items\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
1112     struct rowset *XMALLOC(enqueue_me);
1113     *enqueue_me = bl->primary_rowset;
1114     zero_rowset(&bl->primary_rowset);
1115     int r = toku_queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL);
1116     resource_assert_zero(r);
1117 }
1118 
loader_do_put(FTLOADER bl,DBT * pkey,DBT * pval)1119 static int loader_do_put(FTLOADER bl,
1120                          DBT *pkey,
1121                          DBT *pval)
1122 {
1123     int result;
1124     result = add_row(&bl->primary_rowset, pkey, pval);
1125     if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
1126         // queue the rows for further processing by the extractor thread.
1127         //printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
1128         enqueue_for_extraction(bl);
1129         {
1130             int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
1131             // bl->primary_rowset will get destroyed by toku_ft_loader_abort
1132             if (r != 0)
1133                 result = r;
1134         }
1135     }
1136     return result;
1137 }
1138 
1139 static int
finish_extractor(FTLOADER bl)1140 finish_extractor (FTLOADER bl) {
1141     //printf("%s:%d now finishing extraction\n", __FILE__, __LINE__);
1142 
1143     int rval;
1144 
1145     if (bl->primary_rowset.n_rows>0) {
1146         enqueue_for_extraction(bl);
1147     } else {
1148         destroy_rowset(&bl->primary_rowset);
1149     }
1150     //printf("%s:%d please finish extraction\n", __FILE__, __LINE__);
1151     {
1152         int r = toku_queue_eof(bl->primary_rowset_queue);
1153         invariant(r==0);
1154     }
1155     //printf("%s:%d joining\n", __FILE__, __LINE__);
1156     {
1157         void *toku_pthread_retval;
1158         int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
1159         resource_assert_zero(r);
1160         invariant(toku_pthread_retval == NULL);
1161         bl->extractor_live = false;
1162     }
1163     {
1164         int r = toku_queue_destroy(bl->primary_rowset_queue);
1165         invariant(r==0);
1166         bl->primary_rowset_queue = nullptr;
1167     }
1168 
1169     rval = ft_loader_fi_close_all(&bl->file_infos);
1170 
1171    //printf("%s:%d joined\n", __FILE__, __LINE__);
1172     return rval;
1173 }
1174 
1175 static const DBT zero_dbt = {0,0,0,0};
1176 
make_dbt(void * data,uint32_t size)1177 static DBT make_dbt (void *data, uint32_t size) {
1178     DBT result = zero_dbt;
1179     result.data = data;
1180     result.size = size;
1181     return result;
1182 }
1183 
1184 #define inc_error_count() error_count++
1185 
leafentry_xid(FTLOADER bl,int which_db)1186 static TXNID leafentry_xid(FTLOADER bl, int which_db) {
1187     TXNID le_xid = TXNID_NONE;
1188     if (bl->root_xids_that_created && bl->load_root_xid != bl->root_xids_that_created[which_db])
1189         le_xid = bl->load_root_xid;
1190     return le_xid;
1191 }
1192 
ft_loader_leafentry_size(size_t key_size,size_t val_size,TXNID xid)1193 size_t ft_loader_leafentry_size(size_t key_size, size_t val_size, TXNID xid) {
1194     size_t s = 0;
1195     if (xid == TXNID_NONE)
1196         s = LE_CLEAN_MEMSIZE(val_size) + key_size + sizeof(uint32_t);
1197     else
1198         s = LE_MVCC_COMMITTED_MEMSIZE(val_size) + key_size + sizeof(uint32_t);
1199     return s;
1200 }
1201 
process_primary_rows_internal(FTLOADER bl,struct rowset * primary_rowset)1202 static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
1203 // process the rows in primary_rowset, and then destroy the rowset.
1204 // if FLUSH is true then write all the buffered rows out.
1205 // if primary_rowset is NULL then treat it as empty.
1206 {
1207     int error_count = 0;
1208     int *XMALLOC_N(bl->N, error_codes);
1209 
1210     // If we parallelize the first for loop, dest_keys/dest_vals init&cleanup need to move inside
1211     DBT_ARRAY dest_keys;
1212     DBT_ARRAY dest_vals;
1213     toku_dbt_array_init(&dest_keys, 1);
1214     toku_dbt_array_init(&dest_vals, 1);
1215 
1216     for (int i = 0; i < bl->N; i++) {
1217         unsigned int klimit,vlimit; // maximum row sizes.
1218         toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);
1219 
1220         error_codes[i] = 0;
1221         struct rowset *rows = &(bl->rows[i]);
1222         struct merge_fileset *fs = &(bl->fs[i]);
1223         ft_compare_func compare = bl->bt_compare_funs[i];
1224 
1225         // Don't parallelize this loop, or we have to lock access to add_row() which would be a lot of overehad.
1226         // Also this way we can reuse the DB_DBT_REALLOC'd values inside dest_keys/dest_vals without a race.
1227         for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) {
1228             if (error_count) break;
1229 
1230             struct row *prow = &primary_rowset->rows[prownum];
1231             DBT pkey = zero_dbt;
1232             DBT pval = zero_dbt;
1233             pkey.data = primary_rowset->data + prow->off;
1234             pkey.size = prow->klen;
1235             pval.data = primary_rowset->data + prow->off + prow->klen;
1236             pval.size = prow->vlen;
1237 
1238 
1239             DBT_ARRAY key_array;
1240             DBT_ARRAY val_array;
1241             if (bl->dbs[i] != bl->src_db) {
1242                 int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &dest_keys, &dest_vals, &pkey, &pval);
1243                 if (r != 0) {
1244                     error_codes[i] = r;
1245                     inc_error_count();
1246                     break;
1247                 }
1248                 paranoid_invariant(dest_keys.size <= dest_keys.capacity);
1249                 paranoid_invariant(dest_vals.size <= dest_vals.capacity);
1250                 paranoid_invariant(dest_keys.size == dest_vals.size);
1251 
1252                 key_array = dest_keys;
1253                 val_array = dest_vals;
1254             } else {
1255                 key_array.size = key_array.capacity = 1;
1256                 key_array.dbts = &pkey;
1257 
1258                 val_array.size = val_array.capacity = 1;
1259                 val_array.dbts = &pval;
1260             }
1261             for (uint32_t row = 0; row < key_array.size; row++) {
1262                 DBT *dest_key = &key_array.dbts[row];
1263                 DBT *dest_val = &val_array.dbts[row];
1264                 if (dest_key->size > klimit) {
1265                     error_codes[i] = EINVAL;
1266                     fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", dest_key->size, klimit);
1267                     inc_error_count();
1268                     break;
1269                 }
1270                 if (dest_val->size > vlimit) {
1271                     error_codes[i] = EINVAL;
1272                     fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", dest_val->size, vlimit);
1273                     inc_error_count();
1274                     break;
1275                 }
1276 
1277                 bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i));
1278 
1279                 if (row_wont_fit(rows, dest_key->size + dest_val->size)) {
1280                     //printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
1281                     int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows.  If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
1282                     // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
1283                     init_rowset(rows, memory_per_rowset_during_extract(bl)); // we passed the contents of rows to sort_and_write_rows.
1284                     if (r != 0) {
1285                         error_codes[i] = r;
1286                         inc_error_count();
1287                         break;
1288                     }
1289                 }
1290                 int r = add_row(rows, dest_key, dest_val);
1291                 if (r != 0) {
1292                     error_codes[i] = r;
1293                     inc_error_count();
1294                     break;
1295                 }
1296             }
1297         }
1298     }
1299     toku_dbt_array_destroy(&dest_keys);
1300     toku_dbt_array_destroy(&dest_vals);
1301 
1302     destroy_rowset(primary_rowset);
1303     toku_free(primary_rowset);
1304     int r = 0;
1305     if (error_count > 0) {
1306         for (int i=0; i<bl->N; i++) {
1307             if (error_codes[i]) {
1308                 r = error_codes[i];
1309                 ft_loader_set_panic(bl, r, false, i, nullptr, nullptr);
1310             }
1311         }
1312         invariant(r); // found the error
1313     }
1314     toku_free(error_codes);
1315     return r;
1316 }
1317 
process_primary_rows(FTLOADER bl,struct rowset * primary_rowset)1318 static int process_primary_rows (FTLOADER bl, struct rowset *primary_rowset) {
1319     int r = process_primary_rows_internal (bl, primary_rowset);
1320     return r;
1321 }
1322 
toku_ft_loader_put(FTLOADER bl,DBT * key,DBT * val)1323 int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val)
1324 /* Effect: Put a key-value pair into the ft loader.  Called by DB_LOADER->put().
1325  * Return value: 0 on success, an error number otherwise.
1326  */
1327 {
1328     if (!bl->allow_puts || ft_loader_get_error(&bl->error_callback))
1329         return EINVAL; // previous panic
1330     bl->n_rows++;
1331     return loader_do_put(bl, key, val);
1332 }
1333 
toku_ft_loader_set_n_rows(FTLOADER bl,uint64_t n_rows)1334 void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows) {
1335     bl->n_rows = n_rows;
1336 }
1337 
toku_ft_loader_get_n_rows(FTLOADER bl)1338 uint64_t toku_ft_loader_get_n_rows(FTLOADER bl) {
1339     return bl->n_rows;
1340 }
1341 
merge_row_arrays_base(struct row dest[],struct row a[],int an,struct row b[],int bn,int which_db,DB * dest_db,ft_compare_func compare,FTLOADER bl,struct rowset * rowset)1342 int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
1343                            int which_db, DB *dest_db, ft_compare_func compare,
1344 
1345                            FTLOADER bl,
1346                            struct rowset *rowset)
1347 /* Effect: Given two arrays of rows, a and b, merge them using the comparison function, and write them into dest.
1348  *   This function is suitable for use in a mergesort.
1349  *   If a pair of duplicate keys is ever noticed, then call the error_callback function (if it exists), and return DB_KEYEXIST.
1350  * Arguments:
1351  *   dest    write the rows here
1352  *   a,b     the rows being merged
1353  *   an,bn   the length of a and b respectively.
1354  *   dest_db We need the dest_db to run the comparison function.
1355  *   compare We need the compare function for the dest_db.
1356  */
1357 {
1358     while (an>0 && bn>0) {
1359         DBT akey; memset(&akey, 0, sizeof akey); akey.data=rowset->data+a->off; akey.size=a->klen;
1360         DBT bkey; memset(&bkey, 0, sizeof bkey); bkey.data=rowset->data+b->off; bkey.size=b->klen;
1361 
1362         int compare_result = compare(dest_db, &akey, &bkey);
1363         if (compare_result==0) {
1364             if (bl->error_callback.error_callback) {
1365                 DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen;
1366                 ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
1367             }
1368             return DB_KEYEXIST;
1369         } else if (compare_result<0) {
1370             // a is smaller
1371             *dest = *a;
1372             dest++; a++; an--;
1373         } else {
1374             *dest = *b;
1375             dest++; b++; bn--;
1376         }
1377     }
1378     while (an>0) {
1379         *dest = *a;
1380         dest++; a++; an--;
1381     }
1382     while (bn>0) {
1383         *dest = *b;
1384         dest++; b++; bn--;
1385     }
1386     return 0;
1387 }
1388 
binary_search(int * location,const DBT * key,struct row a[],int an,int abefore,int which_db,DB * dest_db,ft_compare_func compare,FTLOADER bl,struct rowset * rowset)1389 static int binary_search (int *location,
1390                           const DBT *key,
1391                           struct row a[/*an*/], int an,
1392                           int abefore,
1393                           int which_db, DB *dest_db, ft_compare_func compare,
1394                           FTLOADER bl,
1395                           struct rowset *rowset)
1396 // Given a sorted array of rows a, and a dbt key, find the first row in a that is > key.
1397 // If no such row exists, then consider the result to be equal to an.
1398 // On success store abefore+the index into *location
1399 // Return 0 on success.
1400 // Return DB_KEYEXIST if we find a row that is equal to key.
1401 {
1402     if (an==0) {
1403         *location = abefore;
1404         return 0;
1405     } else {
1406         int a2 = an/2;
1407         DBT akey = make_dbt(rowset->data+a[a2].off,  a[a2].klen);
1408         int compare_result = compare(dest_db, key, &akey);
1409         if (compare_result==0) {
1410             if (bl->error_callback.error_callback) {
1411                 DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen,  a[a2].vlen);
1412                 ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
1413             }
1414             return DB_KEYEXIST;
1415         } else if (compare_result<0) {
1416             // key is before a2
1417             if (an==1) {
1418                 *location = abefore;
1419                 return 0;
1420             } else {
1421                 return binary_search(location, key,
1422                                      a,    a2,
1423                                      abefore,
1424                                      which_db, dest_db, compare, bl, rowset);
1425             }
1426         } else {
1427             // key is after a2
1428             if (an==1) {
1429                 *location = abefore + 1;
1430                 return 0;
1431             } else {
1432                 return binary_search(location, key,
1433                                      a+a2, an-a2,
1434                                      abefore+a2,
1435                                      which_db, dest_db, compare, bl, rowset);
1436             }
1437         }
1438     }
1439 }
1440 
1441 
1442 #define SWAP(typ,x,y) { typ tmp = x; x=y; y=tmp; }
1443 
merge_row_arrays(struct row dest[],struct row a[],int an,struct row b[],int bn,int which_db,DB * dest_db,ft_compare_func compare,FTLOADER bl,struct rowset * rowset)1444 static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
1445                              int which_db, DB *dest_db, ft_compare_func compare,
1446                              FTLOADER bl,
1447                              struct rowset *rowset)
1448 /* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest.
1449  * Arguments:
1450  *   dest    write the rows here
1451  *   a,b     the rows being merged
1452  *   an,bn   the length of a and b respectively.
1453  *   dest_db We need the dest_db to run the comparison function.
1454  *   compare We need the compare function for the dest_db.
1455  */
1456 {
1457     if (an + bn < 10000) {
1458         return merge_row_arrays_base(dest, a, an, b, bn, which_db, dest_db, compare, bl, rowset);
1459     }
1460     if (an < bn) {
1461         SWAP(struct row *,a, b)
1462         SWAP(int         ,an,bn)
1463     }
1464     // an >= bn
1465     int a2 = an/2;
1466     DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen);
1467     int b2 = 0; // initialize to zero so we can add the answer in.
1468     {
1469         int r = binary_search(&b2, &akey, b, bn, 0, which_db, dest_db, compare, bl, rowset);
1470         if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code.
1471     }
1472     int ra, rb;
1473     ra = merge_row_arrays(dest,       a,    a2,    b,    b2,    which_db, dest_db, compare, bl, rowset);
1474     rb = merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset);
1475     if (ra!=0) return ra;
1476     else       return rb;
1477 }
1478 
mergesort_row_array(struct row rows[],int n,int which_db,DB * dest_db,ft_compare_func compare,FTLOADER bl,struct rowset * rowset)1479 int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func compare, FTLOADER bl, struct rowset *rowset)
1480 /* Sort an array of rows (using mergesort).
1481  * Arguments:
1482  *   rows   sort this array of rows.
1483  *   n      the length of the array.
1484  *   dest_db  used by the comparison function.
1485  *   compare  the compare function
1486  */
1487 {
1488     if (n<=1) return 0; // base case is sorted
1489     int mid = n/2;
1490     int r1, r2;
1491     r1 = mergesort_row_array (rows,     mid,   which_db, dest_db, compare, bl, rowset);
1492 
1493     // Don't spawn this one explicitly
1494     r2 =            mergesort_row_array (rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
1495 
1496     if (r1!=0) return r1;
1497     if (r2!=0) return r2;
1498 
1499     struct row *MALLOC_N(n, tmp);
1500     if (tmp == NULL) return get_error_errno();
1501     {
1502         int r = merge_row_arrays(tmp, rows, mid, rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
1503         if (r!=0) {
1504             toku_free(tmp);
1505             return r;
1506         }
1507     }
1508     memcpy(rows, tmp, sizeof(*tmp)*n);
1509     toku_free(tmp);
1510     return 0;
1511 }
1512 
1513 // C function for testing mergesort_row_array
ft_loader_mergesort_row_array(struct row rows[],int n,int which_db,DB * dest_db,ft_compare_func compare,FTLOADER bl,struct rowset * rowset)1514 int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func compare, FTLOADER bl, struct rowset *rowset) {
1515     return mergesort_row_array (rows, n, which_db, dest_db, compare, bl, rowset);
1516 }
1517 
sort_rows(struct rowset * rows,int which_db,DB * dest_db,ft_compare_func compare,FTLOADER bl)1518 static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, ft_compare_func compare,
1519                       FTLOADER bl)
1520 /* Effect: Sort a collection of rows.
1521  * If any duplicates are found, then call the error_callback function and return non zero.
1522  * Otherwise return 0.
1523  * Arguments:
1524  *   rowset    the */
1525 {
1526     return mergesort_row_array(rows->rows, rows->n_rows, which_db, dest_db, compare, bl, rows);
1527 }
1528 
1529 /* filesets Maintain a collection of files.  Typically these files are each individually sorted, and we will merge them.
1530  * These files have two parts, one is for the data rows, and the other is a collection of offsets so we an more easily parallelize the manipulation (e.g., by allowing us to find the offset of the ith row quickly). */
1531 
init_merge_fileset(struct merge_fileset * fs)1532 void init_merge_fileset (struct merge_fileset *fs)
1533 /* Effect: Initialize a fileset */
1534 {
1535     fs->have_sorted_output = false;
1536     fs->sorted_output      = FIDX_NULL;
1537     fs->prev_key           = zero_dbt;
1538     fs->prev_key.flags     = DB_DBT_REALLOC;
1539 
1540     fs->n_temp_files = 0;
1541     fs->n_temp_files_limit = 0;
1542     fs->data_fidxs = NULL;
1543 }
1544 
destroy_merge_fileset(struct merge_fileset * fs)1545 void destroy_merge_fileset (struct merge_fileset *fs)
1546 /* Effect: Destroy a fileset. */
1547 {
1548     if ( fs ) {
1549         toku_destroy_dbt(&fs->prev_key);
1550         fs->n_temp_files = 0;
1551         fs->n_temp_files_limit = 0;
1552         toku_free(fs->data_fidxs);
1553         fs->data_fidxs = NULL;
1554     }
1555 }
1556 
1557 
extend_fileset(FTLOADER bl,struct merge_fileset * fs,FIDX * ffile)1558 static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
1559 /* Effect: Add two files (one for data and one for idx) to the fileset.
1560  * Arguments:
1561  *   bl   the ft_loader (needed to panic if anything goes wrong, and also to get the temp_file_template.
1562  *   fs   the fileset
1563  *   ffile  the data file (which will be open)
1564  *   fidx   the index file (which will be open)
1565  */
1566 {
1567     FIDX sfile;
1568     int r;
1569     r = ft_loader_open_temp_file(bl, &sfile); if (r!=0) return r;
1570 
1571     if (fs->n_temp_files+1 > fs->n_temp_files_limit) {
1572         fs->n_temp_files_limit = (fs->n_temp_files+1)*2;
1573         XREALLOC_N(fs->n_temp_files_limit, fs->data_fidxs);
1574     }
1575     fs->data_fidxs[fs->n_temp_files] = sfile;
1576     fs->n_temp_files++;
1577 
1578     *ffile = sfile;
1579     return 0;
1580 }
1581 
// RFP maybe this should be buried in the ft_loader struct
// Serializes update_progress(): it guards the read-modify-write of bl->progress and
// bl->progress_callback_result together with the poll-callback call.  Being a single
// file-static mutex, it is shared by every loader in the process.
static toku_mutex_t update_progress_lock = TOKU_MUTEX_INITIALIZER;
1584 
update_progress(int N,FTLOADER bl,const char * UU (message))1585 static int update_progress (int N,
1586                             FTLOADER bl,
1587                             const char *UU(message))
1588 {
1589     // Must protect the increment and the call to the poll_function.
1590     toku_mutex_lock(&update_progress_lock);
1591     bl->progress+=N;
1592 
1593     int result;
1594     if (bl->progress_callback_result == 0) {
1595         //printf(" %20s: %d ", message, bl->progress);
1596         result = ft_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
1597         if (result!=0) {
1598             bl->progress_callback_result = result;
1599         }
1600     } else {
1601         result = bl->progress_callback_result;
1602     }
1603     toku_mutex_unlock(&update_progress_lock);
1604     return result;
1605 }
1606 
1607 
write_rowset_to_file(FTLOADER bl,FIDX sfile,const struct rowset rows)1608 static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) {
1609     int r = 0;
1610     // Allocate a buffer if we're compressing intermediates.
1611     char *uncompressed_buffer = nullptr;
1612     if (bl->compress_intermediates) {
1613         MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
1614         if (uncompressed_buffer == nullptr) {
1615             return ENOMEM;
1616         }
1617     }
1618     struct wbuf wb;
1619     wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
1620 
1621     TOKU_FILE *sstream = toku_bl_fidx2file(bl, sfile);
1622     for (size_t i = 0; i < rows.n_rows; i++) {
1623         DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
1624         DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen,
1625                             rows.rows[i].vlen);
1626 
1627         uint64_t soffset=0; // don't really need this.
1628         r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, &wb, bl);
1629         if (r != 0) {
1630             goto exit;
1631         }
1632     }
1633 
1634     if (bl->compress_intermediates && wb.ndone > 0) {
1635         r = bl_finish_compressed_write(sstream, &wb);
1636         if (r != 0) {
1637             goto exit;
1638         }
1639     }
1640     r = 0;
1641 exit:
1642     if (uncompressed_buffer) {
1643         toku_free(uncompressed_buffer);
1644     }
1645     return r;
1646 }
1647 
1648 
sort_and_write_rows(struct rowset rows,struct merge_fileset * fs,FTLOADER bl,int which_db,DB * dest_db,ft_compare_func compare)1649 int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare)
1650 /* Effect: Given a rowset, sort it and write it to a temporary file.
1651  * Note:  The loader maintains for each index the most recently written-to file, as well as the DBT for the last key written into that file.
1652  *   If this rowset is sorted and all greater than that dbt, then we append to the file (skipping the sort, and reducing the number of temporary files).
1653  * Arguments:
1654  *   rows    the rowset
1655  *   fs      the fileset into which the sorted data will be added
1656  *   bl      the ft_loader
1657  *   dest_db the DB, needed for the comparison function.
1658  *   compare The comparison function.
1659  * Returns 0 on success, otherwise an error number.
1660  * Destroy the rowset after finishing it.
1661  * Note: There is no sense in trying to calculate progress by this function since it's done concurrently with the loader->put operation.
1662  * Note first time called: invariant: fs->have_sorted_output == false
1663  */
1664 {
1665     //printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
1666 
1667     // TODO: erase the files, and deal with all the cleanup on error paths
1668     //printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
1669     //bl_time_t before_sort = bl_time_now();
1670 
1671     int result;
1672     if (rows.n_rows == 0) {
1673         result = 0;
1674     } else {
1675         result = sort_rows(&rows, which_db, dest_db, compare, bl);
1676 
1677         //bl_time_t after_sort = bl_time_now();
1678 
1679         if (result == 0) {
1680             DBT min_rowset_key = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
1681             if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &min_rowset_key) < 0) {
1682                 // write everything to the same output if the max key in the temp file (prev_key) is < min of the sorted rowset
1683                 result = write_rowset_to_file(bl, fs->sorted_output, rows);
1684                 if (result == 0) {
1685                     // set the max key in the temp file to the max key in the sorted rowset
1686                     result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
1687                 }
1688             } else {
1689                 // write the sorted rowset into a new temp file
1690                 if (fs->have_sorted_output) {
1691                     fs->have_sorted_output = false;
1692                     result = ft_loader_fi_close(&bl->file_infos, fs->sorted_output, true);
1693                 }
1694                 if (result == 0) {
1695                     FIDX sfile = FIDX_NULL;
1696                     result = extend_fileset(bl, fs, &sfile);
1697                     if (result == 0) {
1698                         result = write_rowset_to_file(bl, sfile, rows);
1699                         if (result == 0) {
1700                             fs->have_sorted_output = true; fs->sorted_output = sfile;
1701                             // set the max key in the temp file to the max key in the sorted rowset
1702                             result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
1703                         }
1704                     }
1705                 }
1706                 // Note: if result == 0 then invariant fs->have_sorted_output == true
1707             }
1708         }
1709     }
1710 
1711     destroy_rowset(&rows);
1712 
1713     //bl_time_t after_write = bl_time_now();
1714 
1715     return result;
1716 }
1717 
1718 // C function for testing sort_and_write_rows
ft_loader_sort_and_write_rows(struct rowset * rows,struct merge_fileset * fs,FTLOADER bl,int which_db,DB * dest_db,ft_compare_func compare)1719 int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare) {
1720     return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
1721 }
1722 
toku_merge_some_files_using_dbufio(const bool to_q,FIDX dest_data,QUEUE q,int n_sources,DBUFIO_FILESET bfs,FIDX srcs_fidxs[],FTLOADER bl,int which_db,DB * dest_db,ft_compare_func compare,int progress_allocation)1723 int toku_merge_some_files_using_dbufio(const bool to_q,
1724                                        FIDX dest_data,
1725                                        QUEUE q,
1726                                        int n_sources,
1727                                        DBUFIO_FILESET bfs,
1728                                        FIDX srcs_fidxs[/*n_sources*/],
1729                                        FTLOADER bl,
1730                                        int which_db,
1731                                        DB *dest_db,
1732                                        ft_compare_func compare,
1733                                        int progress_allocation)
1734 /* Effect: Given an array of FILE*'s each containing sorted, merge the data and
1735  * write it to an output.  All the files remain open after the merge.
1736  *   This merge is performed in one pass, so don't pass too many files in.  If
1737  * you need a tree of merges do it elsewhere.
1738  *   If TO_Q is true then we write rowsets into queue Q.  Otherwise we write
1739  * into dest_data.
1740  * Modifies:  May modify the arrays of files (but if modified, it must be a
1741  * permutation so the caller can use that array to close everything.)
1742  * Requires: The number of sources is at least one, and each of the input files
1743  * must have at least one row in it.
1744  * Arguments:
1745  *   to_q         boolean indicating that output is queue (true) or a file
1746  * (false)
1747  *   dest_data    where to write the sorted data
1748  *   q            where to write the sorted data
1749  *   n_sources    how many source files.
1750  *   srcs_data    the array of source data files.
1751  *   bl           the ft_loader.
1752  *   dest_db      the destination DB (used in the comparison function).
1753  * Return value: 0 on success, otherwise an error number.
1754  * The fidxs are not closed by this function.
1755  */
1756 {
1757     int result = 0;
1758 
1759     TOKU_FILE *dest_stream = to_q ? nullptr : toku_bl_fidx2file(bl, dest_data);
1760 
1761     // printf(" merge_some_files progress=%d fin at %d\n", bl->progress,
1762     // bl->progress+progress_allocation);
1763     DBT keys[n_sources];
1764     DBT vals[n_sources];
1765     uint64_t dataoff[n_sources];
1766     DBT zero = zero_dbt;  zero.flags=DB_DBT_REALLOC;
1767 
1768     for (int i=0; i<n_sources; i++) {
1769         keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
1770     }
1771 
1772     pqueue_t      *pq = NULL;
1773     pqueue_node_t *MALLOC_N(n_sources, pq_nodes); // freed in cleanup
1774     if (pq_nodes == NULL) { result = get_error_errno(); }
1775 
1776     if (result==0) {
1777         int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
1778         if (r!=0) result = r;
1779     }
1780 
1781     uint64_t n_rows = 0;
1782     if (result==0) {
1783         // load pqueue with first value from each source
1784         for (int i=0; i<n_sources; i++) {
1785             int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
1786             if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
1787             if (r!=0) {
1788                 result = r;
1789                 break;
1790             }
1791 
1792             pq_nodes[i].key = &keys[i];
1793             pq_nodes[i].val = &vals[i];
1794             pq_nodes[i].i   = i;
1795             r = pqueue_insert(pq, &pq_nodes[i]);
1796             if (r!=0) {
1797                 result = r;
1798                 // path tested by loader-dup-test5.tdbrun
1799                 // printf("%s:%d returning\n", __FILE__, __LINE__);
1800                 break;
1801             }
1802 
1803             dataoff[i] = 0;
1804             toku_mutex_lock(&bl->file_infos.lock);
1805             n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
1806             toku_mutex_unlock(&bl->file_infos.lock);
1807         }
1808     }
1809     uint64_t n_rows_done = 0;
1810 
1811     struct rowset *output_rowset = NULL;
1812     if (result==0 && to_q) {
1813         XMALLOC(output_rowset); // freed in cleanup
1814         int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
1815         if (r!=0) result = r;
1816     }
1817 
1818     // Allocate a buffer if we're compressing intermediates.
1819     char *uncompressed_buffer = nullptr;
1820     struct wbuf wb;
1821     if (bl->compress_intermediates && !to_q) {
1822         MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
1823         if (uncompressed_buffer == nullptr) {
1824             result = ENOMEM;
1825         }
1826     }
1827     wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
1828 
1829     //printf(" n_rows=%ld\n", n_rows);
1830     while (result==0 && pqueue_size(pq)>0) {
1831         int mini;
1832         {
1833             // get the minimum
1834             pqueue_node_t *node;
1835             int r = pqueue_pop(pq, &node);
1836             if (r!=0) {
1837                 result = r;
1838                 invariant(0);
1839                 break;
1840             }
1841             mini = node->i;
1842         }
1843         if (to_q) {
1844             if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
1845                 {
1846                     int r = toku_queue_enq(q, (void*)output_rowset, 1, NULL);
1847                     if (r!=0) {
1848                         result = r;
1849                         break;
1850                     }
1851                 }
1852                 XMALLOC(output_rowset); // freed in cleanup
1853                 {
1854                     int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
1855                     if (r!=0) {
1856                         result = r;
1857                         break;
1858                     }
1859                 }
1860             }
1861             {
1862                 int r = add_row(output_rowset, &keys[mini], &vals[mini]);
1863                 if (r!=0) {
1864                     result = r;
1865                     break;
1866                 }
1867             }
1868         } else {
1869             // write it to the dest file
1870             int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], &wb, bl);
1871             if (r!=0) {
1872                 result = r;
1873                 break;
1874             }
1875         }
1876 
1877         {
1878             // read next row from file that just sourced min value
1879             int r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
1880             if (r!=0) {
1881                 if (r==EOF) {
1882                     // on feof, queue size permanently smaller
1883                     toku_free(keys[mini].data);  keys[mini].data = NULL;
1884                     toku_free(vals[mini].data);  vals[mini].data = NULL;
1885                 } else {
1886                     fprintf(stderr, "%s:%d r=%d errno=%d bfs=%p mini=%d\n", __FILE__, __LINE__, r, get_maybe_error_errno(), bfs, mini);
1887                     dbufio_print(bfs);
1888                     result = r;
1889                     break;
1890                 }
1891             } else {
1892                 // insert value into queue (re-populate queue)
1893                 pq_nodes[mini].key = &keys[mini];
1894                 r = pqueue_insert(pq, &pq_nodes[mini]);
1895                 if (r!=0) {
1896                     // Note: This error path tested by loader-dup-test1.tdbrun (and by loader-dup-test4)
1897                     result = r;
1898                     // printf("%s:%d returning\n", __FILE__, __LINE__);
1899                     break;
1900                 }
1901             }
1902         }
1903 
1904         n_rows_done++;
1905         const uint64_t rows_per_report = size_factor*1024;
1906         if (n_rows_done%rows_per_report==0) {
1907             // need to update the progress.
1908             double fraction_of_remaining_we_just_did = (double)rows_per_report / (double)(n_rows - n_rows_done + rows_per_report);
1909             invariant(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1);
1910             int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation;
1911             progress_allocation -= progress_just_done;
1912             // ignore the result from update_progress here, we'll call update_progress again below, which will give us the nonzero result.
1913             int r = update_progress(progress_just_done, bl, "in file merge");
1914             if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
1915         }
1916     }
1917     if (result == 0 && uncompressed_buffer != nullptr && wb.ndone > 0) {
1918         result = bl_finish_compressed_write(dest_stream, &wb);
1919     }
1920 
1921     if (result==0 && to_q) {
1922         int r = toku_queue_enq(q, (void*)output_rowset, 1, NULL);
1923         if (r!=0)
1924             result = r;
1925         else
1926             output_rowset = NULL;
1927     }
1928 
1929     // cleanup
1930     if (uncompressed_buffer) {
1931         toku_free(uncompressed_buffer);
1932     }
1933     for (int i=0; i<n_sources; i++) {
1934         toku_free(keys[i].data);  keys[i].data = NULL;
1935         toku_free(vals[i].data);  vals[i].data = NULL;
1936     }
1937     if (output_rowset) {
1938         destroy_rowset(output_rowset);
1939         toku_free(output_rowset);
1940     }
1941     if (pq) { pqueue_free(pq); pq=NULL; }
1942     toku_free(pq_nodes);
1943     {
1944         int r = update_progress(progress_allocation, bl, "end of merge_some_files");
1945         //printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
1946         if (r!=0 && result==0) result = r;
1947     }
1948     return result;
1949 }
1950 
merge_some_files(const bool to_q,FIDX dest_data,QUEUE q,int n_sources,FIDX srcs_fidxs[],FTLOADER bl,int which_db,DB * dest_db,ft_compare_func compare,int progress_allocation)1951 static int merge_some_files (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
1952 {
1953     int result = 0;
1954     DBUFIO_FILESET bfs = NULL;
1955     int *MALLOC_N(n_sources, fds);
1956     if (fds == NULL)
1957         result = get_error_errno();
1958     if (result == 0) {
1959         for (int i = 0; i < n_sources; i++) {
1960             int r = fileno(
1961                 toku_bl_fidx2file(bl, srcs_fidxs[i])->file);  // we rely on the
1962                                                               // fact that when
1963                                                               // the files are
1964                                                               // closed, the fd
1965                                                               // is also closed.
1966             if (r == -1) {
1967                 result = get_error_errno();
1968                 break;
1969             }
1970             fds[i] = r;
1971         }
1972     }
1973     if (result==0) {
1974         int r = create_dbufio_fileset(&bfs, n_sources, fds,
1975                 memory_per_rowset_during_merge(bl, n_sources, to_q), bl->compress_intermediates);
1976         if (r!=0) { result = r; }
1977     }
1978 
1979     if (result==0) {
1980         int r = toku_merge_some_files_using_dbufio (to_q, dest_data, q, n_sources, bfs, srcs_fidxs, bl, which_db, dest_db, compare, progress_allocation);
1981         if (r!=0) { result = r; }
1982     }
1983 
1984     if (bfs!=NULL) {
1985         if (result != 0)
1986             (void) panic_dbufio_fileset(bfs, result);
1987         int r = destroy_dbufio_fileset(bfs);
1988         if (r!=0 && result==0) result=r;
1989         bfs = NULL;
1990     }
1991     if (fds!=NULL) {
1992         toku_free(fds);
1993         fds = NULL;
1994     }
1995     return result;
1996 }
1997 
int_min(int a,int b)1998 static int int_min (int a, int b)
1999 {
2000     if (a<b) return a;
2001     else return b;
2002 }
2003 
n_passes(int N,int B)2004 static int n_passes (int N, int B) {
2005     int result = 0;
2006     while (N>1) {
2007         N = (N+B-1)/B;
2008         result++;
2009     }
2010     return result;
2011 }
2012 
merge_files(struct merge_fileset * fs,FTLOADER bl,int which_db,DB * dest_db,ft_compare_func compare,int progress_allocation,QUEUE output_q)2013 int merge_files (struct merge_fileset *fs,
2014                  FTLOADER bl,
2015                  // These are needed for the comparison function and error callback.
2016                  int which_db, DB *dest_db, ft_compare_func compare,
2017                  int progress_allocation,
2018                  // Write rowsets into this queue.
2019                  QUEUE output_q
2020                  )
2021 /* Effect:  Given a fileset, merge all the files writing all the answers into a queue.
2022  *   All the files in fs, and any temporary files will be closed and unlinked (and the fileset will be empty)
2023  * Return value: 0 on success, otherwise an error number.
2024  *   On error *fs will contain no open files.  All the files (including any temporary files) will be closed and unlinked.
2025  *    (however the fs will still need to be deallocated.)
2026  */
2027 {
2028     //printf(" merge_files %d files\n", fs->n_temp_files);
2029     //printf(" merge_files use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
2030     const int final_mergelimit   = (size_factor == 1) ? 4 : merge_fanin(bl, true); // try for a merge to the leaf level
2031     const int earlier_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, false); // try for a merge at nonleaf.
2032     int n_passes_left  = (fs->n_temp_files<=final_mergelimit)
2033         ? 1
2034         : 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit);
2035     // printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left);
2036     int result = 0;
2037     while (fs->n_temp_files > 0) {
2038         int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
2039         progress_allocation -= progress_allocation_for_this_pass;
2040         //printf("%s:%d n_passes_left=%d progress_allocation_for_this_pass=%d\n", __FILE__, __LINE__, n_passes_left, progress_allocation_for_this_pass);
2041 
2042         invariant(fs->n_temp_files>0);
2043         struct merge_fileset next_file_set;
2044         bool to_queue = (bool)(fs->n_temp_files <= final_mergelimit);
2045         init_merge_fileset(&next_file_set);
2046         while (fs->n_temp_files>0) {
2047             // grab some files and merge them.
2048             int n_to_merge = int_min(to_queue?final_mergelimit:earlier_mergelimit, fs->n_temp_files);
2049 
2050             // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
2051             int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
2052             // printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d b=%llu\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files, (long long unsigned) memory_per_rowset_during_merge(bl, n_to_merge, to_queue));
2053             progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;
2054 
2055             //printf("%s:%d merging\n", __FILE__, __LINE__);
2056             FIDX merged_data = FIDX_NULL;
2057 
2058             FIDX *XMALLOC_N(n_to_merge, data_fidxs);
2059             for (int i=0; i<n_to_merge; i++) {
2060                 data_fidxs[i] = FIDX_NULL;
2061             }
2062             for (int i=0; i<n_to_merge; i++) {
2063                 int idx = fs->n_temp_files -1 -i;
2064                 FIDX fidx = fs->data_fidxs[idx];
2065                 result = ft_loader_fi_reopen(&bl->file_infos, fidx, "r");
2066                 if (result) break;
2067                 data_fidxs[i] = fidx;
2068             }
2069             if (result==0 && !to_queue) {
2070                 result = extend_fileset(bl, &next_file_set,  &merged_data);
2071             }
2072 
2073             if (result==0) {
2074                 result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
2075                 // if result!=0, fall through
2076                 if (result==0) {
2077                     /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
2078                 }
2079             }
2080 
2081             //printf("%s:%d merged\n", __FILE__, __LINE__);
2082             for (int i=0; i<n_to_merge; i++) {
2083                 if (!fidx_is_null(data_fidxs[i])) {
2084                     {
2085                         int r = ft_loader_fi_close(&bl->file_infos, data_fidxs[i], true);
2086                         if (r!=0 && result==0) result = r;
2087                     }
2088                     {
2089                         int r = ft_loader_fi_unlink(&bl->file_infos, data_fidxs[i]);
2090                         if (r!=0 && result==0) result = r;
2091                     }
2092                     data_fidxs[i] = FIDX_NULL;
2093                 }
2094             }
2095 
2096             fs->n_temp_files -= n_to_merge;
2097             if (!to_queue && !fidx_is_null(merged_data)) {
2098                 int r = ft_loader_fi_close(&bl->file_infos, merged_data, true);
2099                 if (r!=0 && result==0) result = r;
2100             }
2101             toku_free(data_fidxs);
2102 
2103             if (result!=0) break;
2104         }
2105 
2106         destroy_merge_fileset(fs);
2107         *fs = next_file_set;
2108 
2109         // Update the progress
2110         n_passes_left--;
2111 
2112         if (result==0) { invariant(progress_allocation_for_this_pass==0); }
2113 
2114         if (result!=0) break;
2115     }
2116     if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr);
2117 
2118     {
2119         int r = toku_queue_eof(output_q);
2120         if (r!=0 && result==0) result = r;
2121     }
2122     // It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0)
2123     {
2124         int r = update_progress(progress_allocation, bl, "did merge_files");
2125         if (r!=0 && result==0) result = r;
2126     }
2127     return result;
2128 }
2129 
// One entry of subtrees_info::subtrees; allocate_node records a block number in it.
struct subtree_info {
    int64_t block;
};
2133 
// Growable array of subtree_info entries (see subtrees_info_init, allocate_node,
// subtrees_info_destroy).
struct subtrees_info {
    int64_t next_free_block;  // zeroed by subtrees_info_init; NOTE(review): presumably the next block number to hand out
    int64_t n_subtrees;       // was n_blocks; number of entries in use
    int64_t n_subtrees_limit; // allocated capacity of subtrees
    struct subtree_info *subtrees;
};
2140 
subtrees_info_init(struct subtrees_info * p)2141 static void subtrees_info_init(struct subtrees_info *p) {
2142     p->next_free_block = p->n_subtrees = p->n_subtrees_limit = 0;
2143     p->subtrees = NULL;
2144 }
2145 
subtrees_info_destroy(struct subtrees_info * p)2146 static void subtrees_info_destroy(struct subtrees_info *p) {
2147     toku_free(p->subtrees);
2148     p->subtrees = NULL;
2149 }
2150 
allocate_node(struct subtrees_info * sts,int64_t b)2151 static void allocate_node (struct subtrees_info *sts, int64_t b) {
2152     if (sts->n_subtrees >= sts->n_subtrees_limit) {
2153         sts->n_subtrees_limit *= 2;
2154         XREALLOC_N(sts->n_subtrees_limit, sts->subtrees);
2155     }
2156     sts->subtrees[sts->n_subtrees].block = b;
2157     sts->n_subtrees++;
2158 }
2159 
// dbuf will always contain a 512-byte aligned buffer, but the length might not be a multiple of 512 bytes.  If that's what you want, then pad it.
struct dbuf {
    unsigned char *buf;
    int buflen;   // NOTE(review): presumably the allocated size of buf
    int off;      // NOTE(review): presumably the number of bytes filled so far
    int error;    // NOTE(review): presumably a sticky error code from earlier operations
};
2167 
// NOTE(review): presumably the state for a leaf node while it is being built.  Only the layout
// is visible here; the field descriptions below are inferred from names and should be confirmed
// against the code that fills them in.
struct leaf_buf {
    BLOCKNUM blocknum;             // presumably the block number assigned to this leaf
    TXNID xid;
    uint64_t nkeys, ndata, dsize;  // presumably key count, data-item count, and data size
    FTNODE node;                   // presumably the in-memory node under construction
    XIDS xids;
    uint64_t off;
};
2176 
// One block-translation entry: where a block lives in the output file.
struct translation {
    int64_t off, size;  // file offset and length in bytes; off is -1 while not yet filled in (-2 marks the NULL block 0)
};
2180 
// State of one output dictionary file being written by the loader.
struct dbout {
    int fd;                 // output file descriptor; -1 when not owned (dbout_destroy closes it if >= 0)
    toku_off_t current_off; // current write offset in fd (rounded up by seek_align)

    int64_t n_translations;          // number of block numbers handed out so far (see allocate_block)
    int64_t n_translations_limit;    // allocated length of translation[]
    struct translation *translation; // block-translation table, indexed by block number
    toku_mutex_t mutex;              // held by allocate_block and seek_align (dbout_lock/dbout_unlock)
    FT ft;                           // the ft being built; its header receives the root block number
};
2191 
dbout_init(struct dbout * out,FT ft)2192 static inline void dbout_init(struct dbout *out, FT ft) {
2193     out->fd = -1;
2194     out->current_off = 0;
2195     out->n_translations = out->n_translations_limit = 0;
2196     out->translation = NULL;
2197     toku_mutex_init(*loader_out_mutex_key, &out->mutex, nullptr);
2198     out->ft = ft;
2199 }
2200 
dbout_destroy(struct dbout * out)2201 static inline void dbout_destroy(struct dbout *out) {
2202     if (out->fd >= 0) {
2203         toku_os_close(out->fd);
2204         out->fd = -1;
2205     }
2206     toku_free(out->translation);
2207     out->translation = NULL;
2208     toku_mutex_destroy(&out->mutex);
2209 }
2210 
dbout_lock(struct dbout * out)2211 static inline void dbout_lock(struct dbout *out) {
2212     toku_mutex_lock(&out->mutex);
2213 }
2214 
dbout_unlock(struct dbout * out)2215 static inline void dbout_unlock(struct dbout *out) {
2216     toku_mutex_unlock(&out->mutex);
2217 }
2218 
seek_align_locked(struct dbout * out)2219 static void seek_align_locked(struct dbout *out) {
2220     toku_off_t old_current_off = out->current_off;
2221     int alignment = 4096;
2222     out->current_off += alignment-1;
2223     out->current_off &= ~(alignment-1);
2224     toku_off_t r = lseek(out->fd, out->current_off, SEEK_SET);
2225     invariant(r==out->current_off);
2226     invariant(out->current_off >= old_current_off);
2227     invariant(out->current_off < old_current_off+alignment);
2228     invariant(out->current_off % alignment == 0);
2229 }
2230 
seek_align(struct dbout * out)2231 static void seek_align(struct dbout *out) {
2232     dbout_lock(out);
2233     seek_align_locked(out);
2234     dbout_unlock(out);
2235 }
2236 
dbuf_init(struct dbuf * dbuf)2237 static void dbuf_init (struct dbuf *dbuf) {
2238     dbuf->buf = 0;
2239     dbuf->buflen = 0;
2240     dbuf->off = 0;
2241     dbuf->error = 0;
2242 }
2243 
dbuf_destroy(struct dbuf * dbuf)2244 static void dbuf_destroy (struct dbuf *dbuf) {
2245     toku_free(dbuf->buf); dbuf->buf = NULL;
2246 }
2247 
allocate_block(struct dbout * out,int64_t * ret_block_number)2248 static int allocate_block (struct dbout *out, int64_t *ret_block_number)
2249 // Return the new block number
2250 {
2251     int result = 0;
2252     dbout_lock(out);
2253     int64_t block_number = out->n_translations;
2254     if (block_number >= out->n_translations_limit) {
2255         int64_t old_n_translations_limit = out->n_translations_limit;
2256         struct translation *old_translation = out->translation;
2257         if (out->n_translations_limit==0) {
2258             out->n_translations_limit = 1;
2259         } else {
2260             out->n_translations_limit *= 2;
2261         }
2262         REALLOC_N(out->n_translations_limit, out->translation);
2263         if (out->translation == NULL) {
2264             result = get_error_errno();
2265             invariant(result);
2266             out->n_translations_limit = old_n_translations_limit;
2267             out->translation = old_translation;
2268             goto cleanup;
2269         }
2270     }
2271     out->n_translations++;
2272     *ret_block_number = block_number;
2273 cleanup:
2274     dbout_unlock(out);
2275     return result;
2276 }
2277 
putbuf_bytes(struct dbuf * dbuf,const void * bytes,int nbytes)2278 static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) {
2279     if (!dbuf->error && dbuf->off + nbytes > dbuf->buflen) {
2280         unsigned char *oldbuf = dbuf->buf;
2281         int oldbuflen = dbuf->buflen;
2282         dbuf->buflen += dbuf->off + nbytes;
2283         dbuf->buflen *= 2;
2284         REALLOC_N_ALIGNED(512, dbuf->buflen, dbuf->buf);
2285         if (dbuf->buf == NULL) {
2286             dbuf->error = get_error_errno();
2287             dbuf->buf = oldbuf;
2288             dbuf->buflen = oldbuflen;
2289         }
2290     }
2291     if (!dbuf->error) {
2292         memcpy(dbuf->buf + dbuf->off, bytes, nbytes);
2293         dbuf->off += nbytes;
2294     }
2295 }
2296 
putbuf_int32(struct dbuf * dbuf,int v)2297 static void putbuf_int32 (struct dbuf *dbuf, int v) {
2298     putbuf_bytes(dbuf, &v, 4);
2299 }
2300 
putbuf_int64(struct dbuf * dbuf,long long v)2301 static void putbuf_int64 (struct dbuf *dbuf, long long v) {
2302     putbuf_int32(dbuf, v>>32);
2303     putbuf_int32(dbuf, v&0xFFFFFFFF);
2304 }
2305 
start_leaf(struct dbout * out,const DESCRIPTOR UU (desc),int64_t lblocknum,TXNID xid,uint32_t UU (target_nodesize))2306 static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid, uint32_t UU(target_nodesize)) {
2307     invariant(lblocknum < out->n_translations_limit);
2308 
2309     struct leaf_buf *XMALLOC(lbuf);
2310     lbuf->blocknum.b = lblocknum;
2311     lbuf->xid = xid;
2312     lbuf->nkeys = lbuf->ndata = lbuf->dsize = 0;
2313     lbuf->off = 0;
2314 
2315     lbuf->xids = toku_xids_get_root_xids();
2316     if (xid != TXNID_NONE) {
2317         XIDS new_xids = NULL;
2318         int r = toku_xids_create_child(lbuf->xids, &new_xids, xid);
2319         assert(r == 0 && new_xids);
2320         toku_xids_destroy(&lbuf->xids);
2321         lbuf->xids = new_xids;
2322     }
2323 
2324     FTNODE XMALLOC(node);
2325     toku_initialize_empty_ftnode(node, lbuf->blocknum, 0 /*height*/, 1 /*basement nodes*/, FT_LAYOUT_VERSION, 0);
2326     BP_STATE(node, 0) = PT_AVAIL;
2327     lbuf->node = node;
2328 
2329     return lbuf;
2330 }
2331 
// Forward declarations; the definitions appear later in this file.

// Completes the leaf in lbuf and accounts progress_allocation of progress.
// The caller stops using lbuf afterwards (it sets its pointer to NULL), so
// presumably this takes ownership and frees it -- confirm at the definition.
static void finish_leafnode(
    struct dbout* out,
    struct leaf_buf* lbuf,
    int progress_allocation,
    FTLOADER bl,
    uint32_t target_basementnodesize,
    enum toku_compression_method target_compression_method);

// Builds the non-leaf levels from the subtrees recorded in sts and the pivot
// keys in pivots_fidx.  Returns 0 on success; the caller then expects
// sts->n_subtrees == 1 (the root).
static int write_nonleaves(
    FTLOADER bl,
    FIDX pivots_fidx,
    struct dbout* out,
    struct subtrees_info* sts,
    const DESCRIPTOR descriptor,
    uint32_t target_nodesize,
    uint32_t target_basementnodesize,
    enum toku_compression_method target_compression_method);

// Adds one key/value pair to the leaf in lbuf, updating stats_to_update and
// *logical_rows_delta.  this_leafentry_size is the caller's precomputed size.
static void add_pair_to_leafnode(
    struct leaf_buf* lbuf,
    unsigned char* key,
    int keylen,
    unsigned char* val,
    int vallen,
    int this_leafentry_size,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta);

// Writes out's block translation table; its file offset is returned through
// *off_of_translation_p.  Returns 0 or an error code.
static int write_translation_table(
    struct dbout* out,
    long long* off_of_translation_p);

// Writes the file header that points at the translation table.  Returns 0 or
// an error code.
static int write_header(
    struct dbout* out,
    long long translation_location_on_disk,
    long long translation_size_on_disk);
2368 
drain_writer_q(QUEUE q)2369 static void drain_writer_q(QUEUE q) {
2370     void *item;
2371     while (1) {
2372         int r = toku_queue_deq(q, &item, NULL, NULL);
2373         if (r == EOF)
2374             break;
2375         invariant(r == 0);
2376         struct rowset *rowset = (struct rowset *) item;
2377         destroy_rowset(rowset);
2378         toku_free(rowset);
2379     }
2380 }
2381 
cleanup_maxkey(DBT * maxkey)2382 static void cleanup_maxkey(DBT *maxkey) {
2383     if (maxkey->flags == DB_DBT_REALLOC) {
2384         toku_free(maxkey->data);
2385         maxkey->data = NULL;
2386         maxkey->flags = 0;
2387     }
2388 }
2389 
update_maxkey(DBT * maxkey,DBT * key)2390 static void update_maxkey(DBT *maxkey, DBT *key) {
2391     cleanup_maxkey(maxkey);
2392     *maxkey = *key;
2393 }
2394 
copy_maxkey(DBT * maxkey)2395 static int copy_maxkey(DBT *maxkey) {
2396     DBT newkey;
2397     toku_init_dbt_flags(&newkey, DB_DBT_REALLOC);
2398     int r = toku_dbt_set(maxkey->size, maxkey->data, &newkey, NULL);
2399     if (r == 0)
2400         update_maxkey(maxkey, &newkey);
2401     return r;
2402 }
2403 
toku_loader_write_ft_from_q(FTLOADER bl,const DESCRIPTOR descriptor,int fd,int progress_allocation,QUEUE q,uint64_t total_disksize_estimate,int which_db,uint32_t target_nodesize,uint32_t target_basementnodesize,enum toku_compression_method target_compression_method,uint32_t target_fanout)2404 static int toku_loader_write_ft_from_q (FTLOADER bl,
2405                                          const DESCRIPTOR descriptor,
2406                                          int fd, // write to here
2407                                          int progress_allocation,
2408                                          QUEUE q,
2409                                          uint64_t total_disksize_estimate,
2410                                          int which_db,
2411                                          uint32_t target_nodesize,
2412                                          uint32_t target_basementnodesize,
2413                                          enum toku_compression_method target_compression_method,
2414                                          uint32_t target_fanout)
2415 // Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree.  Closes fd.
2416 {
2417     // set the number of fractal tree writer threads so that we can partition memory in the merger
2418     ft_loader_set_fractal_workers_count(bl);
2419 
2420     int result = 0;
2421     int r;
2422 
2423     // The pivots file will contain all the pivot strings (in the form <size(32bits)> <data>)
2424     // The pivots_fname is the name of the pivots file.
2425     // Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree.
2426     int64_t n_pivots=0; // number of pivots in pivots_file
2427     FIDX pivots_file;  // the file
2428 
2429     r = ft_loader_open_temp_file (bl, &pivots_file);
2430     if (r) {
2431         result = r;
2432         drain_writer_q(q);
2433         r = toku_os_close(fd);
2434         assert_zero(r);
2435         return result;
2436     }
2437     TOKU_FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
2438 
2439     TXNID root_xid_that_created = TXNID_NONE;
2440     if (bl->root_xids_that_created)
2441         root_xid_that_created = bl->root_xids_that_created[which_db];
2442 
2443     // TODO: (Zardosht/Yoni/Leif), do this code properly
2444     struct ft ft;
2445     toku_ft_init(&ft, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
2446 
2447     struct dbout out;
2448     ZERO_STRUCT(out);
2449     dbout_init(&out, &ft);
2450     out.fd = fd;
2451     out.current_off = 8192; // leave 8K reserved at beginning
2452     out.n_translations = 3; // 3 translations reserved at the beginning
2453     out.n_translations_limit = 4;
2454     MALLOC_N(out.n_translations_limit, out.translation);
2455     if (out.translation == NULL) {
2456         result = get_error_errno();
2457         dbout_destroy(&out);
2458         drain_writer_q(q);
2459         toku_free(ft.h);
2460         return result;
2461     }
2462 
2463     // The blocks_array will contain all the block numbers that correspond to the pivots.  Generally there should be one more block than pivot.
2464     struct subtrees_info sts;
2465     subtrees_info_init(&sts);
2466     sts.next_free_block  = 3;
2467     sts.n_subtrees       = 0;
2468     sts.n_subtrees_limit = 1;
2469     MALLOC_N(sts.n_subtrees_limit, sts.subtrees);
2470     if (sts.subtrees == NULL) {
2471         result = get_error_errno();
2472         subtrees_info_destroy(&sts);
2473         dbout_destroy(&out);
2474         drain_writer_q(q);
2475         toku_free(ft.h);
2476         return result;
2477     }
2478 
2479     out.translation[0].off = -2LL; out.translation[0].size = 0; // block 0 is NULL
2480     invariant(1==RESERVED_BLOCKNUM_TRANSLATION);
2481     invariant(2==RESERVED_BLOCKNUM_DESCRIPTOR);
2482     out.translation[1].off = -1;                                // block 1 is the block translation, filled in later
2483     out.translation[2].off = -1;                                // block 2 is the descriptor
2484     seek_align(&out);
2485     int64_t lblock = 0;  // make gcc --happy
2486     result = allocate_block(&out, &lblock);
2487     invariant(result == 0); // can not fail since translations reserved above
2488 
2489     TXNID le_xid = leafentry_xid(bl, which_db);
2490     struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
2491     uint64_t n_rows_remaining = bl->n_rows;
2492     uint64_t old_n_rows_remaining = bl->n_rows;
2493 
2494     uint64_t  used_estimate = 0;  // how much diskspace have we used up?
2495 
2496     DBT maxkey = make_dbt(0, 0); // keep track of the max key of the current node
2497 
2498     STAT64INFO_S deltas = ZEROSTATS;
2499     // This is just a placeholder and not used in the loader, the real/accurate
2500     // stats will come out of 'deltas' because this loader is not pushing
2501     // messages down into the top of a fractal tree where the logical row count
2502     // is done, it is directly creating leaf entries so it must also take on
2503     // performing the logical row counting on its own
2504     int64_t logical_rows_delta = 0;
2505     while (result == 0) {
2506         void *item;
2507         {
2508             int rr = toku_queue_deq(q, &item, NULL, NULL);
2509             if (rr == EOF) break;
2510             if (rr != 0) {
2511                 ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr);
2512                 break;
2513             }
2514         }
2515         struct rowset *output_rowset = (struct rowset *)item;
2516 
2517         for (unsigned int i = 0; i < output_rowset->n_rows; i++) {
2518             DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off,                               output_rowset->rows[i].klen);
2519             DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen);
2520 
2521             size_t this_leafentry_size = ft_loader_leafentry_size(key.size, val.size, le_xid);
2522 
2523             used_estimate += this_leafentry_size;
2524 
2525             // Spawn off a node if
2526             //   a) there is at least one row in it, and
2527             //   b) this item would make the nodesize too big, or
2528             //   c) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
2529             uint64_t remaining_amount = total_disksize_estimate - used_estimate;
2530             uint64_t used_here = lbuf->off + 1000;             // leave 1000 for various overheads.
2531             uint64_t target_size = (target_nodesize*7L)/8;     // use only 7/8 of the node.
2532             uint64_t used_here_with_next_key = used_here + this_leafentry_size;
2533             if (lbuf->nkeys > 0 &&
2534                 ((used_here_with_next_key >= target_size) || (used_here + remaining_amount >= target_size && lbuf->off > remaining_amount))) {
2535 
2536                 int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining;
2537                 progress_allocation -= progress_this_node;
2538                 old_n_rows_remaining = n_rows_remaining;
2539 
2540                 allocate_node(&sts, lblock);
2541 
2542                 n_pivots++;
2543 
2544                 invariant(maxkey.data != NULL);
2545                 if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, nullptr, bl))) {
2546                     ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
2547                     if (result == 0) result = r;
2548                     break;
2549                 }
2550 
2551                 finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method);
2552                 lbuf = NULL;
2553 
2554                 r = allocate_block(&out, &lblock);
2555                 if (r != 0) {
2556                     ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
2557                     if (result == 0) result = r;
2558                     break;
2559                 }
2560                 lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
2561             }
2562 
2563             add_pair_to_leafnode(
2564                 lbuf,
2565                 (unsigned char*)key.data,
2566                 key.size,
2567                 (unsigned char*)val.data,
2568                 val.size,
2569                 this_leafentry_size,
2570                 &deltas,
2571                 &logical_rows_delta);
2572             n_rows_remaining--;
2573 
2574             update_maxkey(&maxkey, &key); // set the new maxkey to the current key
2575         }
2576 
2577         r = copy_maxkey(&maxkey); // make a copy of maxkey before the rowset is destroyed
2578         if (result == 0)
2579             result = r;
2580         destroy_rowset(output_rowset);
2581         toku_free(output_rowset);
2582 
2583         if (result == 0)
2584             result = ft_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly
2585     }
2586 
2587     if (deltas.numrows || deltas.numbytes) {
2588         toku_ft_update_stats(&ft.in_memory_stats, deltas);
2589     }
2590 
2591     // As noted above, the loader directly creates a tree structure without
2592     // going through the higher level ft API and tus bypasses the logical row
2593     // counting performed at that level. So, we must manually update the logical
2594     // row count with the info we have from the physical delta that comes out of
2595     // add_pair_to_leafnode.
2596     toku_ft_adjust_logical_row_count(&ft, deltas.numrows);
2597 
2598     cleanup_maxkey(&maxkey);
2599 
2600     if (lbuf) {
2601         allocate_node(&sts, lblock);
2602         {
2603             int p = progress_allocation/2;
2604             finish_leafnode(&out, lbuf, p, bl, target_basementnodesize, target_compression_method);
2605             progress_allocation -= p;
2606         }
2607     }
2608 
2609 
2610     if (result == 0) {
2611         result = ft_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
2612     }
2613 
2614     if (result != 0) goto error;
2615 
2616     // We haven't paniced, so the sum should add up.
2617     invariant(used_estimate == total_disksize_estimate);
2618 
2619     n_pivots++;
2620 
2621     {
2622         DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file.
2623         r = bl_write_dbt(&key, pivots_stream, NULL, nullptr, bl);
2624         if (r) {
2625             result = r; goto error;
2626         }
2627     }
2628 
2629     r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
2630     if (r) {
2631         result = r; goto error;
2632     }
2633 
2634     {
2635         invariant(sts.n_subtrees==1);
2636         out.ft->h->root_blocknum = make_blocknum(sts.subtrees[0].block);
2637         toku_free(sts.subtrees); sts.subtrees = NULL;
2638 
2639         // write the descriptor
2640         {
2641             seek_align(&out);
2642             invariant(out.n_translations >= RESERVED_BLOCKNUM_DESCRIPTOR);
2643             invariant(out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off == -1);
2644             out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off = out.current_off;
2645             size_t desc_size = 4+toku_serialize_descriptor_size(descriptor);
2646             invariant(desc_size>0);
2647             out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].size = desc_size;
2648             struct wbuf wbuf;
2649             char *XMALLOC_N(desc_size, buf);
2650             wbuf_init(&wbuf, buf, desc_size);
2651             toku_serialize_descriptor_contents_to_wbuf(&wbuf, descriptor);
2652             uint32_t checksum = toku_x1764_finish(&wbuf.checksum);
2653             wbuf_int(&wbuf, checksum);
2654             invariant(wbuf.ndone==desc_size);
2655             r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone);
2656             out.current_off += desc_size;
2657             toku_free(buf);    // wbuf_destroy
2658             if (r) {
2659                 result = r; goto error;
2660             }
2661         }
2662 
2663         long long off_of_translation;
2664         r = write_translation_table(&out, &off_of_translation);
2665         if (r) {
2666             result = r; goto error;
2667         }
2668 
2669         r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4);
2670         if (r) {
2671             result = r; goto error;
2672         }
2673 
2674         r = update_progress(progress_allocation, bl, "wrote tdb file");
2675         if (r) {
2676             result = r; goto error;
2677         }
2678     }
2679 
2680     r = fsync(out.fd);
2681     if (r) {
2682         result = get_error_errno(); goto error;
2683     }
2684 
2685     // Do we need to pay attention to user_said_stop?  Or should the guy at the other end of the queue pay attention and send in an EOF.
2686 
2687  error:
2688     {
2689         int rr = toku_os_close(fd);
2690         if (rr)
2691             result = get_error_errno();
2692     }
2693     out.fd = -1;
2694 
2695     subtrees_info_destroy(&sts);
2696     dbout_destroy(&out);
2697     drain_writer_q(q);
2698     toku_free(ft.h);
2699 
2700     return result;
2701 }
2702 
toku_loader_write_ft_from_q_in_C(FTLOADER bl,const DESCRIPTOR descriptor,int fd,int progress_allocation,QUEUE q,uint64_t total_disksize_estimate,int which_db,uint32_t target_nodesize,uint32_t target_basementnodesize,enum toku_compression_method target_compression_method,uint32_t target_fanout)2703 int toku_loader_write_ft_from_q_in_C (FTLOADER                bl,
2704                                       const DESCRIPTOR descriptor,
2705                                       int                      fd, // write to here
2706                                       int                      progress_allocation,
2707                                       QUEUE                    q,
2708                                       uint64_t                 total_disksize_estimate,
2709                                       int                      which_db,
2710                                       uint32_t                 target_nodesize,
2711                                       uint32_t                 target_basementnodesize,
2712                                       enum toku_compression_method target_compression_method,
2713                                       uint32_t                 target_fanout)
2714 // This is probably only for testing.
2715 {
2716     target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
2717     target_basementnodesize = target_basementnodesize == 0 ? default_loader_basementnodesize : target_basementnodesize;
2718     return toku_loader_write_ft_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
2719 }
2720 
2721 
fractal_thread(void * ftav)2722 static void* fractal_thread (void *ftav) {
2723     struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
2724     int r = toku_loader_write_ft_from_q(fta->bl,
2725                                         fta->descriptor,
2726                                         fta->fd,
2727                                         fta->progress_allocation,
2728                                         fta->q,
2729                                         fta->total_disksize_estimate,
2730                                         fta->which_db,
2731                                         fta->target_nodesize,
2732                                         fta->target_basementnodesize,
2733                                         fta->target_compression_method,
2734                                         fta->target_fanout);
2735     fta->errno_result = r;
2736     toku_instr_delete_current_thread();
2737     return toku_pthread_done(nullptr);
2738 }
2739 
loader_do_i(FTLOADER bl,int which_db,DB * dest_db,ft_compare_func compare,const DESCRIPTOR descriptor,const char * new_fname,int progress_allocation)2740 static int loader_do_i(FTLOADER bl,
2741                        int which_db,
2742                        DB *dest_db,
2743                        ft_compare_func compare,
2744                        const DESCRIPTOR descriptor,
2745                        const char *new_fname,
2746                        int progress_allocation  // how much progress do I need
2747                                                 // to add into bl->progress by
2748                                                 // the end..
2749                        )
2750 /* Effect: Handle the file creating for one particular DB in the bulk loader. */
2751 /* Requires: The data is fully extracted, so we can do merges out of files and
2752    write the ft file. */
2753 {
2754     //printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
2755     struct merge_fileset *fs = &(bl->fs[which_db]);
2756     struct rowset *rows = &(bl->rows[which_db]);
2757     invariant(rows->data==NULL); // the rows should be all cleaned up already
2758 
2759     int r = toku_queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
2760     if (r) goto error;
2761 
2762     {
2763         mode_t mode = S_IRUSR + S_IWUSR + S_IRGRP + S_IWGRP;
2764         int fd = toku_os_open(new_fname,
2765                               O_RDWR | O_CREAT | O_BINARY,
2766                               mode,
2767                               *tokudb_file_load_key);  // #2621
2768         if (fd < 0) {
2769             r = get_error_errno();
2770             goto error;
2771         }
2772 
2773         uint32_t target_nodesize, target_basementnodesize, target_fanout;
2774         enum toku_compression_method target_compression_method;
2775         r = dest_db->get_pagesize(dest_db, &target_nodesize);
2776         invariant_zero(r);
2777         r = dest_db->get_readpagesize(dest_db, &target_basementnodesize);
2778         invariant_zero(r);
2779         r = dest_db->get_compression_method(dest_db, &target_compression_method);
2780         invariant_zero(r);
2781         r = dest_db->get_fanout(dest_db, &target_fanout);
2782         invariant_zero(r);
2783 
2784         if (bl->allow_puts) {
2785             // a better allocation would be to figure out roughly how many merge passes we'll need.
2786             int allocation_for_merge = (2*progress_allocation)/3;
2787             progress_allocation -= allocation_for_merge;
2788 
2789             // This structure must stay live until the join below.
2790             struct fractal_thread_args fta = {bl,
2791                                               descriptor,
2792                                               fd,
2793                                               progress_allocation,
2794                                               bl->fractal_queues[which_db],
2795                                               bl->extracted_datasizes[which_db],
2796                                               0,
2797                                               which_db,
2798                                               target_nodesize,
2799                                               target_basementnodesize,
2800                                               target_compression_method,
2801                                               target_fanout};
2802 
2803             r = toku_pthread_create(*fractal_thread_key,
2804                                     bl->fractal_threads + which_db,
2805                                     nullptr,
2806                                     fractal_thread,
2807                                     static_cast<void *>(&fta));
2808             if (r) {
2809                 int r2 __attribute__((__unused__)) =
2810                     toku_queue_destroy(bl->fractal_queues[which_db]);
2811                 // ignore r2, since we already have an error
2812                 bl->fractal_queues[which_db] = nullptr;
2813                 goto error;
2814             }
2815             invariant(bl->fractal_threads_live[which_db]==false);
2816             bl->fractal_threads_live[which_db] = true;
2817 
2818             r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
2819 
2820             {
2821                 void *toku_pthread_retval;
2822                 int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
2823                 invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here.  A previous bug put that struct into a C block statement.
2824                 resource_assert_zero(r2);
2825                 invariant(toku_pthread_retval==NULL);
2826                 invariant(bl->fractal_threads_live[which_db]);
2827                 bl->fractal_threads_live[which_db] = false;
2828                 if (r == 0) r = fta.errno_result;
2829             }
2830         } else {
2831             toku_queue_eof(bl->fractal_queues[which_db]);
2832             r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation,
2833                                             bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db,
2834                                             target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
2835         }
2836     }
2837 
2838  error: // this is the cleanup code.  Even if r==0 (no error) we fall through to here.
2839     if (bl->fractal_queues[which_db]) {
2840         int r2 = toku_queue_destroy(bl->fractal_queues[which_db]);
2841         invariant(r2==0);
2842         bl->fractal_queues[which_db] = nullptr;
2843     }
2844 
2845     // if we get here we need to free up the merge_fileset and the rowset, as well as the keys
2846     toku_free(rows->data); rows->data = NULL;
2847     toku_free(rows->rows); rows->rows = NULL;
2848     toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
2849     return r;
2850 }
2851 
toku_ft_loader_close_internal(FTLOADER bl)2852 static int toku_ft_loader_close_internal (FTLOADER bl)
2853 /* Effect: Close the bulk loader.
2854  * Return all the file descriptors in the array fds. */
2855 {
2856     int result = 0;
2857     if (bl->N == 0)
2858         result = update_progress(PROGRESS_MAX, bl, "done");
2859     else {
2860         int remaining_progress = PROGRESS_MAX;
2861         for (int i = 0; i < bl->N; i++) {
2862             // Take the unallocated progress and divide it among the unfinished jobs.
2863             // This calculation allocates all of the PROGRESS_MAX bits of progress to some job.
2864             int allocate_here = remaining_progress/(bl->N - i);
2865             remaining_progress -= allocate_here;
2866             char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]);
2867             result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, allocate_here);
2868             toku_free(fname_in_cwd);
2869             if (result != 0)
2870                 goto error;
2871             invariant(0 <= bl->progress && bl->progress <= PROGRESS_MAX);
2872         }
2873         if (result==0) invariant(remaining_progress==0);
2874 
2875         // fsync the directory containing the new tokudb files.
2876         char *fname0 = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[0]);
2877         int r = toku_fsync_directory(fname0);
2878         toku_free(fname0);
2879         if (r != 0) {
2880             result = r; goto error;
2881         }
2882     }
2883     invariant(bl->file_infos.n_files_open   == 0);
2884     invariant(bl->file_infos.n_files_extant == 0);
2885     invariant(bl->progress == PROGRESS_MAX);
2886  error:
2887     toku_ft_loader_internal_destroy(bl, (bool)(result!=0));
2888     return result;
2889 }
2890 
toku_ft_loader_close(FTLOADER bl,ft_loader_error_func error_function,void * error_extra,ft_loader_poll_func poll_function,void * poll_extra)2891 int toku_ft_loader_close (FTLOADER bl,
2892                            ft_loader_error_func error_function, void *error_extra,
2893                            ft_loader_poll_func  poll_function,  void *poll_extra
2894                            )
2895 {
2896     int result = 0;
2897 
2898     int r;
2899 
2900     //printf("Closing\n");
2901 
2902     ft_loader_set_error_function(&bl->error_callback, error_function, error_extra);
2903 
2904     ft_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
2905 
2906     if (bl->extractor_live) {
2907         r = finish_extractor(bl);
2908         if (r)
2909             result = r;
2910         invariant(!bl->extractor_live);
2911     } else {
2912         r = finish_primary_rows(bl);
2913         if (r)
2914             result = r;
2915     }
2916 
2917     // check for an error during extraction
2918     if (result == 0) {
2919         r = ft_loader_call_error_function(&bl->error_callback);
2920         if (r)
2921             result = r;
2922     }
2923 
2924     if (result == 0) {
2925         r = toku_ft_loader_close_internal(bl);
2926         if (r && result == 0)
2927             result = r;
2928     } else
2929         toku_ft_loader_internal_destroy(bl, true);
2930 
2931     return result;
2932 }
2933 
toku_ft_loader_finish_extractor(FTLOADER bl)2934 int toku_ft_loader_finish_extractor(FTLOADER bl) {
2935     int result = 0;
2936     if (bl->extractor_live) {
2937         int r = finish_extractor(bl);
2938         if (r)
2939             result = r;
2940         invariant(!bl->extractor_live);
2941     } else
2942         result = EINVAL;
2943     return result;
2944 }
2945 
toku_ft_loader_abort(FTLOADER bl,bool is_error)2946 int toku_ft_loader_abort(FTLOADER bl, bool is_error)
2947 /* Effect : Abort the bulk loader, free ft_loader resources */
2948 {
2949     int result = 0;
2950 
2951     // cleanup the extractor thread
2952     if (bl->extractor_live) {
2953         int r = finish_extractor(bl);
2954         if (r)
2955             result = r;
2956         invariant(!bl->extractor_live);
2957     }
2958 
2959     for (int i = 0; i < bl->N; i++)
2960         invariant(!bl->fractal_threads_live[i]);
2961 
2962     toku_ft_loader_internal_destroy(bl, is_error);
2963     return result;
2964 }
2965 
toku_ft_loader_get_error(FTLOADER bl,int * error)2966 int toku_ft_loader_get_error(FTLOADER bl, int *error) {
2967     *error = ft_loader_get_error(&bl->error_callback);
2968     return 0;
2969 }
2970 
add_pair_to_leafnode(struct leaf_buf * lbuf,unsigned char * key,int keylen,unsigned char * val,int vallen,int this_leafentry_size,STAT64INFO stats_to_update,int64_t * logical_rows_delta)2971 static void add_pair_to_leafnode(
2972     struct leaf_buf* lbuf,
2973     unsigned char* key,
2974     int keylen,
2975     unsigned char* val,
2976     int vallen,
2977     int this_leafentry_size,
2978     STAT64INFO stats_to_update,
2979     int64_t* logical_rows_delta) {
2980 
2981     lbuf->nkeys++;
2982     lbuf->ndata++;
2983     lbuf->dsize += keylen + vallen;
2984     lbuf->off += this_leafentry_size;
2985 
2986     // append this key val pair to the leafnode
2987     // #3588 TODO just make a clean ule and append it to the omt
2988     // #3588 TODO can do the rebalancing here and avoid a lot of work later
2989     FTNODE leafnode = lbuf->node;
2990     uint32_t idx = BLB_DATA(leafnode, 0)->num_klpairs();
2991     DBT kdbt, vdbt;
2992     ft_msg msg(
2993         toku_fill_dbt(&kdbt, key, keylen),
2994         toku_fill_dbt(&vdbt, val, vallen),
2995         FT_INSERT,
2996         ZERO_MSN,
2997         lbuf->xids);
2998     uint64_t workdone = 0;
2999     // there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info
3000     txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
3001     toku_ft_bn_apply_msg_once(
3002         BLB(leafnode, 0),
3003         msg,
3004         idx,
3005         keylen,
3006         NULL,
3007         &gc_info,
3008         &workdone,
3009         stats_to_update,
3010         logical_rows_delta);
3011 }
3012 
write_literal(struct dbout * out,void * data,size_t len)3013 static int write_literal(struct dbout *out, void*data,  size_t len) {
3014     invariant(out->current_off%4096==0);
3015     int result = toku_os_write(out->fd, data, len);
3016     if (result == 0)
3017         out->current_off+=len;
3018     return result;
3019 }
3020 
finish_leafnode(struct dbout * out,struct leaf_buf * lbuf,int progress_allocation,FTLOADER bl,uint32_t target_basementnodesize,enum toku_compression_method target_compression_method)3021 static void finish_leafnode(
3022     struct dbout* out,
3023     struct leaf_buf* lbuf,
3024     int progress_allocation,
3025     FTLOADER bl,
3026     uint32_t target_basementnodesize,
3027     enum toku_compression_method target_compression_method) {
3028 
3029     int result = 0;
3030 
3031     // serialize leaf to buffer
3032     size_t serialized_leaf_size = 0;
3033     size_t uncompressed_serialized_leaf_size = 0;
3034     char *serialized_leaf = NULL;
3035     FTNODE_DISK_DATA ndd = NULL;
3036     result = toku_serialize_ftnode_to_memory(
3037         lbuf->node,
3038         &ndd,
3039         target_basementnodesize,
3040         target_compression_method,
3041         true,
3042         true,
3043         &serialized_leaf_size,
3044         &uncompressed_serialized_leaf_size,
3045         &serialized_leaf);
3046 
3047     // write it out
3048     if (result == 0) {
3049         dbout_lock(out);
3050         long long off_of_leaf = out->current_off;
3051         result = write_literal(out, serialized_leaf, serialized_leaf_size);
3052         if (result == 0) {
3053             out->translation[lbuf->blocknum.b].off  = off_of_leaf;
3054             out->translation[lbuf->blocknum.b].size = serialized_leaf_size;
3055             seek_align_locked(out);
3056         }
3057         dbout_unlock(out);
3058     }
3059 
3060     // free the node
3061     if (serialized_leaf) {
3062         toku_free(ndd);
3063         toku_free(serialized_leaf);
3064     }
3065     toku_ftnode_free(&lbuf->node);
3066     toku_xids_destroy(&lbuf->xids);
3067     toku_free(lbuf);
3068 
3069     //printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
3070     if (result == 0)
3071         result = update_progress(progress_allocation, bl, "wrote node");
3072 
3073     if (result)
3074         ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
3075 }
3076 
write_translation_table(struct dbout * out,long long * off_of_translation_p)3077 static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
3078     seek_align(out);
3079     struct dbuf ttable;
3080     dbuf_init(&ttable);
3081     long long off_of_translation = out->current_off;
3082     long long bt_size_on_disk = out->n_translations * 16 + 20;
3083     putbuf_int64(&ttable, out->n_translations);    // number of records
3084     putbuf_int64(&ttable, -1LL); // the linked list
3085     out->translation[1].off = off_of_translation;
3086     out->translation[1].size = bt_size_on_disk;
3087     for (int i=0; i<out->n_translations; i++) {
3088         putbuf_int64(&ttable, out->translation[i].off);
3089         putbuf_int64(&ttable, out->translation[i].size);
3090     }
3091     unsigned int checksum = toku_x1764_memory(ttable.buf, ttable.off);
3092     putbuf_int32(&ttable, checksum);
3093     // pad it to 512 zeros
3094     long long encoded_length = ttable.off;
3095     {
3096         int nbytes_to_add = roundup_to_multiple(512, ttable.off) - encoded_length;
3097         char zeros[nbytes_to_add];
3098         for (int i=0; i<nbytes_to_add; i++) zeros[i]=0;
3099         putbuf_bytes(&ttable, zeros, nbytes_to_add);
3100     }
3101     int result = ttable.error;
3102     if (result == 0) {
3103         invariant(bt_size_on_disk==encoded_length);
3104         result = toku_os_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation);
3105     }
3106     dbuf_destroy(&ttable);
3107     *off_of_translation_p = off_of_translation;
3108     return result;
3109 }
3110 
write_header(struct dbout * out,long long translation_location_on_disk,long long translation_size_on_disk)3111 static int write_header(
3112     struct dbout* out,
3113     long long translation_location_on_disk,
3114     long long translation_size_on_disk) {
3115 
3116     int result = 0;
3117     size_t size = toku_serialize_ft_size(out->ft->h);
3118     size_t alloced_size = roundup_to_multiple(512, size);
3119     struct wbuf wbuf;
3120     char *MALLOC_N_ALIGNED(512, alloced_size, buf);
3121     if (buf == NULL) {
3122         result = get_error_errno();
3123     } else {
3124         wbuf_init(&wbuf, buf, size);
3125         out->ft->h->on_disk_stats = out->ft->in_memory_stats;
3126         out->ft->h->on_disk_logical_rows = out->ft->in_memory_logical_rows;
3127         toku_serialize_ft_to_wbuf(&wbuf, out->ft->h, translation_location_on_disk, translation_size_on_disk);
3128         for (size_t i=size; i<alloced_size; i++) buf[i]=0; // initialize all those unused spots to zero
3129         if (wbuf.ndone != size)
3130             result = EINVAL;
3131         else {
3132             assert(wbuf.ndone <= alloced_size);
3133             result = toku_os_pwrite(out->fd, wbuf.buf, alloced_size, 0);
3134         }
3135         toku_free(buf);
3136     }
3137     return result;
3138 }
3139 
read_some_pivots(FIDX pivots_file,int n_to_read,FTLOADER bl,DBT pivots[])3140 static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl,
3141                       /*out*/ DBT pivots[/*n_to_read*/])
3142 // pivots is an array to be filled in.  The pivots array is uninitialized.
3143 {
3144     for (int i = 0; i < n_to_read; i++)
3145         pivots[i] = zero_dbt;
3146 
3147     TOKU_FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
3148 
3149     int result = 0;
3150     for (int i = 0; i < n_to_read; i++) {
3151         int r = bl_read_dbt(&pivots[i], pivots_stream);
3152         if (r != 0) {
3153             result = r;
3154             break;
3155         }
3156     }
3157     return result;
3158 }
3159 
delete_pivots(DBT pivots[],int n)3160 static void delete_pivots(DBT pivots[], int n) {
3161     for (int i = 0; i < n; i++)
3162         toku_free(pivots[i].data);
3163     toku_free(pivots);
3164 }
3165 
static int setup_nonleaf_block (int n_children,
                                struct subtrees_info *subtrees,         FIDX pivots_file,        int64_t first_child_offset_in_subtrees,
                                struct subtrees_info *next_subtrees,    FIDX next_pivots_file,
                                struct dbout *out, FTLOADER bl,
                                /*out*/int64_t *blocknum,
                                /*out*/struct subtree_info **subtrees_info_p,
                                /*out*/DBT **pivots_p)
// Do the serial part of setting up a non leaf block.
//   Read n_children pivots out of pivots_file into a newly allocated array of n_children DBTs (returned in *pivots_p).
//     Only the first n_children-1 entries are pivots of the new node; the last entry is reset to zero_dbt (see below).
//   Copy the final pivot into the next_pivots file instead of returning it.
//   Copy n_children subtree_infos, starting at index first_child_offset_in_subtrees of subtrees, into a newly
//     allocated array (returned in *subtrees_info_p).
//   Allocate a block number (allocate_block) and return it in *blocknum.
//   Register the blocknum in next_subtrees (allocate_node), so it can be combined with the pivots at the next level of the tree.
//   NOTE(review): the block counters (n_blocks_used / n_translations) are presumably updated inside
//     allocate_block / allocate_node; confirm against their definitions.
// On success the caller owns *pivots_p and *subtrees_info_p.  On failure *pivots_p and *subtrees_info_p are
// not set and the arrays allocated here are freed; note that if allocate_block fails, the last pivot has
// already been written to next_pivots_file.
// This code cannot be called in parallel because of all the race conditions.
// The actual creation of the node can be called in parallel after this work is done.
{
    //printf("Nonleaf has children :"); for(int i=0; i<n_children; i++) printf(" %ld", subtrees->subtrees[i].block); printf("\n");

    int result = 0;

    DBT *MALLOC_N(n_children, pivots);
    if (pivots == NULL) {
        result = get_error_errno();
    }

    if (result == 0) {
        // read_some_pivots resets every slot to zero_dbt first, so delete_pivots below is safe on a partial read.
        int r = read_some_pivots(pivots_file, n_children, bl, pivots);
        if (r)
            result = r;
    }

    if (result == 0) {
        // The last pivot separates this node from the next one at the parent level, so it goes to the next level's pivots file.
        TOKU_FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
        int r = bl_write_dbt(
            &pivots[n_children - 1], next_pivots_stream, NULL, nullptr, bl);
        if (r)
            result = r;
    }

    if (result == 0) {
        // The last pivot was written to the next_pivots file, so we free it now instead of returning it.
        toku_free(pivots[n_children-1].data);
        pivots[n_children-1] = zero_dbt;

        struct subtree_info *XMALLOC_N(n_children, subtrees_array);
        for (int i = 0; i < n_children; i++) {
            int64_t from_blocknum = first_child_offset_in_subtrees + i;
            subtrees_array[i] = subtrees->subtrees[from_blocknum];
        }

        int r = allocate_block(out, blocknum);
        if (r) {
            toku_free(subtrees_array);
            result = r;
        } else {
            allocate_node(next_subtrees, *blocknum);

            // Ownership of both arrays passes to the caller.
            *pivots_p = pivots;
            *subtrees_info_p = subtrees_array;
        }
    }

    if (result != 0) {
        // Error path: release the pivot array (and any key bytes read into it).
        if (pivots) {
            delete_pivots(pivots, n_children); pivots = NULL;
        }
    }

    return result;
}
3237 
write_nonleaf_node(FTLOADER bl,struct dbout * out,int64_t blocknum_of_new_node,int n_children,DBT * pivots,struct subtree_info * subtree_info,int height,const DESCRIPTOR UU (desc),uint32_t UU (target_nodesize),uint32_t target_basementnodesize,enum toku_compression_method target_compression_method)3238 static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
3239                                 DBT *pivots, /* must free this array, as well as the things it points t */
3240                                 struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc), uint32_t UU(target_nodesize), uint32_t target_basementnodesize, enum toku_compression_method target_compression_method)
3241 {
3242     //Nodes do not currently touch descriptors
3243     invariant(height > 0);
3244 
3245     int result = 0;
3246 
3247     FTNODE XMALLOC(node);
3248     toku_initialize_empty_ftnode(node, make_blocknum(blocknum_of_new_node), height, n_children,
3249                                   FT_LAYOUT_VERSION, 0);
3250     node->pivotkeys.create_from_dbts(pivots, n_children - 1);
3251     assert(node->bp);
3252     for (int i=0; i<n_children; i++) {
3253         BP_BLOCKNUM(node,i)  = make_blocknum(subtree_info[i].block);
3254         BP_STATE(node,i) = PT_AVAIL;
3255     }
3256 
3257     FTNODE_DISK_DATA ndd = NULL;
3258     if (result == 0) {
3259         size_t n_bytes;
3260         size_t n_uncompressed_bytes;
3261         char *bytes;
3262         int r;
3263         r = toku_serialize_ftnode_to_memory(node, &ndd, target_basementnodesize, target_compression_method, true, true, &n_bytes, &n_uncompressed_bytes, &bytes);
3264         if (r) {
3265             result = r;
3266         } else {
3267             dbout_lock(out);
3268             out->translation[blocknum_of_new_node].off = out->current_off;
3269             out->translation[blocknum_of_new_node].size = n_bytes;
3270             //fprintf(stderr, "Wrote internal node at %ld (%ld bytes)\n", out->current_off, n_bytes);
3271             //for (uint32_t i=0; i<n_bytes; i++) { unsigned char b = bytes[i]; printf("%d:%02x (%d) ('%c')\n", i, b, b, (b>=' ' && b<128) ? b : '*'); }
3272             r = write_literal(out, bytes, n_bytes);
3273             if (r)
3274                 result = r;
3275             else
3276                 seek_align_locked(out);
3277             dbout_unlock(out);
3278             toku_free(bytes);
3279         }
3280     }
3281 
3282     for (int i=0; i<n_children-1; i++) {
3283         toku_free(pivots[i].data);
3284     }
3285     for (int i=0; i<n_children; i++) {
3286         destroy_nonleaf_childinfo(BNC(node,i));
3287     }
3288     toku_free(pivots);
3289     // TODO: Should be using toku_destroy_ftnode_internals, which should be renamed to toku_ftnode_destroy
3290     toku_free(node->bp);
3291     node->pivotkeys.destroy();
3292     toku_free(node);
3293     toku_free(ndd);
3294     toku_free(subtree_info);
3295 
3296     if (result != 0)
3297         ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
3298 }
3299 
write_nonleaves(FTLOADER bl,FIDX pivots_fidx,struct dbout * out,struct subtrees_info * sts,const DESCRIPTOR descriptor,uint32_t target_nodesize,uint32_t target_basementnodesize,enum toku_compression_method target_compression_method)3300 static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {
3301     int result = 0;
3302     int height = 1;
3303 
3304     // Watch out for the case where we saved the last pivot but didn't write any more nodes out.
3305     // The trick is not to look at n_pivots, but to look at blocks.n_blocks
3306     while (sts->n_subtrees > 1) {
3307         // If there is more than one block in blocks, then we must build another level of the tree.
3308 
3309         // we need to create a pivots file for the pivots of the next level.
3310         // and a blocks_array
3311         // So for example.
3312         //  1) we grab 16 pivots and 16 blocks.
3313         //  2) We put the 15 pivots and 16 blocks into an non-leaf node.
3314         //  3) We put the 16th pivot into the next pivots file.
3315         {
3316             int r =
3317                 fseek(toku_bl_fidx2file(bl, pivots_fidx)->file, 0, SEEK_SET);
3318             if (r != 0) {
3319                 return get_error_errno();
3320             }
3321         }
3322 
3323         FIDX next_pivots_file;
3324         {
3325             int r = ft_loader_open_temp_file (bl, &next_pivots_file);
3326             if (r != 0) { result = r; break; }
3327         }
3328 
3329         struct subtrees_info next_sts;
3330         subtrees_info_init(&next_sts);
3331         next_sts.n_subtrees = 0;
3332         next_sts.n_subtrees_limit = 1;
3333         XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees);
3334 
3335         const int n_per_block = 15;
3336         int64_t n_subtrees_used = 0;
3337         while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) {
3338             // grab the first N_PER_BLOCK and build a node.
3339             DBT *pivots;
3340             int64_t blocknum_of_new_node = 0;
3341             struct subtree_info *subtree_info;
3342             int r = setup_nonleaf_block (n_per_block,
3343                                          sts, pivots_fidx, n_subtrees_used,
3344                                          &next_sts, next_pivots_file,
3345                                          out, bl,
3346                                          &blocknum_of_new_node, &subtree_info, &pivots);
3347             if (r) {
3348                 result = r;
3349                 break;
3350             } else {
3351                 write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node.
3352                 n_subtrees_used += n_per_block;
3353             }
3354         }
3355 
3356         int64_t n_blocks_left = sts->n_subtrees - n_subtrees_used;
3357         if (result == 0) {
3358             // Now we have a one or two blocks at the end to handle.
3359             invariant(n_blocks_left>=2);
3360             if (n_blocks_left > n_per_block) {
3361                 // Write half the remaining blocks
3362                 int64_t n_first = n_blocks_left/2;
3363                 DBT *pivots;
3364                 int64_t blocknum_of_new_node;
3365                 struct subtree_info *subtree_info;
3366                 int r = setup_nonleaf_block(n_first,
3367                                             sts, pivots_fidx, n_subtrees_used,
3368                                             &next_sts, next_pivots_file,
3369                                             out, bl,
3370                                             &blocknum_of_new_node, &subtree_info, &pivots);
3371                 if (r) {
3372                     result = r;
3373                 } else {
3374                     write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
3375                     n_blocks_left -= n_first;
3376                     n_subtrees_used += n_first;
3377                 }
3378             }
3379         }
3380         if (result == 0) {
3381             // Write the last block.
3382             DBT *pivots;
3383             int64_t blocknum_of_new_node;
3384             struct subtree_info *subtree_info;
3385             int r = setup_nonleaf_block(n_blocks_left,
3386                                         sts, pivots_fidx, n_subtrees_used,
3387                                         &next_sts, next_pivots_file,
3388                                         out, bl,
3389                                         &blocknum_of_new_node, &subtree_info, &pivots);
3390             if (r) {
3391                 result = r;
3392             } else {
3393                 write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
3394                 n_subtrees_used += n_blocks_left;
3395             }
3396         }
3397         if (result == 0)
3398             invariant(n_subtrees_used == sts->n_subtrees);
3399 
3400 
3401         if (result == 0) // pick up write_nonleaf_node errors
3402             result = ft_loader_get_error(&bl->error_callback);
3403 
3404         // Now set things up for the next iteration.
3405         int r = ft_loader_fi_close(&bl->file_infos, pivots_fidx, true); if (r != 0 && result == 0) result = r;
3406         r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx);    if (r != 0 && result == 0) result = r;
3407         pivots_fidx = next_pivots_file;
3408         toku_free(sts->subtrees); sts->subtrees = NULL;
3409         *sts = next_sts;
3410         height++;
3411 
3412         if (result)
3413             break;
3414     }
3415     { int r = ft_loader_fi_close (&bl->file_infos, pivots_fidx, true); if (r != 0 && result == 0) result = r; }
3416     { int r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r; }
3417     return result;
3418 }
3419 
ft_loader_set_fractal_workers_count_from_c(FTLOADER bl)3420 void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl) {
3421     ft_loader_set_fractal_workers_count (bl);
3422 }
3423 
3424 
3425