1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 
41 #include <stdio.h>
42 #include <stdlib.h>
43 
44 #include <unistd.h>
45 #include <memory.h>
46 #include <errno.h>
47 #include <sys/stat.h>
48 #include <db.h>
49 
50 
put_multiple_generate(DB * UU (dest_db),DB * UU (src_db),DBT_ARRAY * dest_key_arrays,DBT_ARRAY * dest_val_arrays,const DBT * src_key,const DBT * src_val)51 static int put_multiple_generate(DB *UU(dest_db), DB *UU(src_db), DBT_ARRAY *dest_key_arrays, DBT_ARRAY *dest_val_arrays, const DBT *src_key, const DBT *src_val) {
52     toku_dbt_array_resize(dest_key_arrays, 1);
53     toku_dbt_array_resize(dest_val_arrays, 1);
54     DBT *dest_key = &dest_key_arrays->dbts[0];
55     DBT *dest_val = &dest_val_arrays->dbts[0];
56     dest_key->flags = 0;
57     dest_val->flags = 0;
58     dbt_init(dest_key, src_key->data, src_key->size);
59     dbt_init(dest_val, src_val->data, src_val->size);
60     return 0;
61 }
62 
63 static void
test_loader_abort(bool do_compress,bool abort_loader,bool abort_txn)64 test_loader_abort (bool do_compress, bool abort_loader, bool abort_txn) {
65     DB_ENV * env;
66     DB *db;
67     DB_TXN *txn;
68     DB_TXN* const null_txn = 0;
69     const char * const fname = "test.loader_abort.ft_handle";
70     int r;
71     toku_os_recursive_delete(TOKU_TEST_FILENAME);
72     toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);
73 
74     DB_LOADER *loader;
75     uint32_t db_flags = 0;
76     uint32_t dbt_flags = 0;
77     uint32_t loader_flags = do_compress ? LOADER_COMPRESS_INTERMEDIATES : 0;
78     DBC* cursor = NULL;
79 
80     /* create the dup database file */
81     r = db_env_create(&env, 0);        assert(r == 0);
82     env->set_errfile(env, stderr);
83     r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
84     CKERR(r);
85     r = env->open(env, TOKU_TEST_FILENAME, DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
86     r = db_create(&db, env, 0); assert(r == 0);
87     db->set_errfile(db,stderr); // Turn off those annoying errors
88     r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
89 
90     r = env->txn_begin(env, NULL, &txn, 0);
91     CKERR(r);
92 
93     r = env->create_loader(env, txn, &loader, db, 1, &db, &db_flags, &dbt_flags, loader_flags);
94     CKERR(r);
95 
96     DBT key, val;
97     uint32_t k;
98     uint32_t v;
99     uint32_t num_elements = 2;
100     for (uint32_t i = 0; i < num_elements; i++) {
101         k = i;
102         v = i;
103         r = loader->put(
104             loader,
105             dbt_init(&key, &k, sizeof k),
106             dbt_init(&val, &v, sizeof v)
107             );
108         assert(r == 0);
109     }
110     if (abort_loader) {
111         loader->abort(loader);
112     }
113     else {
114         loader->close(loader);
115     }
116     k = num_elements;
117     v = num_elements;
118     r = db->put(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
119 
120     if (abort_txn) {
121         r = txn->abort(txn);
122         CKERR(r);
123     }
124     else {
125         r = txn->commit(txn, 0);
126         CKERR(r);
127     }
128 
129     r = env->txn_begin(env, NULL, &txn, 0);
130     CKERR(r);
131     r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
132     DBT k1; memset(&k1, 0, sizeof k1);
133     DBT v1; memset(&v1, 0, sizeof v1);
134     if (!abort_txn) {
135         if (!abort_loader) {
136             for (uint32_t i = 0; i < num_elements; i++) {
137                 r = cursor->c_get(cursor, &k1, &v1, DB_NEXT); assert(r == 0);
138                 assert(k1.size == sizeof(uint32_t));
139                 assert(v1.size == sizeof(uint32_t));
140                 assert(*(uint32_t *)k1.data == i);
141                 assert(*(uint32_t *)v1.data == i);
142             }
143         }
144         r = cursor->c_get(cursor, &k1, &v1, DB_NEXT); assert(r == 0);
145         assert(k1.size == sizeof(uint32_t));
146         assert(v1.size == sizeof(uint32_t));
147         assert(*(uint32_t *)k1.data == num_elements);
148         assert(*(uint32_t *)v1.data == num_elements);
149     }
150     r = cursor->c_get(cursor, &k1, &v1, DB_NEXT); assert(r == DB_NOTFOUND);
151 
152     r = cursor->c_close(cursor); assert(r == 0);
153     r = txn->commit(txn, 0);
154     CKERR(r);
155 
156     r = db->close(db, 0); assert(r == 0);
157     r = env->close(env, 0); assert(r == 0);
158 }
159 
160 int
test_main(int argc,char * const argv[])161 test_main(int argc, char *const argv[]) {
162     parse_args(argc, argv);
163     test_loader_abort(false, false, true);
164     test_loader_abort(false, true, true);
165     test_loader_abort(true, false, true);
166     test_loader_abort(true, true, true);
167     test_loader_abort(false, false, false);
168     test_loader_abort(false, true, false);
169     test_loader_abort(true, false, false);
170     test_loader_abort(true, true, false);
171     return 0;
172 }
173