1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 /* This is a performance test. Releasing lock during I/O should mean that given two threads doing queries,
40 * and one of them is in-memory and one of them is out of memory, then the in-memory one should not be slowed down by the out-of-memory one.
41 *
42 * Step 1: Create a dictionary that doesn't fit in main memory. Do it fast (sequential insertions).
43 * Step 2: Measure performance of in-memory requests.
44 * Step 3: Add a thread that does requests in parallel.
45 */
46
47 #include "test.h"
48 #include <string.h>
49 #include <toku_time.h>
50 #include <toku_pthread.h>
51 #include <portability/toku_atomic.h>
52
53 static const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
54
55 #define ROWSIZE 100
56 static const char dbname[] = "data.db";
57 static unsigned long long n_rows;
58
59 static DB_ENV *env = NULL;
60 static DB *db;
61
62 // BDB cannot handle big transactions by default (runs out of locks).
63 #define N_PER_XACTION 10000
64
create_db(uint64_t N)65 static void create_db (uint64_t N) {
66 n_rows = N;
67 toku_os_recursive_delete(TOKU_TEST_FILENAME);
68 toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO);
69 { int r = db_env_create(&env, 0); CKERR(r); }
70 env->set_errfile(env, stderr);
71 env->set_redzone(env, 0);
72 { int r = env->set_cachesize(env, 0, 400*4096, 1); CKERR(r); }
73 { int r = env->open(env, TOKU_TEST_FILENAME, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); }
74 DB_TXN *txn;
75 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); }
76 { int r = db_create(&db, env, 0); CKERR(r); }
77 { int r = db->set_pagesize(db, 4096); CKERR(r); }
78 { int r = db->open(db, txn, dbname, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r); }
79 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); }
80
81 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); }
82 uint64_t n_since_commit = 0;
83 for (unsigned long long i=0; i<N; i++) {
84 if (n_since_commit++ > N_PER_XACTION) {
85 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); }
86 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); }
87 }
88 char key[20];
89 char data[200];
90 snprintf(key, sizeof(key), "%016llx", i);
91 snprintf(data, sizeof(data), "%08lx%08lx%66s", random(), random()%16, "");
92 DBT keyd, datad;
93 {
94 int r = db->put(db, txn, dbt_init(&keyd, key, strlen(key)+1), dbt_init(&datad, data, strlen(data)+1), 0);
95 CKERR(r);
96 }
97 }
98 //printf("n_rows=%lld\n", n_rows);
99 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); }
100 }
101
// Per-thread record exchanged between do_threads() and reader_thread().
struct reader_thread_state {
    /* results, written by the reader thread */
    double elapsed_time;            // toku_tdiff() between thread end and start; printed as seconds by do_threads()
    unsigned long long n_did_read;  // bumped once for every db->get the thread issues

    /* parameters, filled in before the thread is started */
    long long n_to_read;            // read budget; a negative value means "run until finish is set"
    int do_local;                   // nonzero: reuse a small fixed set of keys; zero: a fresh random key per read

    /* control, may be written by another thread while the reader is running */
    volatile int finish;            // nonzero asks the reader loop to stop

};
115
116 static
reader_thread(void * arg)117 void* reader_thread (void *arg)
118 // Return the time to read
119 {
120 struct timeval start_time, end_time;
121 gettimeofday(&start_time, 0);
122
123 DB_TXN *txn;
124 struct reader_thread_state *rs = (struct reader_thread_state *)arg;
125
126 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); }
127 char key[20];
128 char data [200];
129 DBT keyd, datad;
130 keyd.data = key;
131 keyd.size = 0;
132 keyd.ulen = sizeof(key);
133 keyd.flags = DB_DBT_USERMEM;
134 datad.data = data;
135 datad.size = 0;
136 datad.ulen = sizeof(data);
137 datad.flags = DB_DBT_USERMEM;
138
139 #define N_DISTINCT 16
140 unsigned long long vals[N_DISTINCT];
141 if (rs->do_local) {
142 for (int i=0; i<N_DISTINCT; i++) {
143 vals[i] = random()%n_rows;
144 }
145 }
146
147 uint64_t n_since_commit = 0;
148 long long n_read_so_far = 0;
149 while ((!rs->finish) && ((rs->n_to_read < 0) || (n_read_so_far < rs->n_to_read))) {
150
151 if (n_since_commit++ > N_PER_XACTION) {
152 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); }
153 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); }
154 n_since_commit = 0;
155 }
156 long long value;
157 if (rs->do_local) {
158 long which = random()%N_DISTINCT;
159 value = vals[which];
160 //printf("value=%lld\n", value);
161 } else {
162 value = random()%n_rows;
163 }
164 snprintf(key, sizeof(key), "%016llx", value);
165 keyd.size = strlen(key)+1;
166 int r = db->get(db, txn, &keyd, &datad, 0);
167 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED
168 invariant(r == 0 || r == DB_LOCK_NOTGRANTED || r == DB_LOCK_DEADLOCK);
169 #else
170 CKERR(r);
171 #endif
172 rs->n_did_read++;
173 n_read_so_far ++;
174 }
175 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); }
176
177 gettimeofday(&end_time, 0);
178 rs->elapsed_time = toku_tdiff(&end_time, &start_time);
179 return NULL;
180 }
181
182 static
do_threads(unsigned long long N,int do_nonlocal)183 void do_threads (unsigned long long N, int do_nonlocal) {
184 toku_pthread_t ths[2];
185 struct reader_thread_state rstates[2] = {{.elapsed_time = 0.0,
186 .n_did_read = 0,
187 .n_to_read = (long long signed)N,
188 .do_local = 1,
189 .finish = 0},
190 {.elapsed_time = 0.0,
191 .n_did_read = 0,
192 .n_to_read = -1,
193 .do_local = 0,
194 .finish = 0}};
195 int n_to_create = do_nonlocal ? 2 : 1;
196 for (int i = 0; i < n_to_create; i++) {
197 int r = toku_pthread_create(toku_uninstrumented,
198 &ths[i],
199 nullptr,
200 reader_thread,
201 static_cast<void *>(&rstates[i]));
202 CKERR(r);
203 }
204 for (int i = 0; i < n_to_create; i++) {
205 void *retval;
206 int r = toku_pthread_join(ths[i], &retval);
207 CKERR(r);
208 assert(retval == 0);
209 if (verbose) {
210 printf("%9s thread time = %8.2fs on %9lld reads (%.3f us/read)\n",
211 (i == 0 ? "local" : "nonlocal"),
212 rstates[i].elapsed_time,
213 rstates[i].n_did_read,
214 rstates[i].elapsed_time / rstates[i].n_did_read * 1e6);
215 }
216 rstates[1].finish = 1;
217 }
218 if (verbose && do_nonlocal) {
219 printf("total %9lld reads (%.3f us/read)\n",
220 rstates[0].n_did_read + rstates[1].n_did_read,
221 (rstates[0].elapsed_time)/(rstates[0].n_did_read + rstates[1].n_did_read) * 1e6);
222 }
223 }
224
225 static volatile unsigned long long n_preads;
226
my_pread(int fd,void * buf,size_t count,off_t offset)227 static ssize_t my_pread (int fd, void *buf, size_t count, off_t offset) {
228 (void) toku_sync_fetch_and_add(&n_preads, 1);
229 usleep(1000); // sleep for a millisecond
230 return pread(fd, buf, count, offset);
231 }
232
233 unsigned long N_default = 100000;
234 unsigned long N;
235
my_parse_args(int argc,char * const argv[])236 static void my_parse_args (int argc, char * const argv[]) {
237 const char *progname = argv[0];
238 argc--; argv++;
239 verbose = 0;
240 N = N_default;
241 while (argc>0) {
242 if (strcmp(argv[0],"-v")==0) {
243 verbose++;
244 } else if (strcmp(argv[0],"-q")==0) {
245 if (verbose>0) verbose--;
246 } else if (strcmp(argv[0],"-n")==0) {
247 argc--; argv++;
248 if (argc==0) goto usage;
249 errno = 0;
250 char *end;
251 N = strtol(argv[0], &end, 10);
252 if (errno!=0 || *end!=0) goto usage;
253 } else {
254 usage:
255 fprintf(stderr, "Usage:\n %s [-v] [-q] [-n <rowcount> (default %ld)]\n", progname, N_default);
256 fprintf(stderr, " -n 10000 is probably good for valgrind.\n");
257 exit(1);
258 }
259 argc--; argv++;
260 }
261
262 }
263
test_main(int argc,char * const argv[])264 int test_main (int argc, char * const argv[]) {
265 my_parse_args(argc, argv);
266
267 unsigned long long M = N*10;
268
269 db_env_set_func_pread(my_pread);
270
271 create_db (N);
272 if (verbose) printf("%lld preads\n", n_preads);
273 do_threads (M, 0);
274 if (verbose) printf("%lld preads\n", n_preads);
275 do_threads (M, 0);
276 if (verbose) printf("%lld preads\n", n_preads);
277 do_threads (M, 1);
278 if (verbose) printf("%lld preads\n", n_preads);
279 { int r = db->close(db, 0); CKERR(r); }
280 { int r = env->close(env, 0); CKERR(r); }
281 if (verbose) printf("%lld preads\n", n_preads);
282 return 0;
283 }
284
285