/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "test.h"

#include <toku_portability.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <db.h>
#include <string.h>

#include <memory.h>

#include <portability/toku_atomic.h>
#include <portability/toku_pthread.h>
#include <portability/toku_random.h>

#include <util/omt.h>
#include <util/rwlock.h>

namespace toku {

namespace test {

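// fudge()/defudge() map an element's index to the value stored in the omt and
// back again, so a stored value can never be mistaken for its own index in the
// consistency checks below.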
static inline uint32_t fudge(const uint32_t x) { return x + 300; }
static inline uint32_t defudge(const uint32_t fx) { return fx - 300; }

int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused));
int test_iterator(const uint32_t &v, const uint32_t idx, bool *const UU(unused)) {
    invariant(defudge(v) == idx);
    return 0;
}

int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called);
int check_iterator_before(const uint32_t &v, const uint32_t idx, bool *const called) {
    invariant(defudge(v) == idx);
    invariant(idx % 10 < 5);
    called[idx] = true;
    return 0;
}

int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called);
int check_iterator_after(const uint32_t &v, const uint32_t UU(idx), bool *const called) {
    invariant(defudge(v) % 10 >= 5);
    called[defudge(v)] = true;
    return 0;
}

int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused));
int die(const uint32_t &UU(v), const uint32_t UU(idx), void *const UU(unused)) {
    abort();
    return 0; // hahaha
}

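// run_test: insert nelts elements, mark the first half of every block of 10,
// verify that iterate_over_marked visits exactly the marked elements, delete
// all marked elements, and verify that only the unmarked half remains.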
static void run_test(uint32_t nelts) {
    assert(nelts % 10 == 0);  // run_test depends on nelts being a multiple of 10

    omt<uint32_t, uint32_t, true> omt;
    omt.create();
    omt.verify_marks_consistent();
    for (uint32_t i = 0; i < nelts; ++i) {
        omt.insert_at(fudge(i), i);
    }
    omt.verify_marks_consistent();

    int r;
    for (uint32_t i = 0; i < nelts / 10; ++i) {
        r = omt.iterate_and_mark_range<bool, test_iterator>(i * 10, i * 10 + 5, nullptr);
        invariant_zero(r);
        omt.verify_marks_consistent();
    }

    bool called[nelts];
    ZERO_ARRAY(called);
    r = omt.iterate_over_marked<bool, check_iterator_before>(called);
    invariant_zero(r);
    for (uint32_t i = 0; i < nelts; ++i) {
        if (i % 10 < 5) {
            invariant(called[i]);
        } else {
            invariant(!called[i]);
        }
    }
    omt.verify_marks_consistent();

    invariant(omt.size() == nelts);

    omt.delete_all_marked();
    omt.verify_marks_consistent();

    invariant(omt.size() * 2 == nelts);

    r = omt.iterate_over_marked<void, die>(nullptr);
    invariant_zero(r);

    ZERO_ARRAY(called);
    r = omt.iterate<bool, check_iterator_after>(called);
    invariant_zero(r);
    omt.verify_marks_consistent();

    for (uint32_t i = 0; i < nelts; ++i) {
        if (i % 10 < 5) {
            invariant(!called[i]);
        } else {
            invariant(called[i]);
        }
    }

    omt.destroy();
}

typedef omt<uint32_t, uint32_t, true> stress_omt;

int int_heaviside(const uint32_t &v, const uint32_t &target);
int int_heaviside(const uint32_t &v, const uint32_t &target) {
    return (v > target) - (v < target);
}

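// Shared state for the stress test: the omt under test, a running flag that the
// deleter thread clears when it finishes, and an rwlock (marker threads take it
// for reading, the single deleter thread takes it for writing).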
struct stress_shared {
    stress_omt *omt;
    volatile bool running;
    struct st_rwlock lock;
    toku_mutex_t mutex;
    int num_marker_threads;
};

struct reader_extra {
    int tid;
    stress_shared *shared;
    uint64_t iterations;
    uint64_t last_iteration;
    char buf_read[8];
    char buf_write[8];
    struct random_data rand_read;
    struct random_data rand_write;
};

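// Pick a random [begin, limit) subrange of the omt.  The maximum width scales
// with the omt size divided by the number of marker threads, clamped to
// [5, 1000], and the range is clipped so it never runs past the end of the omt.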
static void generate_range(struct random_data *rng, const struct stress_shared &shared, uint32_t *begin, uint32_t *limit) {
    const uint32_t nelts = shared.omt->size();
    double range_limit_d = nelts;
    range_limit_d /= 1000;
    range_limit_d /= shared.num_marker_threads;
    range_limit_d += 1;
    uint32_t range_limit = static_cast<uint32_t>(range_limit_d);
    if (range_limit < 5) {
        range_limit = 5;
    }
    if (range_limit > 1000) {
        range_limit = 1000;
    }
    *begin = rand_choices(rng, nelts - 1);
    if (*begin + range_limit > nelts) {
        range_limit = nelts - *begin;
    }
    *limit = *begin + rand_choices(rng, range_limit);
}

struct pair {
    uint32_t begin;
    uint32_t limit;
};

int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair);
int mark_read_iterator(const uint32_t &UU(v), const uint32_t idx, struct pair * const pair) {
    invariant(defudge(v) == idx);
    invariant(idx >= pair->begin);
    invariant(idx < pair->limit);
    return 0;
}

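// Marker thread: while the test is running, take the rwlock for reading, mark a
// random range (checking each visited element on the way), and release the lock.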
static void *stress_mark_worker(void *extrav) {
    struct reader_extra *CAST_FROM_VOIDP(extra, extrav);
    struct stress_shared &shared = *extra->shared;
    toku_mutex_t &mutex = shared.mutex;

    while (shared.running) {
        toku_mutex_lock(&mutex);
        rwlock_read_lock(&shared.lock, &mutex);
        toku_mutex_unlock(&mutex);

        struct pair range;
        generate_range(&extra->rand_read, shared, &range.begin, &range.limit);

        shared.omt->iterate_and_mark_range<pair, mark_read_iterator>(range.begin, range.limit, &range);

        ++extra->iterations;

        toku_mutex_lock(&mutex);
        rwlock_read_unlock(&shared.lock);
        toku_mutex_unlock(&mutex);

        usleep(1);
    }

    return nullptr;
}

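// array_ftor collects every value passed to operator() into a heap-allocated
// array and can later replay the collected values through another functor.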
template<typename T>
class array_ftor {
    int m_count;
    T *m_array;
public:
    array_ftor(int size) : m_count(0) {
        XMALLOC_N(size, m_array);
    }
    ~array_ftor() {
        toku_free(m_array);
    }
    void operator() (const T &x) { m_array[m_count++] = x; }
    template<class callback_t>
    void iterate(callback_t &cb) const {
        for (int i = 0; i < m_count; ++i) {
            cb(m_array[i]);
        }
    }
};

int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor<uint32_t> *const fp);
int use_array_ftor(const uint32_t &v, const uint32_t UU(idx), array_ftor<uint32_t> *const fp) {
    array_ftor<uint32_t> &f = *fp;
    f(v);
    return 0;
}

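// inserter puts each value back into the omt at its sorted position, using
// int_heaviside as the search function.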
class inserter {
    stress_omt *m_omt;
public:
    inserter(stress_omt *omt) : m_omt(omt) {}
    void operator() (const uint32_t &x) {
        m_omt->insert<uint32_t, int_heaviside>(x, x, nullptr);
    }
};

/*
 * Stress-test plan:
 *
 * N marker threads each get a context (struct reader_extra) holding an
 * iteration count and two identically-seeded RNGs: one used by the thread
 * itself, one used later by the checker.
 *
 * Each marker thread loops: generate a random range from its read RNG, mark
 * that range in the omt, and increment its iteration count.
 *
 * The deleter thread periodically takes the write lock and, for each context,
 * replays the iterations performed since the last check using the write RNG,
 * marking the same ranges in a plain bool array.  It then checks that the
 * array agrees exactly with iterate_over_marked, deletes all marked elements,
 * and reinserts them.
 */

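// Replay one marker thread's iterations since the last check: regenerate the
// same random ranges from the identically-seeded write RNG and record the marks
// in a bool array for later comparison against the omt's own marks.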
static void simulate_reader_marks_on_array(struct reader_extra *const reader, const struct stress_shared &shared, bool *const should_be_marked) {
    if (verbose) {
        fprintf(stderr, "thread %d ran %" PRIu64 " iterations\n", reader->tid, reader->iterations - reader->last_iteration);
    }
    for (; reader->last_iteration < reader->iterations; ++reader->last_iteration) {
        uint32_t begin;
        uint32_t limit;

        generate_range(&reader->rand_write, shared, &begin, &limit);

        for (uint32_t i = begin; i < limit; i++) {
            should_be_marked[i] = true;
        }
    }
}

int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked);
int copy_marks(const uint32_t &v, const uint32_t idx, bool * const is_marked) {
    invariant(defudge(v) == idx);
    is_marked[idx] = true;
    return 0;
}

static inline uint32_t count_true(const bool *const bools, uint32_t n) {
    uint32_t count = 0;
    for (uint32_t i = 0; i < n; ++i) {
        if (bools[i]) {
            ++count;
        }
    }
    return count;
}

static void stress_deleter(struct reader_extra *const readers, int num_marker_threads, stress_omt *omt) {
    // Verify (iterate_over_marked) agrees exactly with iterate_and_mark_range (multithreaded)
    stress_shared &shared = *readers[0].shared;
    bool should_be_marked[omt->size()];
    ZERO_ARRAY(should_be_marked);

    for (int i = 0; i < num_marker_threads; i++) {
        simulate_reader_marks_on_array(&readers[i], shared, should_be_marked);
    }

    bool is_marked_according_to_iterate[omt->size()];
    ZERO_ARRAY(is_marked_according_to_iterate);

    omt->verify_marks_consistent();
    omt->iterate_over_marked<bool, copy_marks>(&is_marked_according_to_iterate[0]);
    omt->verify_marks_consistent();

    invariant(!memcmp(should_be_marked, is_marked_according_to_iterate, sizeof(should_be_marked)));

    if (verbose) {
        double frac_marked = count_true(should_be_marked, omt->size());
        frac_marked /= omt->size();

        fprintf(stderr, "Marked: %0.4f\n", frac_marked);
        omt->verify_marks_consistent();
    }

    array_ftor<uint32_t> aftor(omt->size());
    omt->iterate_over_marked<array_ftor<uint32_t>, use_array_ftor>(&aftor);
    omt->delete_all_marked();
    omt->verify_marks_consistent();
    omt->iterate_over_marked<void, die>(nullptr);
    inserter ins(omt);
    aftor.iterate(ins);
    omt->verify_marks_consistent();
}

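// Deleter thread: 20 times, sleeping slightly longer each round, take the write
// lock, run stress_deleter to check/delete/reinsert, then release.  When done,
// clear the running flag so the marker threads exit.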
static void *stress_delete_worker(void *extrav) {
    reader_extra *CAST_FROM_VOIDP(readers, extrav);
    stress_shared &shared = *readers[0].shared;
    int num_marker_threads = shared.num_marker_threads;
    toku_mutex_t &mutex = shared.mutex;
    const double repetitions = 20;
    for (int i = 0; i < repetitions; ++i) {
        // sleep 0 - 0.15s
        // early iterations sleep for a short time
        // later iterations sleep longer
        int sleep_for = 1000 * 100 * (1.5 * (i+1) / repetitions);
        usleep(sleep_for);

        toku_mutex_lock(&mutex);
        rwlock_write_lock(&shared.lock, &mutex);
        toku_mutex_unlock(&mutex);

        stress_deleter(readers, num_marker_threads, shared.omt);

        toku_mutex_lock(&mutex);
        rwlock_write_unlock(&shared.lock);
        toku_mutex_unlock(&mutex);
    }
    toku_sync_bool_compare_and_swap(&shared.running, true, false);
    return nullptr;
}

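// stress_test: build an omt of nelts elements, start num_marker_threads marker
// threads plus one deleter thread, and join them all once the deleter finishes.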
static void stress_test(int nelts) {
    stress_omt omt;
    omt.create();
    for (int i = 0; i < nelts; ++i) {
        omt.insert_at(fudge(i), i);
    }

    const int num_marker_threads = 5;
    struct stress_shared extra;
    ZERO_STRUCT(extra);
    extra.omt = &omt;
    toku_mutex_init(toku_uninstrumented, &extra.mutex, nullptr);
    rwlock_init(toku_uninstrumented, &extra.lock);
    extra.running = true;
    extra.num_marker_threads = num_marker_threads;

    struct reader_extra readers[num_marker_threads];
    ZERO_ARRAY(readers);

    srandom(time(NULL));
    toku_pthread_t marker_threads[num_marker_threads];
    for (int i = 0; i < num_marker_threads; ++i) {
        struct reader_extra &reader = readers[i];
        reader.tid = i;
        reader.shared = &extra;

        int r;
        int seed = random();
        r = myinitstate_r(seed, reader.buf_read, 8, &reader.rand_read);
        invariant_zero(r);
        r = myinitstate_r(seed, reader.buf_write, 8, &reader.rand_write);
        invariant_zero(r);

        toku_pthread_create(toku_uninstrumented,
                            &marker_threads[i],
                            nullptr,
                            stress_mark_worker,
                            &reader);
    }

    toku_pthread_t deleter_thread;
    toku_pthread_create(toku_uninstrumented,
                        &deleter_thread,
                        nullptr,
                        stress_delete_worker,
                        &readers[0]);
    toku_pthread_join(deleter_thread, NULL);

    for (int i = 0; i < num_marker_threads; ++i) {
        toku_pthread_join(marker_threads[i], NULL);
    }

    rwlock_destroy(&extra.lock);
    toku_mutex_destroy(&extra.mutex);

    omt.destroy();
}

} // end namespace test

} // end namespace toku

int test_main(int argc, const char *argv[]) {
    default_parse_args(argc, argv);

    for (int i = 10; i <= 80; i*=2) {
        toku::test::run_test(i);
    }

    toku::test::run_test(9000);

    toku::test::stress_test(1000 * 100);

    return 0;
}