1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <db.h>
40 #include "ydb-internal.h"
41 #include "indexer.h"
42 #include <ft/log_header.h>
43 #include <ft/cachetable/checkpoint.h>
44 #include "ydb_row_lock.h"
45 #include "ydb_write.h"
46 #include "ydb_db.h"
47 #include <portability/toku_atomic.h>
48 #include <util/status.h>
49 
// File-scope status rows for the ydb write layer (insert/delete/update counters).
static YDB_WRITE_LAYER_STATUS_S ydb_write_layer_status;
// Drop any STATUS_VALUE definition already in effect before redefining it for this file.
#ifdef STATUS_VALUE
#undef STATUS_VALUE
#endif
// Numeric value slot of status row x in ydb_write_layer_status.
#define STATUS_VALUE(x) ydb_write_layer_status.status[x].value.num

// Shorthand that forwards to TOKUFT_STATUS_INIT with this file's status struct as first argument.
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_write_layer_status, k, c, t, l, inc)
57 
// Register every write-layer status row (one success row and one failure row
// per operation kind) and then mark the status struct as initialized.
static void
ydb_write_layer_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(YDB_LAYER_NUM_INSERTS,                nullptr, UINT64,   "dictionary inserts", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_INSERTS_FAIL,           nullptr, UINT64,   "dictionary inserts fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DELETES,                nullptr, UINT64,   "dictionary deletes", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DELETES_FAIL,           nullptr, UINT64,   "dictionary deletes fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES,                nullptr, UINT64,   "dictionary updates", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES_FAIL,           nullptr, UINT64,   "dictionary updates fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES_BROADCAST,      nullptr, UINT64,   "dictionary broadcast updates", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_UPDATES_BROADCAST_FAIL, nullptr, UINT64,   "dictionary broadcast updates fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_INSERTS,          nullptr, UINT64,   "dictionary multi inserts", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_INSERTS_FAIL,     nullptr, UINT64,   "dictionary multi inserts fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_DELETES,          nullptr, UINT64,   "dictionary multi deletes", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_DELETES_FAIL,     nullptr, UINT64,   "dictionary multi deletes fail", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_UPDATES,          nullptr, UINT64,   "dictionary updates multi", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_MULTI_UPDATES_FAIL,     nullptr, UINT64,   "dictionary updates multi fail", TOKU_ENGINE_STATUS);
    // Lets ydb_write_layer_get_status() skip re-initialization on later calls.
    ydb_write_layer_status.initialized = true;
}
78 #undef STATUS_INIT
79 
80 void
ydb_write_layer_get_status(YDB_WRITE_LAYER_STATUS statp)81 ydb_write_layer_get_status(YDB_WRITE_LAYER_STATUS statp) {
82     if (!ydb_write_layer_status.initialized)
83         ydb_write_layer_status_init();
84     *statp = ydb_write_layer_status;
85 }
86 
87 
88 static inline uint32_t
get_prelocked_flags(uint32_t flags)89 get_prelocked_flags(uint32_t flags) {
90     uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
91     return lock_flags;
92 }
93 
94 // these next two static functions are defined
95 // both here and ydb.c. We should find a good
96 // place for them.
97 static int
ydb_getf_do_nothing(DBT const * UU (key),DBT const * UU (val),void * UU (extra))98 ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
99     return 0;
100 }
101 
102 // Check if the available file system space is less than the reserve
103 // Returns ENOSPC if not enough space, othersize 0
104 static inline int
env_check_avail_fs_space(DB_ENV * env)105 env_check_avail_fs_space(DB_ENV *env) {
106     int r = env->i->fs_state == FS_RED ? ENOSPC : 0;
107     if (r) {
108         env->i->enospc_redzone_ctr++;
109     }
110     return r;
111 }
112 
113 // Return 0 if proposed pair do not violate size constraints of DB
114 // (insertion is legal)
115 // Return non zero otherwise.
116 static int
db_put_check_size_constraints(DB * db,const DBT * key,const DBT * val)117 db_put_check_size_constraints(DB *db, const DBT *key, const DBT *val) {
118     int r = 0;
119     unsigned int klimit, vlimit;
120 
121     toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);
122     if (key->size > klimit) {
123         r = toku_ydb_do_error(db->dbenv, EINVAL,
124                 "The largest key allowed is %u bytes", klimit);
125     } else if (val->size > vlimit) {
126         r = toku_ydb_do_error(db->dbenv, EINVAL,
127                 "The largest value allowed is %u bytes", vlimit);
128     }
129     return r;
130 }
131 
132 //Return 0 if insert is legal
133 static int
db_put_check_overwrite_constraint(DB * db,DB_TXN * txn,DBT * key,uint32_t lock_flags,uint32_t overwrite_flag)134 db_put_check_overwrite_constraint(DB *db, DB_TXN *txn, DBT *key,
135                                   uint32_t lock_flags, uint32_t overwrite_flag) {
136     int r;
137 
138     if (overwrite_flag == 0) { // 0 (yesoverwrite) does not impose constraints.
139         r = 0;
140     } else if (overwrite_flag == DB_NOOVERWRITE) {
141         // Check if (key,anything) exists in dictionary.
142         // If exists, fail.  Otherwise, do insert.
143         // The DB_RMW flag causes the cursor to grab a write lock instead of a read lock on the key if it exists.
144         r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE|DB_RMW, key, ydb_getf_do_nothing, NULL);
145         if (r == DB_NOTFOUND)
146             r = 0;
147         else if (r == 0)
148             r = DB_KEYEXIST;
149         //Any other error is passed through.
150     } else if (overwrite_flag == DB_NOOVERWRITE_NO_ERROR) {
151         r = 0;
152     } else {
153         //Other flags are not (yet) supported.
154         r = EINVAL;
155     }
156     return r;
157 }
158 
159 
160 int
toku_db_del(DB * db,DB_TXN * txn,DBT * key,uint32_t flags,bool holds_mo_lock)161 toku_db_del(DB *db, DB_TXN *txn, DBT *key, uint32_t flags, bool holds_mo_lock) {
162     HANDLE_PANICKED_DB(db);
163     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
164     HANDLE_READ_ONLY_TXN(txn);
165 
166     uint32_t unchecked_flags = flags;
167     //DB_DELETE_ANY means delete regardless of whether it exists in the db.
168     bool error_if_missing = (bool)(!(flags&DB_DELETE_ANY));
169     unchecked_flags &= ~DB_DELETE_ANY;
170     uint32_t lock_flags = get_prelocked_flags(flags);
171     unchecked_flags &= ~lock_flags;
172     bool do_locking = (bool)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
173 
174     int r = 0;
175     if (unchecked_flags!=0) {
176         r = EINVAL;
177     }
178 
179     if (r == 0 && error_if_missing) {
180         //Check if the key exists in the db.
181         r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE|DB_RMW, key, ydb_getf_do_nothing, NULL);
182     }
183     if (r == 0 && do_locking) {
184         //Do locking if necessary.
185         r = toku_db_get_point_write_lock(db, txn, key);
186     }
187     if (r == 0) {
188         //Do the actual deleting.
189         if (!holds_mo_lock) toku_multi_operation_client_lock();
190         toku_ft_delete(db->i->ft_handle, key, txn ? db_txn_struct_i(txn)->tokutxn : 0);
191         if (!holds_mo_lock) toku_multi_operation_client_unlock();
192     }
193 
194     if (r == 0) {
195         STATUS_VALUE(YDB_LAYER_NUM_DELETES)++;  // accountability
196     }
197     else {
198         STATUS_VALUE(YDB_LAYER_NUM_DELETES_FAIL)++;  // accountability
199     }
200     return r;
201 }
202 
203 static int
db_put(DB * db,DB_TXN * txn,DBT * key,DBT * val,int flags,bool do_log)204 db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, int flags, bool do_log) {
205     int r = 0;
206     bool unique = false;
207     enum ft_msg_type type = FT_INSERT;
208     if (flags == DB_NOOVERWRITE) {
209         unique = true;
210     } else if (flags == DB_NOOVERWRITE_NO_ERROR) {
211         type = FT_INSERT_NO_OVERWRITE;
212     } else if (flags != 0) {
213         // All other non-zero flags are unsupported
214         r = EINVAL;
215     }
216     if (r == 0) {
217         TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : nullptr;
218         if (unique) {
219             r = toku_ft_insert_unique(db->i->ft_handle, key, val, ttxn, do_log);
220         } else {
221             toku_ft_maybe_insert(db->i->ft_handle, key, val, ttxn, false, ZERO_LSN, do_log, type);
222         }
223         invariant(r == DB_KEYEXIST || r == 0);
224     }
225     return r;
226 }
227 
228 int
toku_db_put(DB * db,DB_TXN * txn,DBT * key,DBT * val,uint32_t flags,bool holds_mo_lock)229 toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, uint32_t flags, bool holds_mo_lock) {
230     HANDLE_PANICKED_DB(db);
231     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
232     HANDLE_READ_ONLY_TXN(txn);
233     int r = 0;
234 
235     uint32_t lock_flags = get_prelocked_flags(flags);
236     flags &= ~lock_flags;
237 
238     r = db_put_check_size_constraints(db, key, val);
239 
240     //Do locking if necessary.
241     bool do_locking = (bool)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
242     if (r == 0 && do_locking) {
243         r = toku_db_get_point_write_lock(db, txn, key);
244     }
245     if (r == 0) {
246         //Insert into the ft.
247         if (!holds_mo_lock) toku_multi_operation_client_lock();
248         r = db_put(db, txn, key, val, flags, true);
249         if (!holds_mo_lock) toku_multi_operation_client_unlock();
250     }
251 
252     if (r == 0) {
253         // helgrind flags a race on this status update.  we increment it atomically to satisfy helgrind.
254         // STATUS_VALUE(YDB_LAYER_NUM_INSERTS)++;  // accountability
255         (void) toku_sync_fetch_and_add(&STATUS_VALUE(YDB_LAYER_NUM_INSERTS), 1);
256     } else {
257         // STATUS_VALUE(YDB_LAYER_NUM_INSERTS_FAIL)++;  // accountability
258         (void) toku_sync_fetch_and_add(&STATUS_VALUE(YDB_LAYER_NUM_INSERTS_FAIL), 1);
259     }
260 
261     return r;
262 }
263 
264 static int
toku_db_update(DB * db,DB_TXN * txn,const DBT * key,const DBT * update_function_extra,uint32_t flags)265 toku_db_update(DB *db, DB_TXN *txn,
266                const DBT *key,
267                const DBT *update_function_extra,
268                uint32_t flags) {
269     HANDLE_PANICKED_DB(db);
270     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
271     HANDLE_READ_ONLY_TXN(txn);
272     int r = 0;
273 
274     uint32_t lock_flags = get_prelocked_flags(flags);
275     flags &= ~lock_flags;
276 
277     r = db_put_check_size_constraints(db, key, update_function_extra);
278     if (r != 0) { goto cleanup; }
279 
280     bool do_locking;
281     do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE));
282     if (do_locking) {
283         r = toku_db_get_point_write_lock(db, txn, key);
284         if (r != 0) { goto cleanup; }
285     }
286 
287     TOKUTXN ttxn;
288     ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
289     toku_multi_operation_client_lock();
290     toku_ft_maybe_update(db->i->ft_handle, key, update_function_extra, ttxn,
291                               false, ZERO_LSN, true);
292     toku_multi_operation_client_unlock();
293 
294 cleanup:
295     if (r == 0)
296         STATUS_VALUE(YDB_LAYER_NUM_UPDATES)++;  // accountability
297     else
298         STATUS_VALUE(YDB_LAYER_NUM_UPDATES_FAIL)++;  // accountability
299     return r;
300 }
301 
302 
303 // DB_IS_RESETTING_OP is true if the dictionary should be considered as if created by this transaction.
304 // For example, it will be true if toku_db_update_broadcast() is used to implement a schema change (such
305 // as adding a column), and will be false if used simply to update all the rows of a table (such as
306 // incrementing a field).
307 static int
toku_db_update_broadcast(DB * db,DB_TXN * txn,const DBT * update_function_extra,uint32_t flags)308 toku_db_update_broadcast(DB *db, DB_TXN *txn,
309                          const DBT *update_function_extra,
310                          uint32_t flags) {
311     HANDLE_PANICKED_DB(db);
312     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
313     HANDLE_READ_ONLY_TXN(txn);
314     int r = 0;
315 
316     uint32_t lock_flags = get_prelocked_flags(flags);
317     flags &= ~lock_flags;
318     uint32_t is_resetting_op_flag = flags & DB_IS_RESETTING_OP;
319     flags &= is_resetting_op_flag;
320     bool is_resetting_op = (is_resetting_op_flag != 0);
321 
322 
323     if (is_resetting_op) {
324         if (txn->parent != NULL) {
325             r = EINVAL; // cannot have a parent if you are a resetting op
326             goto cleanup;
327         }
328         r = toku_db_pre_acquire_fileops_lock(db, txn);
329         if (r != 0) { goto cleanup; }
330     }
331     {
332         DBT null_key;
333         toku_init_dbt(&null_key);
334         r = db_put_check_size_constraints(db, &null_key, update_function_extra);
335         if (r != 0) { goto cleanup; }
336     }
337 
338     bool do_locking;
339     do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE));
340     if (do_locking) {
341         r = toku_db_pre_acquire_table_lock(db, txn);
342         if (r != 0) { goto cleanup; }
343     }
344 
345     TOKUTXN ttxn;
346     ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
347     toku_multi_operation_client_lock();
348     toku_ft_maybe_update_broadcast(db->i->ft_handle, update_function_extra, ttxn,
349                                         false, ZERO_LSN, true, is_resetting_op);
350     toku_multi_operation_client_unlock();
351 
352 cleanup:
353     if (r == 0)
354         STATUS_VALUE(YDB_LAYER_NUM_UPDATES_BROADCAST)++;  // accountability
355     else
356         STATUS_VALUE(YDB_LAYER_NUM_UPDATES_BROADCAST_FAIL)++;  // accountability
357     return r;
358 }
359 
360 static void
log_del_single(DB_TXN * txn,FT_HANDLE ft_handle,const DBT * key)361 log_del_single(DB_TXN *txn, FT_HANDLE ft_handle, const DBT *key) {
362     TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
363     toku_ft_log_del(ttxn, ft_handle, key);
364 }
365 
366 static uint32_t
sum_size(uint32_t num_arrays,DBT_ARRAY keys[],uint32_t overhead)367 sum_size(uint32_t num_arrays, DBT_ARRAY keys[], uint32_t overhead) {
368     uint32_t sum = 0;
369     for (uint32_t i = 0; i < num_arrays; i++) {
370         for (uint32_t j = 0; j < keys[i].size; j++) {
371             sum += keys[i].dbts[j].size + overhead;
372         }
373     }
374     return sum;
375 }
376 
377 static void
log_del_multiple(DB_TXN * txn,DB * src_db,const DBT * key,const DBT * val,uint32_t num_dbs,FT_HANDLE fts[],DBT_ARRAY keys[])378 log_del_multiple(DB_TXN *txn, DB *src_db, const DBT *key, const DBT *val, uint32_t num_dbs, FT_HANDLE fts[], DBT_ARRAY keys[]) {
379     if (num_dbs > 0) {
380         TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
381         FT_HANDLE src_ft  = src_db ? src_db->i->ft_handle : NULL;
382         uint32_t del_multiple_size = key->size + val->size + num_dbs*sizeof (uint32_t) + toku_log_enq_delete_multiple_overhead;
383         uint32_t del_single_sizes = sum_size(num_dbs, keys, toku_log_enq_delete_any_overhead);
384         if (del_single_sizes < del_multiple_size) {
385             for (uint32_t i = 0; i < num_dbs; i++) {
386                 for (uint32_t j = 0; j < keys[i].size; j++) {
387                     log_del_single(txn, fts[i], &keys[i].dbts[j]);
388                 }
389             }
390         } else {
391             toku_ft_log_del_multiple(ttxn, src_ft, fts, num_dbs, key, val);
392         }
393     }
394 }
395 
396 static uint32_t
lookup_src_db(uint32_t num_dbs,DB * db_array[],DB * src_db)397 lookup_src_db(uint32_t num_dbs, DB *db_array[], DB *src_db) {
398     uint32_t which_db;
399     for (which_db = 0; which_db < num_dbs; which_db++)
400         if (db_array[which_db] == src_db)
401             break;
402     return which_db;
403 }
404 
405 static int
do_del_multiple(DB_TXN * txn,uint32_t num_dbs,DB * db_array[],DBT_ARRAY keys[],DB * src_db,const DBT * src_key,bool indexer_shortcut)406 do_del_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DB *src_db, const DBT *src_key, bool indexer_shortcut) {
407     int r = 0;
408     TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
409     for (uint32_t which_db = 0; r == 0 && which_db < num_dbs; which_db++) {
410         DB *db = db_array[which_db];
411 
412         paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
413 
414         // if db is being indexed by an indexer, then insert a delete message into the db if the src key is to the left or equal to the
415         // indexers cursor.  we have to get the src_db from the indexer and find it in the db_array.
416         int do_delete = true;
417         DB_INDEXER *indexer = toku_db_get_indexer(db);
418         if (indexer && !indexer_shortcut) { // if this db is the index under construction
419             DB *indexer_src_db = toku_indexer_get_src_db(indexer);
420             invariant(indexer_src_db != NULL);
421             const DBT *indexer_src_key;
422             if (src_db == indexer_src_db)
423                 indexer_src_key = src_key;
424             else {
425                 uint32_t which_src_db = lookup_src_db(num_dbs, db_array, indexer_src_db);
426                 invariant(which_src_db < num_dbs);
427                 // The indexer src db must have exactly one item or we don't know how to continue.
428                 invariant(keys[which_src_db].size == 1);
429                 indexer_src_key = &keys[which_src_db].dbts[0];
430             }
431             do_delete = toku_indexer_should_insert_key(indexer, indexer_src_key);
432             toku_indexer_update_estimate(indexer);
433         }
434         if (do_delete) {
435             for (uint32_t i = 0; i < keys[which_db].size; i++) {
436                 toku_ft_maybe_delete(db->i->ft_handle, &keys[which_db].dbts[i], ttxn, false, ZERO_LSN, false);
437             }
438         }
439     }
440     return r;
441 }
442 
443 //
444 // if a hot index is in progress, gets the indexer
445 // also verifies that there is at most one hot index
446 // in progress. If it finds more than one, then returns EINVAL
447 //
448 static int
get_indexer_if_exists(uint32_t num_dbs,DB ** db_array,DB * src_db,DB_INDEXER ** indexerp,bool * src_db_is_indexer_src)449 get_indexer_if_exists(
450     uint32_t num_dbs,
451     DB **db_array,
452     DB *src_db,
453     DB_INDEXER** indexerp,
454     bool *src_db_is_indexer_src
455     )
456 {
457     int r = 0;
458     DB_INDEXER* first_indexer = NULL;
459     for (uint32_t i = 0; i < num_dbs; i++) {
460         DB_INDEXER* indexer = toku_db_get_indexer(db_array[i]);
461         if (indexer) {
462             if (!first_indexer) {
463                 first_indexer = indexer;
464             }
465             else if (first_indexer != indexer) {
466                 r = EINVAL;
467             }
468         }
469     }
470     if (r == 0) {
471         if (first_indexer) {
472             DB* indexer_src_db = toku_indexer_get_src_db(first_indexer);
473             // we should just make this an invariant
474             if (src_db == indexer_src_db) {
475                 *src_db_is_indexer_src = true;
476             }
477         }
478         *indexerp = first_indexer;
479     }
480     return r;
481 }
482 
483 int
env_del_multiple(DB_ENV * env,DB * src_db,DB_TXN * txn,const DBT * src_key,const DBT * src_val,uint32_t num_dbs,DB ** db_array,DBT_ARRAY * keys,uint32_t * flags_array)484 env_del_multiple(
485     DB_ENV *env,
486     DB *src_db,
487     DB_TXN *txn,
488     const DBT *src_key,
489     const DBT *src_val,
490     uint32_t num_dbs,
491     DB **db_array,
492     DBT_ARRAY *keys,
493     uint32_t *flags_array)
494 {
495     int r;
496     DBT_ARRAY del_keys[num_dbs];
497     DB_INDEXER* indexer = NULL;
498 
499     HANDLE_PANICKED_ENV(env);
500     HANDLE_READ_ONLY_TXN(txn);
501 
502     uint32_t lock_flags[num_dbs];
503     uint32_t remaining_flags[num_dbs];
504     FT_HANDLE fts[num_dbs];
505     bool indexer_lock_taken = false;
506     bool src_same = false;
507     bool indexer_shortcut = false;
508     if (!txn) {
509         r = EINVAL;
510         goto cleanup;
511     }
512     if (!env->i->generate_row_for_del) {
513         r = EINVAL;
514         goto cleanup;
515     }
516 
517     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
518     r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
519     if (r) {
520         goto cleanup;
521     }
522 
523     for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
524         DB *db = db_array[which_db];
525         lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
526         remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
527 
528         if (db == src_db) {
529             del_keys[which_db].size = 1;
530             del_keys[which_db].capacity = 1;
531             del_keys[which_db].dbts = const_cast<DBT*>(src_key);
532         }
533         else {
534             //Generate the key
535             r = env->i->generate_row_for_del(db, src_db, &keys[which_db], src_key, src_val);
536             if (r != 0) goto cleanup;
537             del_keys[which_db] = keys[which_db];
538             paranoid_invariant(del_keys[which_db].size <= del_keys[which_db].capacity);
539         }
540 
541         if (remaining_flags[which_db] & ~DB_DELETE_ANY) {
542             r = EINVAL;
543             goto cleanup;
544         }
545         bool error_if_missing = (bool)(!(remaining_flags[which_db]&DB_DELETE_ANY));
546         for (uint32_t which_key = 0; which_key < del_keys[which_db].size; which_key++) {
547             DBT *del_key = &del_keys[which_db].dbts[which_key];
548             if (error_if_missing) {
549                 //Check if the key exists in the db.
550                 //Grabs a write lock
551                 r = db_getf_set(db, txn, lock_flags[which_db]|DB_SERIALIZABLE|DB_RMW, del_key, ydb_getf_do_nothing, NULL);
552                 if (r != 0) goto cleanup;
553             } else if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {  //Do locking if necessary.
554                 //Needs locking
555                 r = toku_db_get_point_write_lock(db, txn, del_key);
556                 if (r != 0) goto cleanup;
557             }
558         }
559         fts[which_db] = db->i->ft_handle;
560     }
561 
562     if (indexer) {
563         // do a cheap check
564         if (src_same) {
565             bool may_insert = toku_indexer_may_insert(indexer, src_key);
566             if (!may_insert) {
567                 toku_indexer_lock(indexer);
568                 indexer_lock_taken = true;
569             }
570             else {
571                 indexer_shortcut = true;
572             }
573         }
574     }
575     toku_multi_operation_client_lock();
576     log_del_multiple(txn, src_db, src_key, src_val, num_dbs, fts, del_keys);
577     r = do_del_multiple(txn, num_dbs, db_array, del_keys, src_db, src_key, indexer_shortcut);
578     toku_multi_operation_client_unlock();
579     if (indexer_lock_taken) {
580         toku_indexer_unlock(indexer);
581     }
582 
583 cleanup:
584     if (r == 0)
585         STATUS_VALUE(YDB_LAYER_NUM_MULTI_DELETES) += num_dbs;  // accountability
586     else
587         STATUS_VALUE(YDB_LAYER_NUM_MULTI_DELETES_FAIL) += num_dbs;  // accountability
588     return r;
589 }
590 
591 static void
log_put_multiple(DB_TXN * txn,DB * src_db,const DBT * src_key,const DBT * src_val,uint32_t num_dbs,FT_HANDLE fts[])592 log_put_multiple(DB_TXN *txn, DB *src_db, const DBT *src_key, const DBT *src_val, uint32_t num_dbs, FT_HANDLE fts[]) {
593     if (num_dbs > 0) {
594         TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
595         FT_HANDLE src_ft  = src_db ? src_db->i->ft_handle : NULL;
596         toku_ft_log_put_multiple(ttxn, src_ft, fts, num_dbs, src_key, src_val);
597     }
598 }
599 
600 // Requires: If remaining_flags is non-null, this function performs any required uniqueness checks
601 //           Otherwise, the caller is responsible.
602 static int
do_put_multiple(DB_TXN * txn,uint32_t num_dbs,DB * db_array[],DBT_ARRAY keys[],DBT_ARRAY vals[],uint32_t * remaining_flags,DB * src_db,const DBT * src_key,bool indexer_shortcut)603 do_put_multiple(DB_TXN *txn, uint32_t num_dbs, DB *db_array[], DBT_ARRAY keys[], DBT_ARRAY vals[], uint32_t *remaining_flags, DB *src_db, const DBT *src_key, bool indexer_shortcut) {
604     int r = 0;
605     for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
606         DB *db = db_array[which_db];
607 
608         invariant(keys[which_db].size == vals[which_db].size);
609         paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
610         paranoid_invariant(vals[which_db].size <= vals[which_db].capacity);
611 
612         if (keys[which_db].size > 0) {
613             bool do_put = true;
614             DB_INDEXER *indexer = toku_db_get_indexer(db);
615             if (indexer && !indexer_shortcut) { // if this db is the index under construction
616                 DB *indexer_src_db = toku_indexer_get_src_db(indexer);
617                 invariant(indexer_src_db != NULL);
618                 const DBT *indexer_src_key;
619                 if (src_db == indexer_src_db)
620                     indexer_src_key = src_key;
621                 else {
622                     uint32_t which_src_db = lookup_src_db(num_dbs, db_array, indexer_src_db);
623                     invariant(which_src_db < num_dbs);
624                     // The indexer src db must have exactly one item or we don't know how to continue.
625                     invariant(keys[which_src_db].size == 1);
626                     indexer_src_key = &keys[which_src_db].dbts[0];
627                 }
628                 do_put = toku_indexer_should_insert_key(indexer, indexer_src_key);
629                 toku_indexer_update_estimate(indexer);
630             }
631             if (do_put) {
632                 for (uint32_t i = 0; i < keys[which_db].size; i++) {
633                     int flags = 0;
634                     if (remaining_flags != nullptr) {
635                         flags = remaining_flags[which_db];
636                         invariant(!(flags & DB_NOOVERWRITE_NO_ERROR));
637                     }
638                     r = db_put(db, txn, &keys[which_db].dbts[i], &vals[which_db].dbts[i], flags, false);
639                     if (r != 0) {
640                         goto done;
641                     }
642                 }
643             }
644         }
645     }
646 done:
647     return r;
648 }
649 
650 static int
env_put_multiple_internal(DB_ENV * env,DB * src_db,DB_TXN * txn,const DBT * src_key,const DBT * src_val,uint32_t num_dbs,DB ** db_array,DBT_ARRAY * keys,DBT_ARRAY * vals,uint32_t * flags_array)651 env_put_multiple_internal(
652     DB_ENV *env,
653     DB *src_db,
654     DB_TXN *txn,
655     const DBT *src_key,
656     const DBT *src_val,
657     uint32_t num_dbs,
658     DB **db_array,
659     DBT_ARRAY *keys,
660     DBT_ARRAY *vals,
661     uint32_t *flags_array)
662 {
663     int r;
664     DBT_ARRAY put_keys[num_dbs];
665     DBT_ARRAY put_vals[num_dbs];
666     DB_INDEXER* indexer = NULL;
667 
668     HANDLE_PANICKED_ENV(env);
669     HANDLE_READ_ONLY_TXN(txn);
670 
671     uint32_t lock_flags[num_dbs];
672     uint32_t remaining_flags[num_dbs];
673     FT_HANDLE fts[num_dbs];
674     bool indexer_shortcut = false;
675     bool indexer_lock_taken = false;
676     bool src_same = false;
677 
678     if (!txn || !num_dbs) {
679         r = EINVAL;
680         goto cleanup;
681     }
682     if (!env->i->generate_row_for_put) {
683         r = EINVAL;
684         goto cleanup;
685     }
686 
687     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
688     r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
689     if (r) {
690         goto cleanup;
691     }
692 
693     for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
694         DB *db = db_array[which_db];
695 
696         lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
697         remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
698 
699         //Generate the row
700         if (db == src_db) {
701             put_keys[which_db].size = put_keys[which_db].capacity = 1;
702             put_keys[which_db].dbts = const_cast<DBT*>(src_key);
703 
704             put_vals[which_db].size = put_vals[which_db].capacity = 1;
705             put_vals[which_db].dbts = const_cast<DBT*>(src_val);
706         }
707         else {
708             r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], src_key, src_val);
709             if (r != 0) goto cleanup;
710 
711             paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
712             paranoid_invariant(vals[which_db].size <= vals[which_db].capacity);
713             paranoid_invariant(keys[which_db].size == vals[which_db].size);
714 
715             put_keys[which_db] = keys[which_db];
716             put_vals[which_db] = vals[which_db];
717         }
718         for (uint32_t i = 0; i < put_keys[which_db].size; i++) {
719             DBT &put_key = put_keys[which_db].dbts[i];
720             DBT &put_val = put_vals[which_db].dbts[i];
721 
722             // check size constraints
723             r = db_put_check_size_constraints(db, &put_key, &put_val);
724             if (r != 0) goto cleanup;
725 
726             if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
727                 //put_multiple does not support delaying the no error, since we would
728                 //have to log the flag in the put_multiple.
729                 r = EINVAL; goto cleanup;
730             }
731 
732             //Do locking if necessary.
733             if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
734                 //Needs locking
735                 r = toku_db_get_point_write_lock(db, txn, &put_key);
736                 if (r != 0) goto cleanup;
737             }
738         }
739         fts[which_db] = db->i->ft_handle;
740     }
741 
742     if (indexer) {
743         // do a cheap check
744         if (src_same) {
745             bool may_insert = toku_indexer_may_insert(indexer, src_key);
746             if (!may_insert) {
747                 toku_indexer_lock(indexer);
748                 indexer_lock_taken = true;
749             }
750             else {
751                 indexer_shortcut = true;
752             }
753         }
754     }
755     toku_multi_operation_client_lock();
756     r = do_put_multiple(txn, num_dbs, db_array, put_keys, put_vals, remaining_flags, src_db, src_key, indexer_shortcut);
757     if (r == 0) {
758         log_put_multiple(txn, src_db, src_key, src_val, num_dbs, fts);
759     }
760     toku_multi_operation_client_unlock();
761     if (indexer_lock_taken) {
762         toku_indexer_unlock(indexer);
763     }
764 
765 cleanup:
766     if (r == 0)
767         STATUS_VALUE(YDB_LAYER_NUM_MULTI_INSERTS) += num_dbs;  // accountability
768     else
769         STATUS_VALUE(YDB_LAYER_NUM_MULTI_INSERTS_FAIL) += num_dbs;  // accountability
770     return r;
771 }
772 
swap_dbts(DBT * a,DBT * b)773 static void swap_dbts(DBT *a, DBT *b) {
774     DBT c;
775     c = *a;
776     *a = *b;
777     *b = c;
778 }
779 
//TODO: 26 Add comment in API description about.. new val.size being generated as '0' REQUIRES old_val.size == 0
//
// env_update_multiple: for every DB in db_array, replace the rows derived from
// (old_src_key, old_src_data) with the rows derived from (new_src_key, new_src_data).
//
// Parameters:
//   env, txn        - environment and transaction; a NULL txn is rejected with EINVAL.
//   src_db          - the source DB. When a db_array entry equals src_db the
//                     old/new source key and value are used directly; otherwise
//                     env->i->generate_row_for_put produces the rows.
//   num_dbs, db_array, flags_array
//                   - the target DBs and one flags word per DB.
//   num_keys, keys  - scratch DBT_ARRAYs; must hold at least 2*num_dbs entries.
//                     keys[0..num_dbs-1] receive the new keys,
//                     keys[num_dbs..2*num_dbs-1] receive the old keys.
//   num_vals, vals  - scratch DBT_ARRAYs; must hold at least num_dbs entries
//                     (the new vals).
//
// Returns 0 on success. Returns EINVAL when txn is NULL, when no
// generate_row_for_put callback is installed, or when a DB's remaining flags
// equal DB_NOOVERWRITE_NO_ERROR and a new key has no matching old key.
// Returns ENOMEM when keys/vals are too small. Any nonzero result from the
// row generator, the constraint checks, the lock acquisition, or the
// del/put helpers is returned unchanged.
//
// NOTE(review): the HANDLE_* macros are defined elsewhere; presumably they
// return early from this function on failure -- confirm.
// NOTE(review): the old/new key walk below is a merge, so it presumably relies
// on generate_row_for_put producing keys in comparator order -- confirm.
int
env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
                    DBT *old_src_key, DBT *old_src_data,
                    DBT *new_src_key, DBT *new_src_data,
                    uint32_t num_dbs, DB **db_array, uint32_t* flags_array,
                    uint32_t num_keys, DBT_ARRAY keys[],
                    uint32_t num_vals, DBT_ARRAY vals[]) {
    int r = 0;

    HANDLE_PANICKED_ENV(env);
    DB_INDEXER* indexer = NULL;
    bool indexer_shortcut = false;
    bool indexer_lock_taken = false;
    bool src_same = false;
    HANDLE_READ_ONLY_TXN(txn);
    // Per-DB working copies of the DBT_ARRAY headers (size/capacity/dbts pointer).
    // The dbts pointers still refer to the caller's (or source) DBTs.
    DBT_ARRAY old_key_arrays[num_dbs];
    DBT_ARRAY new_key_arrays[num_dbs];
    DBT_ARRAY new_val_arrays[num_dbs];

    if (!txn) {
        r = EINVAL;
        goto cleanup;
    }
    if (!env->i->generate_row_for_put) {
        r = EINVAL;
        goto cleanup;
    }

    // Need room for old + new keys (2 * num_dbs) and new vals (num_dbs).
    if (num_dbs + num_dbs > num_keys || num_dbs > num_vals) {
        r = ENOMEM; goto cleanup;
    }

    HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
    r = get_indexer_if_exists(num_dbs, db_array, src_db, &indexer, &src_same);
    if (r) {
        goto cleanup;
    }

    {
        // DBs (and their key arrays) that have at least one key to delete.
        uint32_t n_del_dbs = 0;
        DB *del_dbs[num_dbs];
        FT_HANDLE del_fts[num_dbs];
        DBT_ARRAY del_key_arrays[num_dbs];

        // DBs (and their key/val arrays) that take part in the put step.
        uint32_t n_put_dbs = 0;
        DB *put_dbs[num_dbs];
        FT_HANDLE put_fts[num_dbs];
        DBT_ARRAY put_key_arrays[num_dbs];
        DBT_ARRAY put_val_arrays[num_dbs];

        uint32_t lock_flags[num_dbs];
        uint32_t remaining_flags[num_dbs];

        for (uint32_t which_db = 0; which_db < num_dbs; which_db++) {
            DB *db = db_array[which_db];

            // Split the caller's flags into the prelocked bits and everything else.
            lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
            remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];

            if (db == src_db) {
                // Copy the old keys
                old_key_arrays[which_db].size = old_key_arrays[which_db].capacity = 1;
                old_key_arrays[which_db].dbts = old_src_key;

                // Copy the new keys and vals
                new_key_arrays[which_db].size = new_key_arrays[which_db].capacity = 1;
                new_key_arrays[which_db].dbts = new_src_key;

                new_val_arrays[which_db].size = new_val_arrays[which_db].capacity = 1;
                new_val_arrays[which_db].dbts = new_src_data;
            } else {
                // keys[0..num_dbs-1] are the new keys
                // keys[num_dbs..2*num_dbs-1] are the old keys
                // vals[0..num_dbs-1] are the new vals

                // Generate the old keys
                // (NULL is passed for the vals array: old vals are not generated)
                r = env->i->generate_row_for_put(db, src_db, &keys[which_db + num_dbs], NULL, old_src_key, old_src_data);
                if (r != 0) goto cleanup;

                paranoid_invariant(keys[which_db+num_dbs].size <= keys[which_db+num_dbs].capacity);
                old_key_arrays[which_db] = keys[which_db+num_dbs];

                // Generate the new keys and vals
                r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], new_src_key, new_src_data);
                if (r != 0) goto cleanup;

                paranoid_invariant(keys[which_db].size <= keys[which_db].capacity);
                paranoid_invariant(vals[which_db].size <= vals[which_db].capacity);
                paranoid_invariant(keys[which_db].size == vals[which_db].size);

                new_key_arrays[which_db] = keys[which_db];
                new_val_arrays[which_db] = vals[which_db];
            }
            DBT_ARRAY &old_keys = old_key_arrays[which_db];
            DBT_ARRAY &new_keys = new_key_arrays[which_db];
            DBT_ARRAY &new_vals = new_val_arrays[which_db];

            uint32_t num_skip = 0;
            uint32_t num_del = 0;
            uint32_t num_put = 0;
            // Next index in old_keys to look at
            uint32_t idx_old = 0;
            // Next index in new_keys/new_vals to look at
            uint32_t idx_new = 0;
            // Number of old keys kept for deletion so far; they are compacted
            // into old_keys.dbts[0..idx_old_used-1].
            uint32_t idx_old_used = 0;
            // Number of new keys/vals kept for insertion so far; they are compacted
            // into new_keys/new_vals.dbts[0..idx_new_used-1].
            uint32_t idx_new_used = 0;
            // Walk both arrays in parallel, classifying each step as
            // delete (old only), put (new only, or equal but changed), or skip.
            while (idx_old < old_keys.size || idx_new < new_keys.size) {
                // Check for old key, both, new key
                // These may point one past the end when an array is exhausted;
                // they are only dereferenced in branches where the index is in range.
                DBT *curr_old_key = &old_keys.dbts[idx_old];
                DBT *curr_new_key = &new_keys.dbts[idx_new];
                DBT *curr_new_val = &new_vals.dbts[idx_new];

                bool locked_new_key = false;
                int cmp;
                if (idx_new == new_keys.size) {
                    // New keys exhausted: the current old key has no counterpart.
                    cmp = -1;
                } else if (idx_old == old_keys.size) {
                    // Old keys exhausted: the current new key has no counterpart.
                    cmp = +1;
                } else {
                    const toku::comparator &cmpfn = toku_db_get_comparator(db);
                    cmp = cmpfn(curr_old_key, curr_new_key);
                }

                bool do_del = false;
                bool do_put = false;
                bool do_skip = false;
                if (cmp > 0) { // New key does not exist in old array
                    //Check overwrite constraints only in the case where the keys are not equal
                    //(new key is alone/not equal to old key)
                    // If the keys are equal, then we do not care whether the flag is DB_NOOVERWRITE or 0
                    r = db_put_check_overwrite_constraint(db, txn,
                                                          curr_new_key,
                                                          lock_flags[which_db], remaining_flags[which_db]);
                    if (r != 0) goto cleanup;
                    if (remaining_flags[which_db] == DB_NOOVERWRITE) {
                        // Treated as already locked, so the lock step below is skipped.
                        // NOTE(review): presumably the overwrite check takes the lock
                        // in the DB_NOOVERWRITE case -- confirm.
                        locked_new_key = true;
                    }

                    if (remaining_flags[which_db] == DB_NOOVERWRITE_NO_ERROR) {
                        //update_multiple does not support delaying the no error, since we would
                        //have to log the flag in the put_multiple.
                        r = EINVAL; goto cleanup;
                    }
                    do_put = true;
                } else if (cmp < 0) {
                    // lock old key only when it does not exist in new array
                    // otherwise locking new key takes care of this
                    if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
                        r = toku_db_get_point_write_lock(db, txn, curr_old_key);
                        if (r != 0) goto cleanup;
                    }
                    do_del = true;
                } else {
                    // Keys compare equal under the DB's comparator: put only if there is
                    // a non-empty new val or the keys differ byte for byte; otherwise skip.
                    do_put = curr_new_val->size > 0 ||
                                curr_old_key->size != curr_new_key->size ||
                                memcmp(curr_old_key->data, curr_new_key->data, curr_old_key->size);
                    do_skip = !do_put;
                }
                // Check put size constraints and insert new key only if keys are unequal (byte for byte) or there is a val
                // We assume any val.size > 0 as unequal (saves on generating old val)
                //      (allows us to avoid generating the old val)
                // we assume that any new vals with size > 0 are different than the old val
                // if (!key_eq || !(dbt_cmp(&vals[which_db], &vals[which_db + num_dbs]) == 0)) { /* ... */ }
                if (do_put) {
                    r = db_put_check_size_constraints(db, curr_new_key, curr_new_val);
                    if (r != 0) goto cleanup;

                    // lock new key unless already locked
                    if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE) && !locked_new_key) {
                        r = toku_db_get_point_write_lock(db, txn, curr_new_key);
                        if (r != 0) goto cleanup;
                    }
                }

                // TODO: 26 Add comments explaining squish and why not just use another stack array
                // Add more comments to explain this if elseif else well
                if (do_skip) {
                    // Equal key, nothing to write: consume one entry from each array.
                    paranoid_invariant(cmp == 0);
                    paranoid_invariant(!do_put);
                    paranoid_invariant(!do_del);

                    num_skip++;
                    idx_old++;
                    idx_new++;
                } else if (do_put) {
                    paranoid_invariant(cmp >= 0);
                    paranoid_invariant(!do_skip);
                    paranoid_invariant(!do_del);

                    num_put++;
                    // Move the kept key/val down to the next free "used" slot so the
                    // entries to put end up contiguous at the front of the arrays.
                    if (idx_new != idx_new_used) {
                        swap_dbts(&new_keys.dbts[idx_new_used], &new_keys.dbts[idx_new]);
                        swap_dbts(&new_vals.dbts[idx_new_used], &new_vals.dbts[idx_new]);
                    }
                    idx_new++;
                    idx_new_used++;
                    if (cmp == 0) {
                        // The matching old key is replaced by this put; no separate delete.
                        idx_old++;
                    }
                } else {
                    invariant(do_del);
                    paranoid_invariant(cmp < 0);
                    paranoid_invariant(!do_skip);
                    paranoid_invariant(!do_put);

                    num_del++;
                    // Same compaction as above, for the old keys that must be deleted.
                    if (idx_old != idx_old_used) {
                        swap_dbts(&old_keys.dbts[idx_old_used], &old_keys.dbts[idx_old]);
                    }
                    idx_old++;
                    idx_old_used++;
                }
            }
            // Truncate the working arrays to just the compacted entries.
            old_keys.size = idx_old_used;
            new_keys.size = idx_new_used;
            new_vals.size = idx_new_used;

            if (num_del > 0) {
                del_dbs[n_del_dbs] = db;
                del_fts[n_del_dbs] = db->i->ft_handle;
                del_key_arrays[n_del_dbs] = old_keys;
                n_del_dbs++;
            }
            // If we put none, but delete some, but not all, then we need the log_put_multiple to happen.
            // Include this db in the put_dbs so we do log_put_multiple.
            // do_put_multiple will be a no-op for this db.
            if (num_put > 0 || (num_del > 0 && num_skip > 0)) {
                put_dbs[n_put_dbs] = db;
                put_fts[n_put_dbs] = db->i->ft_handle;
                put_key_arrays[n_put_dbs] = new_keys;
                put_val_arrays[n_put_dbs] = new_vals;
                n_put_dbs++;
            }
        }
        if (indexer) {
            // do a cheap check
            // Only when src_same is set: if both the old and new source keys may be
            // inserted, use the shortcut; otherwise hold the indexer lock for the
            // duration of the del/put below.
            if (src_same) {
                bool may_insert =
                    toku_indexer_may_insert(indexer, old_src_key) &&
                    toku_indexer_may_insert(indexer, new_src_key);
                if (!may_insert) {
                    toku_indexer_lock(indexer);
                    indexer_lock_taken = true;
                }
                else {
                    indexer_shortcut = true;
                }
            }
        }
        toku_multi_operation_client_lock();
        // The delete is logged first and then applied.
        if (r == 0 && n_del_dbs > 0) {
            log_del_multiple(txn, src_db, old_src_key, old_src_data, n_del_dbs, del_fts, del_key_arrays);
            r = do_del_multiple(txn, n_del_dbs, del_dbs, del_key_arrays, src_db, old_src_key, indexer_shortcut);
        }

        if (r == 0 && n_put_dbs > 0) {
            // We sometimes skip some keys for del/put during runtime, but during recovery
            // we (may) delete ALL the keys for a given DB.  Therefore we must put ALL the keys during
            // recovery so we don't end up losing data.
            // So unlike env->put_multiple, we ONLY log a 'put_multiple' log entry.
            log_put_multiple(txn, src_db, new_src_key, new_src_data, n_put_dbs, put_fts);
            // nullptr is passed in the position where the put_multiple path passes its remaining_flags array.
            r = do_put_multiple(txn, n_put_dbs, put_dbs, put_key_arrays, put_val_arrays, nullptr, src_db, new_src_key, indexer_shortcut);
        }
        toku_multi_operation_client_unlock();
        if (indexer_lock_taken) {
            toku_indexer_unlock(indexer);
        }
    }

cleanup:
    // Account for the whole call as num_dbs updates, succeeded or failed.
    if (r == 0)
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_UPDATES) += num_dbs;  // accountability
    else
        STATUS_VALUE(YDB_LAYER_NUM_MULTI_UPDATES_FAIL) += num_dbs;  // accountability
    return r;
}
1058 
1059 int
autotxn_db_del(DB * db,DB_TXN * txn,DBT * key,uint32_t flags)1060 autotxn_db_del(DB* db, DB_TXN* txn, DBT* key, uint32_t flags) {
1061     bool changed; int r;
1062     r = toku_db_construct_autotxn(db, &txn, &changed, false);
1063     if (r!=0) return r;
1064     r = toku_db_del(db, txn, key, flags, false);
1065     return toku_db_destruct_autotxn(txn, r, changed);
1066 }
1067 
1068 int
autotxn_db_put(DB * db,DB_TXN * txn,DBT * key,DBT * data,uint32_t flags)1069 autotxn_db_put(DB* db, DB_TXN* txn, DBT* key, DBT* data, uint32_t flags) {
1070     //{ unsigned i; printf("put %p keylen=%d key={", db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", data->size); for(i=0; i<data->size; i++) printf("%d,", ((char*)data->data)[i]); printf("}\n"); }
1071     bool changed; int r;
1072     r = env_check_avail_fs_space(db->dbenv);
1073     if (r != 0) { goto cleanup; }
1074     r = toku_db_construct_autotxn(db, &txn, &changed, false);
1075     if (r!=0) {
1076         goto cleanup;
1077     }
1078     r = toku_db_put(db, txn, key, data, flags, false);
1079     r = toku_db_destruct_autotxn(txn, r, changed);
1080 cleanup:
1081     return r;
1082 }
1083 
1084 int
autotxn_db_update(DB * db,DB_TXN * txn,const DBT * key,const DBT * update_function_extra,uint32_t flags)1085 autotxn_db_update(DB *db, DB_TXN *txn,
1086                   const DBT *key,
1087                   const DBT *update_function_extra,
1088                   uint32_t flags) {
1089     bool changed; int r;
1090     r = env_check_avail_fs_space(db->dbenv);
1091     if (r != 0) { goto cleanup; }
1092     r = toku_db_construct_autotxn(db, &txn, &changed, false);
1093     if (r != 0) { return r; }
1094     r = toku_db_update(db, txn, key, update_function_extra, flags);
1095     r = toku_db_destruct_autotxn(txn, r, changed);
1096 cleanup:
1097     return r;
1098 }
1099 
1100 int
autotxn_db_update_broadcast(DB * db,DB_TXN * txn,const DBT * update_function_extra,uint32_t flags)1101 autotxn_db_update_broadcast(DB *db, DB_TXN *txn,
1102                             const DBT *update_function_extra,
1103                             uint32_t flags) {
1104     bool changed; int r;
1105     r = env_check_avail_fs_space(db->dbenv);
1106     if (r != 0) { goto cleanup; }
1107     r = toku_db_construct_autotxn(db, &txn, &changed, false);
1108     if (r != 0) { return r; }
1109     r = toku_db_update_broadcast(db, txn, update_function_extra, flags);
1110     r = toku_db_destruct_autotxn(txn, r, changed);
1111 cleanup:
1112     return r;
1113 }
1114 
1115 int
env_put_multiple(DB_ENV * env,DB * src_db,DB_TXN * txn,const DBT * src_key,const DBT * src_val,uint32_t num_dbs,DB ** db_array,DBT_ARRAY * keys,DBT_ARRAY * vals,uint32_t * flags_array)1116 env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *src_key, const DBT *src_val, uint32_t num_dbs, DB **db_array, DBT_ARRAY *keys, DBT_ARRAY *vals, uint32_t *flags_array) {
1117     int r = env_check_avail_fs_space(env);
1118     if (r == 0) {
1119         r = env_put_multiple_internal(env, src_db, txn, src_key, src_val, num_dbs, db_array, keys, vals, flags_array);
1120     }
1121     return r;
1122 }
1123 
1124 int
toku_ydb_check_avail_fs_space(DB_ENV * env)1125 toku_ydb_check_avail_fs_space(DB_ENV *env) {
1126     int rval = env_check_avail_fs_space(env);
1127     return rval;
1128 }
1129 #undef STATUS_VALUE
1130 
1131 #include <toku_race_tools.h>
1132 void __attribute__((constructor)) toku_ydb_write_helgrind_ignore(void);
1133 void
toku_ydb_write_helgrind_ignore(void)1134 toku_ydb_write_helgrind_ignore(void) {
1135     TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_write_layer_status, sizeof ydb_write_layer_status);
1136 }
1137