1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #pragma once
40 
41 #include <db.h>
42 
43 #include "portability/toku_pthread.h"
44 
45 #include "loader/dbufio.h"
46 #include "loader/loader.h"
47 #include "util/queue.h"
48 
/* Compile-time tuning constants for the loader.
 * The *_SIZE / *_MEMORY / *_BUF values are powers of two; their unit is
 * presumably bytes -- confirm against the code that consumes them. */
enum {
    EXTRACTOR_QUEUE_DEPTH = 2,
    FILE_BUFFER_SIZE = 1 << 24,
    MIN_ROWSET_MEMORY = 1 << 23,
    MIN_MERGE_FANIN = 2,
    FRACTAL_WRITER_QUEUE_DEPTH = 3,
    // Always two more than the writer queue depth.
    FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
    DBUFIO_DEPTH = 2,
    TARGET_MERGE_BUF_SIZE = 1 << 24,  // we'd like the merge buffer to be this big.
    MIN_MERGE_BUF_SIZE = 1 << 20,     // always use at least this much
    MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE
};
61 
/* The declarations in this header are exported so that the tests can compile against them. */
63 
64 /* These structures maintain a collection of all the open temporary files used by the loader. */
65 struct file_info {
66     bool is_open;
67     bool is_extant;  // if true, the file must be unlinked.
68     char *fname;
69     TOKU_FILE *file;
70     uint64_t n_rows;  // how many rows were written into that file
71     size_t buffer_size;
72     void *buffer;
73 };
74 struct file_infos {
75     int n_files;
76     int n_files_limit;
77     struct file_info *file_infos;
78     int n_files_open, n_files_extant;
79     toku_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos).
80 };
/* Handle for a loader temporary file: an integer index wrapped in a struct
 * (presumably an index into the loader's file_infos array -- confirm). */
typedef struct fidx { int idx; } FIDX;

/* The "no file" handle; its index is -1. */
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};

/* Returns nonzero iff f is the null handle (its index equals FIDX_NULL's, -1).
 * Declared separately so the unused attribute can be attached: not every
 * translation unit that includes this header calls it. */
static int fidx_is_null(const FIDX f) __attribute__((__unused__));
static int fidx_is_null(const FIDX f) { return f.idx == FIDX_NULL.idx; }
85 TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i);
86 
87 int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx);
88 
/* struct row and struct rowset are used for manipulating a collection of
 * rows in main memory.
 *
 * struct row locates one row inside a rowset's data array. */
struct row {
    size_t off;  // the offset in the data array.
    int klen;    // presumably the key length -- confirm
    int vlen;    // presumably the value length -- confirm
};
/* An in-memory collection of rows: an array of struct row descriptors plus
 * the data array that their `off` fields index into. */
struct rowset {
    uint64_t memory_budget;
    size_t n_rows, n_rows_limit;    // rows in use / presumably the capacity of `rows`
    struct row *rows;
    size_t n_bytes, n_bytes_limit;  // bytes in use / presumably the capacity of `data`
    char *data;
};
101 
102 int init_rowset (struct rowset *rows, uint64_t memory_budget);
103 void destroy_rowset(struct rowset *rows);
104 int add_row(struct rowset *rows, DBT *key, DBT *val);
105 
106 int loader_write_row(DBT *key,
107                      DBT *val,
108                      FIDX data,
109                      TOKU_FILE *,
110                      uint64_t *dataoff,
111                      struct wbuf *wb,
112                      FTLOADER bl);
113 int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val);
114 
115 struct merge_fileset {
116     bool have_sorted_output;  // Is there an previous key?
117     FIDX sorted_output;       // this points to one of the data_fidxs.  If output_is_sorted then this is the file containing sorted data.  It's still open
118     DBT  prev_key;            // What is it?  If it's here, its the last output in the merge fileset
119 
120     int n_temp_files, n_temp_files_limit;
121     FIDX *data_fidxs;
122 };
123 
/* Lifecycle of a struct merge_fileset. */
void init_merge_fileset(struct merge_fileset *fs);
void destroy_merge_fileset(struct merge_fileset *fs);
126 
127 struct poll_callback_s {
128     ft_loader_poll_func poll_function;
129     void *poll_extra;
130 };
131 typedef struct poll_callback_s *ft_loader_poll_callback;
132 
133 int ft_loader_init_poll_callback(ft_loader_poll_callback);
134 
135 void ft_loader_destroy_poll_callback(ft_loader_poll_callback);
136 
137 void ft_loader_set_poll_function(ft_loader_poll_callback, ft_loader_poll_func poll_function, void *poll_extra);
138 
139 int ft_loader_call_poll_function(ft_loader_poll_callback, float progress);
140 
141 struct error_callback_s {
142     int error;
143     ft_loader_error_func error_callback;
144     void *extra;
145     DB *db;
146     int which_db;
147     DBT key;
148     DBT val;
149     bool did_callback;
150     toku_mutex_t mutex;
151 };
152 typedef struct error_callback_s *ft_loader_error_callback;
153 
154 void ft_loader_init_error_callback(ft_loader_error_callback);
155 
156 void ft_loader_destroy_error_callback(ft_loader_error_callback);
157 
158 int ft_loader_get_error(ft_loader_error_callback);
159 
160 void ft_loader_set_error_function(ft_loader_error_callback, ft_loader_error_func error_function, void *extra);
161 
162 int ft_loader_set_error(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
163 
164 int ft_loader_call_error_function(ft_loader_error_callback);
165 
166 int ft_loader_set_error_and_callback(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
167 
168 struct ft_loader_s {
169     // These two are set in the close function, and used while running close
170     struct error_callback_s error_callback;
171     struct poll_callback_s poll_callback;
172 
173     generate_row_for_put_func generate_row_for_put;
174     ft_compare_func *bt_compare_funs;
175 
176     DB *src_db;
177     int N;
178     DB **dbs; // N of these
179     DESCRIPTOR *descriptors; // N of these.
180     TXNID      *root_xids_that_created; // N of these.
181     const char **new_fnames_in_env; // N of these.  The file names that the final data will be written to (relative to env).
182 
183     uint64_t *extracted_datasizes; // N of these.
184 
185     struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated.
186     struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread.
187 
188     QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks).  The extractor thread removes them, sorts them, adn writes to file.
189     toku_pthread_t     extractor_thread;     // the thread that takes primary rowset and does extraction and the first level sort and write to file.
190     bool extractor_live;
191 
192     DBT  *last_key;         // for each rowset, remember the most recently output key.  The system may choose not to keep this up-to-date when a rowset is unsorted.  These keys are malloced and ulen maintains the size of the malloced block.
193 
194     struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file.
195     uint64_t n_rows; // how many rows have been put?
196     struct merge_fileset *fs;
197 
198     const char *temp_file_template;
199 
200     CACHETABLE cachetable;
201     bool did_reserve_memory;
202     bool compress_intermediates;
203     bool allow_puts;
204     uint64_t reserved_memory;  // how much memory are we allowed to use?
205 
206     /* To make it easier to recover from errors, we don't use TOKU_FILE*,
207      * instead we use an index into the file_infos. */
208     struct file_infos file_infos;
209 
210 #define PROGRESS_MAX (1 << 16)
211     int progress;       // Progress runs from 0 to PROGRESS_MAX.  When we call the poll function we convert to a float from 0.0 to 1.0
212     // We use an integer so that we can add to the progress using a fetch-and-add instruction.
213 
214     int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again).
215 
216     LSN load_lsn; //LSN of the fsynced 'load' log entry.  Write this LSN (as checkpoint_lsn) in ft headers made by this loader.
217     TXNID load_root_xid; //(Root) transaction that performed the load.
218 
219     QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
220     toku_pthread_t *fractal_threads;
221     bool *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread.  (There is no NULL for a pthread_t, so we have to maintain this separately).
222 
223     unsigned fractal_workers; // number of fractal tree writer threads
224 
225     toku_mutex_t mutex;
226     bool mutex_init;
227 };
228 
229 // Set the number of rows in the loader.  Used for test.
230 void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);
231 
232 // Get the number of rows in the loader.  Used for test.
233 uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
234 
235 // The data passed into a fractal_thread via pthread_create.
236 struct fractal_thread_args {
237     FTLOADER                bl;
238     const DESCRIPTOR descriptor;
239     int                      fd; // write the ft into fd.
240     int                      progress_allocation;
241     QUEUE                    q;
242     uint64_t                 total_disksize_estimate;
243     int                      errno_result; // the final result.
244     int                      which_db;
245     uint32_t                 target_nodesize;
246     uint32_t                 target_basementnodesize;
247     enum toku_compression_method target_compression_method;
248     uint32_t                 target_fanout;
249 };
250 
251 void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);
252 uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
253 
254 int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
255                            int which_db, DB *dest_db, ft_compare_func,
256 			   FTLOADER,
257                            struct rowset *);
258 
259 int merge_files (struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func, int progress_allocation, QUEUE);
260 
261 int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);
262 
263 int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);
264 
265 //int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
266 int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation);
267 
268 int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);
269 
270 // This is probably only for testing.
271 int toku_loader_write_ft_from_q_in_C (FTLOADER                 bl,
272 				      const DESCRIPTOR         descriptor,
273 				      int                      fd, // write to here
274 				      int                      progress_allocation,
275 				      QUEUE                    q,
276 				      uint64_t                 total_disksize_estimate,
277                                       int                      which_db,
278                                       uint32_t                 target_nodesize,
279                                       uint32_t                 target_basementnodesize,
280                                       enum toku_compression_method target_compression_method,
281                                       uint32_t                 fanout);
282 
283 int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);
284 
285 int ft_loader_write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
286 
287 int ft_loader_init_file_infos (struct file_infos *fi);
288 void ft_loader_fi_destroy (struct file_infos *fi, bool is_error);
289 int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open);
290 int ft_loader_fi_close_all (struct file_infos *fi);
291 int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
292 int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx);
293 
294 int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
295 				   CACHETABLE cachetable,
296 				   generate_row_for_put_func g,
297 				   DB *src_db,
298 				   int N, FT_HANDLE ft_hs[/*N*/], DB* dbs[/*N*/],
299 				   const char *new_fnames_in_env[/*N*/],
300 				   ft_compare_func bt_compare_functions[/*N*/],
301 				   const char *temp_file_template,
302 				   LSN load_lsn,
303                                    TOKUTXN txn,
304                                    bool reserve_memory,
305                                    uint64_t reserve_memory_size,
306                                    bool compress_intermediates,
307                                    bool allow_puts);
308 
309 void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
310 
311 // For test purposes only.  (In production, the rowset size is determined by negotiation with the cachetable for some memory.  See #2613.)
312 uint64_t toku_ft_loader_get_rowset_budget_for_testing (void);
313 
314 int toku_ft_loader_finish_extractor(FTLOADER bl);
315 
316 int toku_ft_loader_get_error(FTLOADER bl, int *loader_errno);
317 
318 void ft_loader_lock_init(FTLOADER bl);
319 void ft_loader_lock_destroy(FTLOADER bl);
320 void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl);
321