1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
/* This is a performance test.  Releasing the lock during I/O should mean that, given two threads doing queries,
 * one whose data is in memory and one whose data is not, the in-memory thread is not slowed down by the out-of-memory one.
 *
 * Step 1: Create a dictionary that doesn't fit in main memory.  Do it fast (sequential insertions).
 * Step 2: Measure the performance of in-memory requests.
 * Step 3: Add a second thread that does out-of-memory requests in parallel, and measure again.
 */
46 
47 #include "test.h"
48 #include <string.h>
49 #include <toku_time.h>
50 #include <toku_pthread.h>
51 #include <portability/toku_atomic.h>
52 
// Flags passed to env->open() in create_db().
static const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;

// NOTE(review): ROWSIZE is not referenced anywhere in the visible code.  The
// rows written by create_db() add up to 100 bytes (17-byte key plus 83-byte
// value, counting the terminating NULs), which is presumably what it records.
#define ROWSIZE 100
// Name of the dictionary opened inside the test environment.
static const char dbname[] = "data.db";
// Number of rows loaded by create_db(); reader threads use it as the modulus
// when picking random keys.
static unsigned long long n_rows;

// Environment and dictionary handles.  They are created in create_db(), used
// by the reader threads, and closed at the end of test_main().
static DB_ENV *env = NULL;
static DB *db;

// BDB cannot handle big transactions by default (runs out of locks), so the
// loader and the readers commit and restart their transaction after roughly
// this many operations.
#define N_PER_XACTION 10000
64 
65 static void create_db (uint64_t N) {
66     n_rows = N;
67     toku_os_recursive_delete(TOKU_TEST_FILENAME);
68     toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);
69     { int r = db_env_create(&env, 0);                                          CKERR(r); }
70     env->set_errfile(env, stderr);
71     env->set_redzone(env, 0);
72     { int r = env->set_cachesize(env, 0, 400*4096, 1);                        CKERR(r); }
73     { int r = env->open(env, TOKU_TEST_FILENAME, envflags, S_IRWXU+S_IRWXG+S_IRWXO);       CKERR(r); }
74     DB_TXN *txn;
75     { int r = env->txn_begin(env, NULL, &txn, 0);                              CKERR(r); }
76     { int r = db_create(&db, env, 0);                                          CKERR(r); }
77     { int r = db->set_pagesize(db, 4096);                                      CKERR(r); }
78     { int r = db->open(db, txn, dbname, NULL, DB_BTREE, DB_CREATE, 0666);      CKERR(r); }
79     { int r = txn->commit(txn, DB_TXN_NOSYNC);                                 CKERR(r); }
80 
81     { int r = env->txn_begin(env, NULL, &txn, 0);                              CKERR(r); }
82     uint64_t n_since_commit = 0;
83     for (unsigned long long i=0; i<N; i++) {
84 	if (n_since_commit++ > N_PER_XACTION) {
85 	    { int r = txn->commit(txn, DB_TXN_NOSYNC);                         CKERR(r); }
86 	    { int r = env->txn_begin(env, NULL, &txn, 0);                      CKERR(r); }
87 	}
88 	char key[20];
89 	char data[200];
90 	snprintf(key,  sizeof(key),  "%016llx", i);
91 	snprintf(data, sizeof(data), "%08lx%08lx%66s", random(), random()%16, "");
92 	DBT keyd, datad;
93 	{
94 	    int r = db->put(db, txn, dbt_init(&keyd, key, strlen(key)+1), dbt_init(&datad, data, strlen(data)+1), 0);
95 	    CKERR(r);
96 	}
97     }
98     //printf("n_rows=%lld\n", n_rows);
99     { int r = txn->commit(txn, DB_TXN_NOSYNC);                                 CKERR(r); }
100 }
101 
// Control block shared between do_threads() and one reader_thread().
struct reader_thread_state {
    /* results, written by the thread */
    double             elapsed_time;   // wall-clock duration of the whole run
    unsigned long long n_did_read;     // number of lookups issued

    /* parameters, set before the thread is started */
    long long n_to_read;               // lookup budget; negative means no limit
    int       do_local;                // nonzero: reuse a small fixed set of keys

    /* may be set from another thread while this one is running */
    volatile int finish;               // nonzero asks the thread to stop

};
115 
116 static
117 void* reader_thread (void *arg)
118 // Return the time to read
119 {
120     struct timeval start_time, end_time;
121     gettimeofday(&start_time, 0);
122 
123     DB_TXN *txn;
124     struct reader_thread_state *rs = (struct reader_thread_state *)arg;
125 
126     { int r = env->txn_begin(env, NULL, &txn, 0);                              CKERR(r); }
127     char key[20];
128     char data [200];
129     DBT keyd, datad;
130     keyd.data = key;
131     keyd.size = 0;
132     keyd.ulen = sizeof(key);
133     keyd.flags = DB_DBT_USERMEM;
134     datad.data = data;
135     datad.size = 0;
136     datad.ulen = sizeof(data);
137     datad.flags = DB_DBT_USERMEM;
138 
139 #define N_DISTINCT 16
140     unsigned long long vals[N_DISTINCT];
141     if (rs->do_local) {
142 	for (int i=0; i<N_DISTINCT; i++) {
143 	    vals[i] = random()%n_rows;
144 	}
145     }
146 
147     uint64_t n_since_commit = 0;
148     long long n_read_so_far = 0;
149     while ((!rs->finish) && ((rs->n_to_read < 0) || (n_read_so_far < rs->n_to_read))) {
150 
151 	if (n_since_commit++ > N_PER_XACTION) {
152 	    { int r = txn->commit(txn, DB_TXN_NOSYNC);                         CKERR(r); }
153 	    { int r = env->txn_begin(env, NULL, &txn, 0);                      CKERR(r); }
154 	    n_since_commit = 0;
155 	}
156 	long long value;
157 	if (rs->do_local) {
158 	    long which = random()%N_DISTINCT;
159 	    value = vals[which];
160 	    //printf("value=%lld\n", value);
161 	} else {
162 	    value = random()%n_rows;
163 	}
164 	snprintf(key,  sizeof(key),  "%016llx", value);
165 	keyd.size = strlen(key)+1;
166 	int r = db->get(db, txn, &keyd, &datad, 0);
167 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED
168         invariant(r == 0 || r == DB_LOCK_NOTGRANTED || r == DB_LOCK_DEADLOCK);
169 #else
170 	CKERR(r);
171 #endif
172 	rs->n_did_read++;
173 	n_read_so_far ++;
174     }
175     { int r = txn->commit(txn, DB_TXN_NOSYNC);                                 CKERR(r); }
176 
177     gettimeofday(&end_time, 0);
178     rs->elapsed_time = toku_tdiff(&end_time, &start_time);
179     return NULL;
180 }
181 
182 static
183 void do_threads (unsigned long long N, int do_nonlocal) {
184     toku_pthread_t ths[2];
185     struct reader_thread_state rstates[2] = {{.elapsed_time = 0.0,
186                                               .n_did_read = 0,
187                                               .n_to_read = (long long signed)N,
188                                               .do_local = 1,
189                                               .finish = 0},
190                                              {.elapsed_time = 0.0,
191                                               .n_did_read = 0,
192                                               .n_to_read = -1,
193                                               .do_local = 0,
194                                               .finish = 0}};
195     int n_to_create = do_nonlocal ? 2 : 1;
196     for (int i = 0; i < n_to_create; i++) {
197         int r = toku_pthread_create(toku_uninstrumented,
198                                     &ths[i],
199                                     nullptr,
200                                     reader_thread,
201                                     static_cast<void *>(&rstates[i]));
202         CKERR(r);
203     }
204     for (int i = 0; i < n_to_create; i++) {
205         void *retval;
206         int r = toku_pthread_join(ths[i], &retval);
207         CKERR(r);
208         assert(retval == 0);
209         if (verbose) {
210             printf("%9s thread time = %8.2fs on %9lld reads (%.3f us/read)\n",
211                    (i == 0 ? "local" : "nonlocal"),
212                    rstates[i].elapsed_time,
213                    rstates[i].n_did_read,
214                    rstates[i].elapsed_time / rstates[i].n_did_read * 1e6);
215         }
216         rstates[1].finish = 1;
217     }
218     if (verbose && do_nonlocal) {
219 	printf("total                                %9lld reads (%.3f us/read)\n",
220 	       rstates[0].n_did_read + rstates[1].n_did_read,
221 	       (rstates[0].elapsed_time)/(rstates[0].n_did_read + rstates[1].n_did_read) * 1e6);
222     }
223 }
224 
225 static volatile unsigned long long n_preads;
226 
227 static ssize_t my_pread (int fd, void *buf, size_t count, off_t offset) {
228     (void) toku_sync_fetch_and_add(&n_preads, 1);
229     usleep(1000); // sleep for a millisecond
230     return pread(fd, buf, count, offset);
231 }
232 
233 unsigned long N_default = 100000;
234 unsigned long N;
235 
236 static void my_parse_args (int argc, char * const argv[]) {
237     const char *progname = argv[0];
238     argc--; argv++;
239     verbose = 0;
240     N = N_default;
241     while (argc>0) {
242 	if (strcmp(argv[0],"-v")==0) {
243 	    verbose++;
244 	} else if (strcmp(argv[0],"-q")==0) {
245 	    if (verbose>0) verbose--;
246 	} else if (strcmp(argv[0],"-n")==0) {
247 	    argc--; argv++;
248 	    if (argc==0) goto usage;
249 	    errno = 0;
250 	    char *end;
251 	    N = strtol(argv[0], &end, 10);
252 	    if (errno!=0 || *end!=0) goto usage;
253 	} else {
254 	usage:
255 	    fprintf(stderr, "Usage:\n %s [-v] [-q] [-n <rowcount> (default %ld)]\n", progname, N_default);
256 	    fprintf(stderr, "  -n 10000     is probably good for valgrind.\n");
257 	    exit(1);
258 	}
259 	argc--; argv++;
260     }
261 
262 }
263 
264 int test_main (int argc, char * const argv[])  {
265     my_parse_args(argc, argv);
266 
267     unsigned long long M = N*10;
268 
269     db_env_set_func_pread(my_pread);
270 
271     create_db (N);
272     if (verbose) printf("%lld preads\n", n_preads);
273     do_threads (M, 0);
274     if (verbose) printf("%lld preads\n", n_preads);
275     do_threads (M, 0);
276     if (verbose) printf("%lld preads\n", n_preads);
277     do_threads (M, 1);
278     if (verbose) printf("%lld preads\n", n_preads);
279     { int r = db->close(db, 0);                                                CKERR(r); }
280     { int r = env->close(env, 0);                                              CKERR(r); }
281     if (verbose) printf("%lld preads\n", n_preads);
282     return 0;
283 }
284 
285