/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

/* This code can either test the PARTITIONED_COUNTER abstraction or it can time various implementations. */

/* Try to make a counter that requires no cache misses to increment; getting the value is allowed to be slow.
 * I don't care much about races between the readers and writers on the counter.
 *
 * The problem: We observed that incrementing a counter with multiple threads is quite expensive.
 * Here are some performance numbers:
 * Machines:  mork or mindy (Intel Xeon L5520 2.27GHz)
 *            bradley's 4-core laptop (Intel Core i7-2640M 2.80GHz) sandy bridge
 *            alf       16-core server (Xeon E5-2665 2.4GHz) sandy bridge
 *
 *      mork  mindy  bradley  alf
 *     1.22ns  1.07ns  1.27ns  0.61ns   to do a ++, but it's got a race in it.
 *    27.11ns 20.47ns 18.75ns 34.15ns   to do a sync_fetch_and_add().
 *     0.26ns  0.29ns  0.71ns  0.19ns   to do it with a single version of a counter
 *     0.35ns  0.33ns  0.69ns  0.18ns   pure thread-local variable (no way to add things up)
 *             0.76ns  1.50ns  0.35ns   partitioned_counter.c (using link-time optimization, otherwise the function call overwhelms everything)
 *     2.21ns          3.32ns  0.70ns   partitioned_counter.c (using gcc, the C version at r46097, not C++)  This one is a little slower because it has an extra branch in it.
 *
 * Surprisingly, compiling this code without -fPIC doesn't make it any faster (even the pure thread-local variable is the same).  -fPIC access to
 * thread-local variables looks slower, since it goes through a function call, but it doesn't seem to be any slower in practice.  In fact, even the puretl-ptr test,
 * which simply increments a variable through a thread-local pointer, is basically the same speed as accessing a thread-local variable directly.
 *
 * How it works.  Each thread has a thread-local counter structure with an integer in it.  To increment, we increment the thread-local structure.
 *   The other operation is to query the counters to get the sum of all the thread-local variables.
 *   The first time a pthread increments the variable we add the variable to a linked list.
 *   When a pthread ends, we use the pthread_key destructor to remove the variable from the linked list.  We also have to remember the sum of everything
 *    that has been removed from the list.
 *   To get the sum we add the sum of the destructed items plus everything still in the list.
 *
 */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <toku_race_tools.h>
#include <toku_assert.h>
#include <portability/toku_atomic.h>
#include <memory.h>
#include <util/partitioned_counter.h>
#include "test.h"

// The test code includes the fastest version I could figure out to make, implemented below.

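// Per-thread node for the fast counter.  Each thread owns one of these as a __thread
// variable; the nodes of all live threads are chained into a doubly-linked list headed
// by 'head' so that getvals() can sum the thread-local counters.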
struct counter_s {
    bool inited;
    volatile int counter;
    struct counter_s *prev, *next;
    int myid;
};
static __thread struct counter_s counter = {false, 0, NULL, NULL, 0};

static int finished_counter=0; // Sum of the counters of all threads that have finished.

// We use a single mutex for anything complex.  We'd like to use a mutex per partitioned counter, but we must cope with the possibility of a race between
// a terminating pthread (which calls destroy_counter()) and a call to the counter destructor.  So we use a global mutex.
static pthread_mutex_t pc_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct counter_s *head=NULL;
static pthread_key_t   counter_key;

static void pc_lock (void)
// Effect: Lock the pc mutex.
{
    int r = pthread_mutex_lock(&pc_mutex);
    assert(r==0);
}

static void pc_unlock (void)
// Effect: Unlock the pc mutex.
{
    int r = pthread_mutex_unlock(&pc_mutex);
    assert(r==0);
}

static void destroy_counter (void *counterp)
// Effect: This is the function passed to pthread_key_create that is to run whenever a thread terminates.
//   The thread-local part of the counter must be copied into the shared state, and the thread-local part of the counter must be
//   removed from the linked list of all thread-local parts.
{
    assert((struct counter_s*)counterp==&counter);
    pc_lock();
    if (counter.prev==NULL) {
        assert(head==&counter);
        head = counter.next;
    } else {
        counter.prev->next = counter.next;
    }
    if (counter.next!=NULL) {
        counter.next->prev = counter.prev;
    }
    finished_counter += counter.counter;
    TOKU_VALGRIND_HG_ENABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // stop ignoring races
    //printf("finished counter now %d\n", finished_counter);
    pc_unlock();
}

static int idcounter=0;

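// Effect: Increment this thread's local counter.  On the first increment by a thread,
//   register the thread-local node: link it into the global list (under pc_mutex) and
//   install it with pthread_setspecific so destroy_counter() runs at thread exit.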
static inline void increment (void) {
    if (!counter.inited) {
        pc_lock();
        struct counter_s *cp = &counter;
        { int r = pthread_setspecific(counter_key, cp); assert(r==0); }
        cp->prev = NULL;
        cp->next = head;
        if (head!=NULL) {
            head->prev = cp;
        }
        head = cp;
        cp->counter = 0;
        cp->inited = true;
        cp->myid = idcounter++;
        TOKU_VALGRIND_HG_DISABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // the counter increment is kind of racy.
        pc_unlock();
    }
    counter.counter++;
}

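// Effect: Return the current total: the sum contributed by already-finished threads
//   plus the counters of all live threads on the list.  Holds pc_mutex while summing.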
static int getvals (void) {
    pc_lock();
    int sum=finished_counter;
    for (struct counter_s *p=head; p; p=p->next) {
        sum+=p->counter;
    }
    pc_unlock();
    return sum;
}

/**********************************************************************************/
/* And now for some actual test code.                                             */
/**********************************************************************************/

static const int N=10000000;
static const int T=20;


PARTITIONED_COUNTER pc;
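// Thread body: increment the PARTITIONED_COUNTER pc N times.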
static void *pc_doit (void *v) {
    for (int i=0; i<N; i++) {
        increment_partitioned_counter(pc, 1);
    }
    //printf("val=%ld\n", read_partitioned_counter(pc));
    return v;
}

static void* new_doit (void* v) {
    for (int i=0; i<N; i++) {
        increment();
        //if (i%0x2000 == 0) sched_yield();
    }
    if (0) printf("done id=%d, getvals=%d\n", counter.myid, getvals());
    return v;
}

static int oldcounter=0;

static void* old_doit (void* v) {
    for (int i=0; i<N; i++) {
        (void)toku_sync_fetch_and_add(&oldcounter, 1);
        //if (i%0x1000 == 0) sched_yield();
    }
    return v;
}

static volatile int oldcounter_nonatomic=0;

static void* old_doit_nonatomic (void* v) {
    for (int i=0; i<N; i++) {
        oldcounter_nonatomic++;
        //if (i%0x1000 == 0) sched_yield();
    }
    return v;
}

static __thread volatile int thread_local_counter=0;
static void* tl_doit (void *v) {
    for (int i=0; i<N; i++) {
        thread_local_counter++;
    }
    return v;
}

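// Return the elapsed time from *start to *end, in seconds.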
static float tdiff (struct timeval *start, struct timeval *end) {
    return (end->tv_sec-start->tv_sec) +1e-6*(end->tv_usec - start->tv_usec);
}

static void pt_create (pthread_t *thread, void *(*f)(void*), void *extra) {
    int r = pthread_create(thread, NULL, f, extra);
    assert(r==0);
}

static void pt_join (pthread_t thread, void *expect_extra) {
    void *result;
    int r = pthread_join(thread, &result);
    assert(r==0);
    assert(result==expect_extra);
}

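// Effect: Start T threads each running f, wait for them all, and print the wall-clock
//   time along with that time divided by the T*N total increments, in nanoseconds.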
static void timeit (const char *description, void* (*f)(void*)) {
    struct timeval start, end;
    pthread_t threads[T];
    gettimeofday(&start, 0);
    for (int i=0; i<T; i++) {
        pt_create(&threads[i], f, NULL);
    }
    for (int i=0; i<T; i++) {
        pt_join(threads[i], NULL);
    }
    gettimeofday(&end, 0);
    printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
}

// Do a measurement where it really is only a pointer dereference to increment the variable, which is thread local.
static void* tl_doit_ptr (void *v) {
    volatile uint64_t *p = (uint64_t *)v;
    for (int i=0; i<N; i++) {
        (*p)++;
    }
    return v;
}


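// Effect: Like timeit(), but pass each thread a pointer to its own slot, with the slots
//   padded to separate cache lines, so f increments through a plain pointer dereference.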
static void timeit_with_thread_local_pointer (const char *description, void* (*f)(void*)) {
    struct timeval start, end;
    pthread_t threads[T];
    struct { uint64_t values[8] __attribute__((__aligned__(64))); } values[T]; // pad to different cache lines.
    gettimeofday(&start, 0);
    for (int i=0; i<T; i++) {
        values[i].values[0]=0;
        pt_create(&threads[i], f, &values[i].values[0]);
    }
    for (int i=0; i<T; i++) {
        pt_join(threads[i], &values[i].values[0]);
    }
    gettimeofday(&end, 0);
    printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
}

static int verboseness_cmdarg=0;
static bool time_cmdarg=false;

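// Effect: Parse the command line.  -v increases verbosity; --time runs the timing
//   comparison instead of the correctness tests.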
static void parse_args (int argc, const char *argv[]) {
    const char *progname = argv[0];
    argc--; argv++;
    while (argc>0) {
        if (strcmp(argv[0], "-v")==0) verboseness_cmdarg++;
        else if (strcmp(argv[0], "--time")==0) time_cmdarg=true;
        else {
            printf("Usage: %s [-v] [--time]\n Default is to run tests.  --time produces timing output.\n", progname);
            exit(1);
        }
        argc--; argv++;
    }
}

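// Effect: Time each implementation: the racy ++, the atomic fetch-and-add, the "fast"
//   thread-local version above, a pure thread-local increment, an increment through a
//   per-thread pointer, and the PARTITIONED_COUNTER itself.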
static void do_timeit (void) {
    { int r = pthread_key_create(&counter_key, destroy_counter); assert(r==0); }
    printf("%d threads\n%d increments per thread\n", T, N);
    timeit("++",         old_doit_nonatomic);
    timeit("atomic++",   old_doit);
    timeit("fast",       new_doit);
    timeit("puretl",     tl_doit);
    timeit_with_thread_local_pointer("puretl-ptr", tl_doit_ptr);
    pc = create_partitioned_counter();
    timeit("pc",       pc_doit);
    destroy_partitioned_counter(pc);
}

struct test_arguments {
    PARTITIONED_COUNTER pc;
    uint64_t            limit;
    uint64_t            total_increment_per_writer;
    volatile uint64_t   unfinished_count;
};

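// Reader thread: while writers are still running, repeatedly read the partitioned counter
// and check that the observed value never decreases and never exceeds limit+2 (the limit
// plus the two extra increments made by the main thread).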
static void *reader_test_fun (void *ta_v) {
    struct test_arguments *ta = (struct test_arguments *)ta_v;
    uint64_t lastval = 0;
    while (ta->unfinished_count>0) {
        uint64_t thisval = read_partitioned_counter(ta->pc);
        assert(lastval <= thisval);
        assert(thisval <= ta->limit+2);
        lastval = thisval;
        if (verboseness_cmdarg && (0==(thisval & (thisval-1)))) printf("ufc=%" PRIu64 " Thisval=%" PRIu64 "\n", ta->unfinished_count,thisval);
    }
    uint64_t thisval = read_partitioned_counter(ta->pc);
    assert(thisval==ta->limit+2); // we incremented two extra times in the test
    return ta_v;
}

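// Writer thread: increment the partitioned counter total_increment_per_writer times,
// yielding occasionally, then atomically decrement unfinished_count to signal completion.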
static void *writer_test_fun (void *ta_v) {
    struct test_arguments *ta = (struct test_arguments *)ta_v;
    for (uint64_t i=0; i<ta->total_increment_per_writer; i++) {
        if (i%1000 == 0) sched_yield();
        increment_partitioned_counter(ta->pc, 1);
    }
    uint64_t c __attribute__((__unused__)) = toku_sync_fetch_and_sub(&ta->unfinished_count, 1);
    return ta_v;
}


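// Correctness test: for each of two groups, create a partitioned counter, one reader thread,
// and a set of writer threads; the main thread also increments each counter twice (to test
// for #5321).  Then join everything and destroy the counters.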
static void do_testit (void) {
    const int NGROUPS = 2;
    uint64_t limits[NGROUPS];
    limits[0] = 2000000;
    limits[1] = 1000000;
    uint64_t n_writers[NGROUPS];
    n_writers[0] = 20;
    n_writers[1] = 40;
    struct test_arguments tas[NGROUPS];
    pthread_t reader_threads[NGROUPS];
    pthread_t *writer_threads[NGROUPS];
    for (int i=0; i<NGROUPS; i++) {
        tas[i].pc                         = create_partitioned_counter();
        tas[i].limit                      = limits[i];
        tas[i].unfinished_count           = n_writers[i];
        tas[i].total_increment_per_writer = limits[i]/n_writers[i];
        assert(tas[i].total_increment_per_writer * n_writers[i] == limits[i]);
        pt_create(&reader_threads[i], reader_test_fun, &tas[i]);
        increment_partitioned_counter(tas[i].pc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
        MALLOC_N(n_writers[i], writer_threads[i]);
        for (uint64_t j=0; j<n_writers[i]; j++) {
            pt_create(&writer_threads[i][j], writer_test_fun, &tas[i]);
        }
        increment_partitioned_counter(tas[i].pc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
    }
    for (int i=0; i<NGROUPS; i++) {
        pt_join(reader_threads[i], &tas[i]);
        for (uint64_t j=0; j<n_writers[i]; j++) {
            pt_join(writer_threads[i][j], &tas[i]);
        }
        toku_free(writer_threads[i]);
        destroy_partitioned_counter(tas[i].pc);
    }
}

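// Crude handshake between do_testit2() and test2_fun(): 0 = counter not yet incremented,
// 1 = incremented (main thread may destroy the counter), 2 = counter destroyed (thread may exit).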
volatile int spinwait=0;
static void* test2_fun (void* mypc_v) {
    PARTITIONED_COUNTER mypc = (PARTITIONED_COUNTER)mypc_v;
    increment_partitioned_counter(mypc, 3);
    spinwait=1;
    while (spinwait==1);
    // mypc no longer points at a valid data structure.
    return NULL;
}

static void do_testit2 (void)
// This test checks to see what happens if a thread is still live when we destruct a counter.
//   A thread increments the counter, then lets us know through a spin wait, then waits until we destroy the counter.
{
    pthread_t t;
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&spinwait, sizeof(spinwait)); // this is a racy volatile variable.
    {
        PARTITIONED_COUNTER mypc = create_partitioned_counter();
        increment_partitioned_counter(mypc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
        pt_create(&t, test2_fun, mypc);
        while (spinwait==0); // wait until the other thread has incremented the counter.
        increment_partitioned_counter(mypc, -1);
        assert(read_partitioned_counter(mypc)==3);
        destroy_partitioned_counter(mypc);
    } // leave scope, so the counter goes away.
    spinwait=2; // tell the other thread to finish up.
    pt_join(t, NULL);
}

int test_main (int argc, const char *argv[]) {
    parse_args(argc, argv);
    if (time_cmdarg) {
        do_timeit();
    } else {
        do_testit();
        do_testit2();
    }
    return 0;
}