1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "test.h"
40 #include "toku_pthread.h"
41 #include <db.h>
42 #include <sys/stat.h>
43 
44 static const char *envdir = TOKU_TEST_FILENAME;
45 
46 DB_ENV *env;
47 int DISALLOW_PUTS=0;
48 int COMPRESS=0;
49 enum {MAX_NAME=128};
50 enum {NUM_DBS=1};
51 enum {NUM_KV_PAIRS=3};
52 struct kv_pair {
53     int64_t key;
54     int64_t val;
55 };
56 struct kv_pair kv_pairs[NUM_KV_PAIRS] = {{1,4},
57                                          {2,5},
58                                          {3,6}};
59 static uint32_t block_size = 0;
60 
put_multiple_generate(DB * dest_db,DB * src_db,DBT_ARRAY * dest_keys,DBT_ARRAY * dest_vals,const DBT * src_key,const DBT * src_val)61 static int put_multiple_generate(DB *dest_db, DB *src_db, DBT_ARRAY *dest_keys, DBT_ARRAY *dest_vals, const DBT *src_key, const DBT *src_val) {
62     toku_dbt_array_resize(dest_keys, 1);
63     toku_dbt_array_resize(dest_vals, 1);
64     DBT *dest_key = &dest_keys->dbts[0];
65     DBT *dest_val = &dest_vals->dbts[0];
66     dest_key->flags = 0;
67     dest_val->flags = 0;
68 
69     (void) src_db;
70 
71     uint32_t which = *(uint32_t*)dest_db->app_private;
72     assert(which == 0);
73 
74     dbt_init(dest_key, src_key->data, src_key->size);
75     dbt_init(dest_val, src_val->data, src_val->size);
76 
77 //    printf("dest_key.data = %d\n", *(int*)dest_key->data);
78 //    printf("dest_val.data = %d\n", *(int*)dest_val->data);
79 
80     return 0;
81 }
82 
test_loader(DB ** dbs)83 static void test_loader(DB **dbs)
84 {
85     int r;
86     DB_TXN    *txn;
87     DB_LOADER *loader;
88     uint32_t db_flags[NUM_DBS];
89     uint32_t dbt_flags[NUM_DBS];
90     for(int i=0;i<NUM_DBS;i++) {
91         db_flags[i] = DB_NOOVERWRITE;
92         dbt_flags[i] = 0;
93     }
94     uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p or -c option
95 
96     // create and initialize loader
97     r = env->txn_begin(env, NULL, &txn, 0);
98     CKERR(r);
99     r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
100     CKERR(r);
101     r = loader->set_error_callback(loader, NULL, NULL);
102     CKERR(r);
103     r = loader->set_poll_function(loader, NULL, NULL);
104     CKERR(r);
105 
106     uint64_t before_puts = toku_test_get_latest_lsn(env);
107     // using loader->put, put values into DB
108     DBT key, val;
109     for(int i=0;i<NUM_KV_PAIRS;i++) {
110         dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key));
111         dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val));
112         r = loader->put(loader, &key, &val);
113         if (DISALLOW_PUTS) {
114             CKERR2(r, EINVAL);
115         } else {
116             CKERR(r);
117         }
118     }
119     uint64_t after_puts = toku_test_get_latest_lsn(env);
120     assert(before_puts == after_puts);
121 
122     // close the loader
123     r = loader->close(loader);
124     CKERR(r);
125     r = txn->commit(txn, 0);
126     CKERR(r);
127 
128     // verify the DBs
129     DBC *cursor;
130     r = env->txn_begin(env, NULL, &txn, 0);
131     CKERR(r);
132 
133     for(int j=0;j<NUM_DBS;j++) {
134         r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
135         CKERR(r);
136         for(int i=0;i<NUM_KV_PAIRS;i++) {
137             r = cursor->c_get(cursor, &key, &val, DB_NEXT);
138 	    if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); }
139             if (DISALLOW_PUTS) {
140                 CKERR2(r, DB_NOTFOUND);
141             } else {
142                 CKERR(r);
143                 assert(*(int64_t*)key.data == kv_pairs[i].key);
144                 assert(*(int64_t*)val.data == kv_pairs[i].val);
145             }
146         }
147         cursor->c_close(cursor);
148     }
149     r = txn->commit(txn, 0);
150     CKERR(r);
151 
152     if ( verbose ) printf("PASS\n");
153 }
154 
run_test(void)155 static void run_test(void)
156 {
157     int r;
158     char rmcmd[32 + strlen(envdir)];
159     snprintf(rmcmd, sizeof rmcmd, "rm -rf %s", envdir);
160     r = system(rmcmd);                                                                             CKERR(r);
161     r = toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO);                                                       CKERR(r);
162     char logdir[8 + strlen(envdir)];
163     snprintf(logdir, sizeof logdir, "%s/log", envdir);
164     r = toku_os_mkdir(logdir, S_IRWXU+S_IRWXG+S_IRWXO);
165     CKERR(r);
166 
167     r = db_env_create(&env, 0);                                                                               CKERR(r);
168     r = env->set_lg_dir(env, "log");
169     CKERR(r);
170     r = env->set_default_bt_compare(env, int64_dbt_cmp);                                                      CKERR(r);
171     r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
172     CKERR(r);
173 //    int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
174     int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
175     r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO);                                            CKERR(r);
176     env->set_errfile(env, stderr);
177     //Disable auto-checkpointing
178     r = env->checkpointing_set_period(env, 0);                                                                CKERR(r);
179 
180     DBT desc;
181     dbt_init(&desc, "foo", sizeof("foo"));
182     char name[MAX_NAME*2];
183 
184     DB *dbs[NUM_DBS];
185     int idx[NUM_DBS];
186     for(int i=0;i<NUM_DBS;i++) {
187         idx[i] = i;
188         r = db_create(&dbs[i], env, 0);                                                                       CKERR(r);
189         dbs[i]->app_private = &idx[i];
190         if (block_size != 0) {
191             r = dbs[i]->set_pagesize(dbs[i], block_size); CKERR(r);
192         }
193         snprintf(name, sizeof(name), "db_%04x", i);
194         r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666);                                CKERR(r);
195         IN_TXN_COMMIT(env, NULL, txn_desc, 0, {
196                 { int chk_r = dbs[i]->change_descriptor(dbs[i], txn_desc, &desc, 0); CKERR(chk_r); }
197         });
198     }
199 
200     // -------------------------- //
201     test_loader(dbs);
202     // -------------------------- //
203 
204     for(int i=0;i<NUM_DBS;i++) {
205         dbs[i]->close(dbs[i], 0);                                                                             CKERR(r);
206         dbs[i] = NULL;
207     }
208     r = env->close(env, 0);                                                                                   CKERR(r);
209 }
210 
211 // ------------ infrastructure ----------
212 static void do_args(int argc, char * const argv[]);
213 
test_main(int argc,char * const * argv)214 int test_main(int argc, char * const *argv) {
215     do_args(argc, argv);
216     run_test();
217     return 0;
218 }
219 
do_args(int argc,char * const argv[])220 static void do_args(int argc, char * const argv[]) {
221     int resultcode;
222     char *cmd = argv[0];
223     argc--; argv++;
224     while (argc>0) {
225 	if (strcmp(argv[0], "-v")==0) {
226 	    verbose++;
227 	} else if (strcmp(argv[0],"-q")==0) {
228 	    verbose--;
229 	    if (verbose<0) verbose=0;
230         } else if (strcmp(argv[0], "-h")==0) {
231 	    resultcode=0;
232 	do_usage:
233 	    fprintf(stderr, "Usage:\n%s\n", cmd);
234 	    exit(resultcode);
235         } else if (strcmp(argv[0], "-p")==0) {
236             DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
237         } else if (strcmp(argv[0], "-z")==0) {
238             COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
239         } else if (strcmp(argv[0], "--block_size") == 0) {
240             argc--; argv++;
241             block_size = atoi(argv[0]);
242         } else if (strcmp(argv[0], "-e") == 0) {
243             argc--; argv++;
244             if (argc > 0)
245                 envdir = argv[0];
246 	} else {
247 	    fprintf(stderr, "Unknown arg: %s\n", argv[0]);
248 	    resultcode=1;
249 	    goto do_usage;
250 	}
251 	argc--;
252 	argv++;
253     }
254 }
255