1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: 3 #ident "$Id$" 4 /*====== 5 This file is part of PerconaFT. 6 7 8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. 9 10 PerconaFT is free software: you can redistribute it and/or modify 11 it under the terms of the GNU General Public License, version 2, 12 as published by the Free Software Foundation. 13 14 PerconaFT is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 21 22 ---------------------------------------- 23 24 PerconaFT is free software: you can redistribute it and/or modify 25 it under the terms of the GNU Affero General Public License, version 3, 26 as published by the Free Software Foundation. 27 28 PerconaFT is distributed in the hope that it will be useful, 29 but WITHOUT ANY WARRANTY; without even the implied warranty of 30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 31 GNU Affero General Public License for more details. 32 33 You should have received a copy of the GNU Affero General Public License 34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 35 ======= */ 36 37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 38 39 /* This is a performance test. Releasing lock during I/O should mean that given two threads doing queries, 40 * and one of them is in-memory and one of them is out of memory, then the in-memory one should not be slowed down by the out-of-memory one. 41 * 42 * Step 1: Create a dictionary that doesn't fit in main memory. Do it fast (sequential insertions). 43 * Step 2: Measure performance of in-memory requests. 44 * Step 3: Add a thread that does requests in parallel. 45 */ 46 47 #include "test.h" 48 #include <string.h> 49 #include <toku_time.h> 50 #include <toku_pthread.h> 51 #include <portability/toku_atomic.h> 52 53 static const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE; 54 55 #define ROWSIZE 100 56 static const char dbname[] = "data.db"; 57 static unsigned long long n_rows; 58 59 static DB_ENV *env = NULL; 60 static DB *db; 61 62 // BDB cannot handle big transactions by default (runs out of locks). 63 #define N_PER_XACTION 10000 64 65 static void create_db (uint64_t N) { 66 n_rows = N; 67 toku_os_recursive_delete(TOKU_TEST_FILENAME); 68 toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); 69 { int r = db_env_create(&env, 0); CKERR(r); } 70 env->set_errfile(env, stderr); 71 env->set_redzone(env, 0); 72 { int r = env->set_cachesize(env, 0, 400*4096, 1); CKERR(r); } 73 { int r = env->open(env, TOKU_TEST_FILENAME, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); } 74 DB_TXN *txn; 75 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); } 76 { int r = db_create(&db, env, 0); CKERR(r); } 77 { int r = db->set_pagesize(db, 4096); CKERR(r); } 78 { int r = db->open(db, txn, dbname, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r); } 79 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); } 80 81 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); } 82 uint64_t n_since_commit = 0; 83 for (unsigned long long i=0; i<N; i++) { 84 if (n_since_commit++ > N_PER_XACTION) { 85 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); } 86 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); } 87 } 88 char key[20]; 89 char data[200]; 90 snprintf(key, sizeof(key), "%016llx", i); 91 snprintf(data, sizeof(data), "%08lx%08lx%66s", random(), random()%16, ""); 92 DBT keyd, datad; 93 { 94 int r = db->put(db, txn, dbt_init(&keyd, key, strlen(key)+1), dbt_init(&datad, data, strlen(data)+1), 0); 95 CKERR(r); 96 } 97 } 98 //printf("n_rows=%lld\n", n_rows); 99 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); } 100 } 101 102 struct reader_thread_state { 103 /* output */ 104 double elapsed_time; 105 unsigned long long n_did_read; 106 107 /* input */ 108 signed long long n_to_read; // Negative if we just run forever 109 int do_local; 110 111 /* communicate to the thread while running */ 112 volatile int finish; 113 114 }; 115 116 static 117 void* reader_thread (void *arg) 118 // Return the time to read 119 { 120 struct timeval start_time, end_time; 121 gettimeofday(&start_time, 0); 122 123 DB_TXN *txn; 124 struct reader_thread_state *rs = (struct reader_thread_state *)arg; 125 126 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); } 127 char key[20]; 128 char data [200]; 129 DBT keyd, datad; 130 keyd.data = key; 131 keyd.size = 0; 132 keyd.ulen = sizeof(key); 133 keyd.flags = DB_DBT_USERMEM; 134 datad.data = data; 135 datad.size = 0; 136 datad.ulen = sizeof(data); 137 datad.flags = DB_DBT_USERMEM; 138 139 #define N_DISTINCT 16 140 unsigned long long vals[N_DISTINCT]; 141 if (rs->do_local) { 142 for (int i=0; i<N_DISTINCT; i++) { 143 vals[i] = random()%n_rows; 144 } 145 } 146 147 uint64_t n_since_commit = 0; 148 long long n_read_so_far = 0; 149 while ((!rs->finish) && ((rs->n_to_read < 0) || (n_read_so_far < rs->n_to_read))) { 150 151 if (n_since_commit++ > N_PER_XACTION) { 152 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); } 153 { int r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); } 154 n_since_commit = 0; 155 } 156 long long value; 157 if (rs->do_local) { 158 long which = random()%N_DISTINCT; 159 value = vals[which]; 160 //printf("value=%lld\n", value); 161 } else { 162 value = random()%n_rows; 163 } 164 snprintf(key, sizeof(key), "%016llx", value); 165 keyd.size = strlen(key)+1; 166 int r = db->get(db, txn, &keyd, &datad, 0); 167 #ifdef BLOCKING_ROW_LOCKS_READS_NOT_SHARED 168 invariant(r == 0 || r == DB_LOCK_NOTGRANTED || r == DB_LOCK_DEADLOCK); 169 #else 170 CKERR(r); 171 #endif 172 rs->n_did_read++; 173 n_read_so_far ++; 174 } 175 { int r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); } 176 177 gettimeofday(&end_time, 0); 178 rs->elapsed_time = toku_tdiff(&end_time, &start_time); 179 return NULL; 180 } 181 182 static 183 void do_threads (unsigned long long N, int do_nonlocal) { 184 toku_pthread_t ths[2]; 185 struct reader_thread_state rstates[2] = {{.elapsed_time = 0.0, 186 .n_did_read = 0, 187 .n_to_read = (long long signed)N, 188 .do_local = 1, 189 .finish = 0}, 190 {.elapsed_time = 0.0, 191 .n_did_read = 0, 192 .n_to_read = -1, 193 .do_local = 0, 194 .finish = 0}}; 195 int n_to_create = do_nonlocal ? 2 : 1; 196 for (int i = 0; i < n_to_create; i++) { 197 int r = toku_pthread_create(toku_uninstrumented, 198 &ths[i], 199 nullptr, 200 reader_thread, 201 static_cast<void *>(&rstates[i])); 202 CKERR(r); 203 } 204 for (int i = 0; i < n_to_create; i++) { 205 void *retval; 206 int r = toku_pthread_join(ths[i], &retval); 207 CKERR(r); 208 assert(retval == 0); 209 if (verbose) { 210 printf("%9s thread time = %8.2fs on %9lld reads (%.3f us/read)\n", 211 (i == 0 ? "local" : "nonlocal"), 212 rstates[i].elapsed_time, 213 rstates[i].n_did_read, 214 rstates[i].elapsed_time / rstates[i].n_did_read * 1e6); 215 } 216 rstates[1].finish = 1; 217 } 218 if (verbose && do_nonlocal) { 219 printf("total %9lld reads (%.3f us/read)\n", 220 rstates[0].n_did_read + rstates[1].n_did_read, 221 (rstates[0].elapsed_time)/(rstates[0].n_did_read + rstates[1].n_did_read) * 1e6); 222 } 223 } 224 225 static volatile unsigned long long n_preads; 226 227 static ssize_t my_pread (int fd, void *buf, size_t count, off_t offset) { 228 (void) toku_sync_fetch_and_add(&n_preads, 1); 229 usleep(1000); // sleep for a millisecond 230 return pread(fd, buf, count, offset); 231 } 232 233 unsigned long N_default = 100000; 234 unsigned long N; 235 236 static void my_parse_args (int argc, char * const argv[]) { 237 const char *progname = argv[0]; 238 argc--; argv++; 239 verbose = 0; 240 N = N_default; 241 while (argc>0) { 242 if (strcmp(argv[0],"-v")==0) { 243 verbose++; 244 } else if (strcmp(argv[0],"-q")==0) { 245 if (verbose>0) verbose--; 246 } else if (strcmp(argv[0],"-n")==0) { 247 argc--; argv++; 248 if (argc==0) goto usage; 249 errno = 0; 250 char *end; 251 N = strtol(argv[0], &end, 10); 252 if (errno!=0 || *end!=0) goto usage; 253 } else { 254 usage: 255 fprintf(stderr, "Usage:\n %s [-v] [-q] [-n <rowcount> (default %ld)]\n", progname, N_default); 256 fprintf(stderr, " -n 10000 is probably good for valgrind.\n"); 257 exit(1); 258 } 259 argc--; argv++; 260 } 261 262 } 263 264 int test_main (int argc, char * const argv[]) { 265 my_parse_args(argc, argv); 266 267 unsigned long long M = N*10; 268 269 db_env_set_func_pread(my_pread); 270 271 create_db (N); 272 if (verbose) printf("%lld preads\n", n_preads); 273 do_threads (M, 0); 274 if (verbose) printf("%lld preads\n", n_preads); 275 do_threads (M, 0); 276 if (verbose) printf("%lld preads\n", n_preads); 277 do_threads (M, 1); 278 if (verbose) printf("%lld preads\n", n_preads); 279 { int r = db->close(db, 0); CKERR(r); } 280 { int r = env->close(env, 0); CKERR(r); } 281 if (verbose) printf("%lld preads\n", n_preads); 282 return 0; 283 } 284 285