1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include "test.h"
40
41 #include <toku_portability.h>
42 #include <errno.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <unistd.h>
46 #include <db.h>
47 #include <string.h>
48
49 #include <memory.h>
50
51 #include <portability/toku_atomic.h>
52 #include <portability/toku_pthread.h>
53 #include <portability/toku_random.h>
54
55 #include <util/omt.h>
56 #include <util/rwlock.h>
57
58 namespace toku {
59
60 namespace test {
61
// Maps an index to the value the tests store at that index: the index
// shifted up by 300 (unsigned arithmetic, so it wraps on overflow).
static inline uint32_t fudge(const uint32_t x) {
    const uint32_t offset = 300;
    return offset + x;
}
// Recovers the index from a stored value by removing the 300 offset
// (unsigned arithmetic, so values below 300 wrap around).
static inline uint32_t defudge(const uint32_t fx) {
    const uint32_t offset = 300;
    return fx - offset;
}
64
65 int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused));
test_iterator(const uint32_t & v,const uint32_t idx,bool * const UU (unused))66 int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused)) {
67 invariant(defudge(v) == idx);
68 return 0;
69 }
70
71 int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called);
check_iterator_before(const uint32_t & v,const uint32_t idx,bool * const called)72 int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called) {
73 invariant(defudge(v) == idx);
74 invariant(idx % 10 < 5);
75 called[idx] = true;
76 return 0;
77 }
78
79 int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called);
check_iterator_after(const uint32_t & v,const uint32_t UU (idx),bool * const called)80 int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called) {
81 invariant(defudge(v) % 10 >= 5);
82 called[defudge(v)] = true;
83 return 0;
84 }
85
int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused));
// Callback that must never be invoked: it aborts the process immediately.
// Callers pass it to iterations that are expected to visit no element.
int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused)) {
    abort();
    // Not reached; present only so the function has a return statement.
    return 0;
}
91
run_test(uint32_t nelts)92 static void run_test(uint32_t nelts) {
93 assert(nelts % 10 == 0); // run_test depends on nelts being a multiple of 10
94
95 omt<uint32_t, uint32_t, true> omt;
96 omt.create();
97 omt.verify_marks_consistent();
98 for (uint32_t i = 0; i < nelts; ++i) {
99 omt.insert_at(fudge(i), i);
100 }
101 omt.verify_marks_consistent();
102
103 int r;
104 for (uint32_t i = 0; i < nelts / 10; ++i) {
105 r = omt.iterate_and_mark_range<bool, test_iterator>(i * 10, i * 10 + 5, nullptr);
106 invariant_zero(r);
107 omt.verify_marks_consistent();
108 }
109
110 bool called[nelts];
111 ZERO_ARRAY(called);
112 r = omt.iterate_over_marked<bool, check_iterator_before>(called);
113 invariant_zero(r);
114 for (uint32_t i = 0; i < nelts; ++i) {
115 if (i % 10 < 5) {
116 invariant(called[i]);
117 } else {
118 invariant(!called[i]);
119 }
120 }
121 omt.verify_marks_consistent();
122
123 invariant(omt.size() == nelts);
124
125 omt.delete_all_marked();
126 omt.verify_marks_consistent();
127
128 invariant(omt.size() * 2 == nelts);
129
130 r = omt.iterate_over_marked<void, die>(nullptr);
131 invariant_zero(r);
132
133 ZERO_ARRAY(called);
134 r = omt.iterate<bool, check_iterator_after>(called);
135 invariant_zero(r);
136 omt.verify_marks_consistent();
137
138 for (uint32_t i = 0; i < nelts; ++i) {
139 if (i % 10 < 5) {
140 invariant(!called[i]);
141 } else {
142 invariant(called[i]);
143 }
144 }
145
146 omt.destroy();
147 }
148
149 typedef omt<uint32_t, uint32_t, true> stress_omt;
150
int int_heaviside(const uint32_t &v, const uint32_t &target);
// Three-way comparison of v against target:
// returns -1 if v < target, 0 if they are equal, +1 if v > target.
int int_heaviside(const uint32_t &v, const uint32_t &target) {
    if (v < target) {
        return -1;
    }
    if (v > target) {
        return 1;
    }
    return 0;
}
155
156 struct stress_shared {
157 stress_omt *omt;
158 volatile bool running;
159 struct st_rwlock lock;
160 toku_mutex_t mutex;
161 int num_marker_threads;
162 };
163
164 struct reader_extra {
165 int tid;
166 stress_shared *shared;
167 uint64_t iterations;
168 uint64_t last_iteration;
169 char buf_read[8];
170 char buf_write[8];
171 struct random_data rand_read;
172 struct random_data rand_write;
173 };
174
generate_range(struct random_data * rng,const struct stress_shared & shared,uint32_t * begin,uint32_t * limit)175 static void generate_range(struct random_data *rng, const struct stress_shared &shared, uint32_t *begin, uint32_t *limit) {
176 const uint32_t nelts = shared.omt->size();
177 double range_limit_d = nelts;
178 range_limit_d /= 1000;
179 range_limit_d /= shared.num_marker_threads;
180 range_limit_d += 1;
181 uint32_t range_limit = static_cast<uint32_t>(range_limit_d);
182 if (range_limit < 5) {
183 range_limit = 5;
184 }
185 if (range_limit > 1000) {
186 range_limit = 1000;
187 }
188 *begin = rand_choices(rng, nelts - 1);
189 if (*begin + range_limit > nelts) {
190 range_limit = nelts - *begin;
191 }
192 *limit = *begin + rand_choices(rng, range_limit);
193 }
194
// Half-open index range [begin, limit) handed to mark_read_iterator.
struct pair {
    // First index of the range (inclusive).
    uint32_t begin;

    // One past the last index of the range (exclusive).
    uint32_t limit;
};
199
200 int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair);
mark_read_iterator(const uint32_t & UU (v),const uint32_t idx,struct pair * const pair)201 int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair) {
202 invariant(defudge(v) == idx);
203 invariant(idx >= pair->begin);
204 invariant(idx < pair->limit);
205 return 0;
206 }
207
stress_mark_worker(void * extrav)208 static void *stress_mark_worker(void *extrav) {
209 struct reader_extra *CAST_FROM_VOIDP(extra, extrav);
210 struct stress_shared &shared = *extra->shared;
211 toku_mutex_t &mutex = shared.mutex;
212
213 while (shared.running) {
214 toku_mutex_lock(&mutex);
215 rwlock_read_lock(&shared.lock, &mutex);
216 toku_mutex_unlock(&mutex);
217
218 struct pair range;
219 generate_range(&extra->rand_read, shared, &range.begin, &range.limit);
220
221 shared.omt->iterate_and_mark_range<pair, mark_read_iterator>(range.begin, range.limit, &range);
222
223 ++extra->iterations;
224
225 toku_mutex_lock(&mutex);
226 rwlock_read_unlock(&shared.lock);
227 toku_mutex_unlock(&mutex);
228
229 usleep(1);
230 }
231
232 return nullptr;
233 }
234
// Append-only buffer of T that doubles as a functor: calling it with a
// value appends that value, and iterate() replays the stored values in
// insertion order through another callable.
//
// The buffer is allocated once with room for `size` elements and released
// in the destructor. Because the object owns that raw allocation, copying
// is disabled: a copy would free the same buffer twice.
template<typename T>
class array_ftor {
    int m_count;     // number of elements appended so far
    int m_capacity;  // number of elements the buffer can hold
    T *m_array;      // owned buffer of m_capacity elements
public:
    // Allocates room for `size` elements; the buffer starts out empty.
    array_ftor(int size) : m_count(0), m_capacity(size) {
        XMALLOC_N(size, m_array);
    }
    ~array_ftor() {
        toku_free(m_array);
    }

    // Non-copyable: the destructor frees m_array unconditionally.
    array_ftor(const array_ftor &) = delete;
    array_ftor &operator=(const array_ftor &) = delete;

    // Appends x. Appending more than `size` elements is a caller bug.
    void operator() (const T &x) {
        assert(m_count < m_capacity);
        m_array[m_count++] = x;
    }

    // Calls cb(element) for every stored element, in insertion order.
    template<class callback_t>
    void iterate(callback_t &cb) const {
        for (int i = 0; i < m_count; ++i) {
            cb(m_array[i]);
        }
    }
};
254
255 int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor<uint32_t> *const fp);
use_array_ftor(const uint32_t & v,const uint32_t UU (idx),array_ftor<uint32_t> * const fp)256 int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor<uint32_t> *const fp) {
257 array_ftor<uint32_t> &f = *fp;
258 f(v);
259 return 0;
260 }
261
262 class inserter {
263 stress_omt *m_omt;
264 public:
inserter(stress_omt * omt)265 inserter(stress_omt *omt) : m_omt(omt) {}
operator ()(const uint32_t & x)266 void operator() (const uint32_t &x) {
267 m_omt->insert<uint32_t, int_heaviside>(x, x, nullptr);
268 }
269 };
270
/*
 * Stress test outline
 *
 * Split the range evenly/not evenly between the marker threads.
 * A context tells each thread its range; the context also holds the
 * thread's iteration number.
 *
 * N threads
 * N 'contexts', each holding an iteration number and a seed
 *
 * In each marker thread:
 *   create an rng based on the seed
 *   loop:
 *     generate a random range, mark that range, increment the iteration number
 *
 * When checking:
 *   for each context
 *     create an rng based on context->last_seed
 *     loop (iteration number times)
 *       mark (in an array) a random range
 *     context->last_seed := context->seed
 *   check the array against the omt
 *
 */
294
simulate_reader_marks_on_array(struct reader_extra * const reader,const struct stress_shared & shared,bool * const should_be_marked)295 static void simulate_reader_marks_on_array(struct reader_extra *const reader, const struct stress_shared &shared, bool *const should_be_marked) {
296 if (verbose) {
297 fprintf(stderr, "thread %d ran %" PRIu64 " iterations\n", reader->tid, reader->iterations - reader->last_iteration);
298 }
299 for (; reader->last_iteration < reader->iterations; ++reader->last_iteration) {
300 uint32_t begin;
301 uint32_t limit;
302
303 generate_range(&reader->rand_write, shared, &begin, &limit);
304
305 for (uint32_t i = begin; i < limit; i++) {
306 should_be_marked[i] = true;
307 }
308 }
309 }
310
311 int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked);
copy_marks(const uint32_t & v,const uint32_t idx,bool * const is_marked)312 int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked) {
313 invariant(defudge(v) == idx);
314 is_marked[idx] = true;
315 return 0;
316 }
317
// Returns how many of the first n entries of bools are true.
static inline uint32_t count_true(const bool *const bools, uint32_t n) {
    uint32_t total = 0;
    uint32_t pos = 0;
    while (pos < n) {
        total += bools[pos] ? 1u : 0u;
        ++pos;
    }
    return total;
}
327
stress_deleter(struct reader_extra * const readers,int num_marker_threads,stress_omt * omt)328 static void stress_deleter(struct reader_extra *const readers, int num_marker_threads, stress_omt *omt) {
329 // Verify (iterate_over_marked) agrees exactly with iterate_and_mark_range (multithreaded)
330 stress_shared &shared = *readers[0].shared;
331 bool should_be_marked[omt->size()];
332 ZERO_ARRAY(should_be_marked);
333
334 for (int i = 0; i < num_marker_threads; i++) {
335 simulate_reader_marks_on_array(&readers[i], shared, should_be_marked);
336 }
337
338 bool is_marked_according_to_iterate[omt->size()];
339 ZERO_ARRAY(is_marked_according_to_iterate);
340
341 omt->verify_marks_consistent();
342 omt->iterate_over_marked<bool, copy_marks>(&is_marked_according_to_iterate[0]);
343 omt->verify_marks_consistent();
344
345 invariant(!memcmp(should_be_marked, is_marked_according_to_iterate, sizeof(should_be_marked)));
346
347 if (verbose) {
348 double frac_marked = count_true(should_be_marked, omt->size());
349 frac_marked /= omt->size();
350
351 fprintf(stderr, "Marked: %0.4f\n", frac_marked);
352 omt->verify_marks_consistent();
353 }
354
355 array_ftor<uint32_t> aftor(omt->size());
356 omt->iterate_over_marked<array_ftor<uint32_t>, use_array_ftor>(&aftor);
357 omt->delete_all_marked();
358 omt->verify_marks_consistent();
359 omt->iterate_over_marked<void, die>(nullptr);
360 inserter ins(omt);
361 aftor.iterate(ins);
362 omt->verify_marks_consistent();
363 }
364
stress_delete_worker(void * extrav)365 static void *stress_delete_worker(void *extrav) {
366 reader_extra *CAST_FROM_VOIDP(readers, extrav);
367 stress_shared &shared = *readers[0].shared;
368 int num_marker_threads = shared.num_marker_threads;
369 toku_mutex_t &mutex = shared.mutex;
370 const double repetitions = 20;
371 for (int i = 0; i < repetitions; ++i) {
372 // sleep 0 - 0.15s
373 // early iterations sleep for a short time
374 // later iterations sleep longer
375 int sleep_for = 1000 * 100 * (1.5 * (i+1) / repetitions);
376 usleep(sleep_for);
377
378 toku_mutex_lock(&mutex);
379 rwlock_write_lock(&shared.lock, &mutex);
380 toku_mutex_unlock(&mutex);
381
382 stress_deleter(readers, num_marker_threads, shared.omt);
383
384 toku_mutex_lock(&mutex);
385 rwlock_write_unlock(&shared.lock);
386 toku_mutex_unlock(&mutex);
387 }
388 toku_sync_bool_compare_and_swap(&shared.running, true, false);
389 return nullptr;
390 }
391
stress_test(int nelts)392 static void stress_test(int nelts) {
393 stress_omt omt;
394 omt.create();
395 for (int i = 0; i < nelts; ++i) {
396 omt.insert_at(fudge(i), i);
397 }
398
399 const int num_marker_threads = 5;
400 struct stress_shared extra;
401 ZERO_STRUCT(extra);
402 extra.omt = &omt;
403 toku_mutex_init(toku_uninstrumented, &extra.mutex, nullptr);
404 rwlock_init(toku_uninstrumented, &extra.lock);
405 extra.running = true;
406 extra.num_marker_threads = num_marker_threads;
407
408 struct reader_extra readers[num_marker_threads];
409 ZERO_ARRAY(readers);
410
411 srandom(time(NULL));
412 toku_pthread_t marker_threads[num_marker_threads];
413 for (int i = 0; i < num_marker_threads; ++i) {
414 struct reader_extra &reader = readers[i];
415 reader.tid = i;
416 reader.shared = &extra;
417
418 int r;
419 int seed = random();
420 r = myinitstate_r(seed, reader.buf_read, 8, &reader.rand_read);
421 invariant_zero(r);
422 r = myinitstate_r(seed, reader.buf_write, 8, &reader.rand_write);
423 invariant_zero(r);
424
425 toku_pthread_create(toku_uninstrumented,
426 &marker_threads[i],
427 nullptr,
428 stress_mark_worker,
429 &reader);
430 }
431
432 toku_pthread_t deleter_thread;
433 toku_pthread_create(toku_uninstrumented,
434 &deleter_thread,
435 nullptr,
436 stress_delete_worker,
437 &readers[0]);
438 toku_pthread_join(deleter_thread, NULL);
439
440 for (int i = 0; i < num_marker_threads; ++i) {
441 toku_pthread_join(marker_threads[i], NULL);
442 }
443
444 rwlock_destroy(&extra.lock);
445 toku_mutex_destroy(&extra.mutex);
446
447 omt.destroy();
448 }
449
450 } // end namespace test
451
452 } // end namespace toku
453
test_main(int argc,const char * argv[])454 int test_main(int argc, const char *argv[]) {
455 default_parse_args(argc, argv);
456
457 for (int i = 10; i <= 80; i*=2) {
458 toku::test::run_test(i);
459 }
460
461 toku::test::run_test(9000);
462
463 toku::test::stress_test(1000 * 100);
464
465 return 0;
466 }
467