1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 /* This code can either test the PARTITIONED_COUNTER abstraction or it can time various implementations. */
40
41 /* Try to make counter that requires no cache misses to increment, and to get the value can be slow.
42 * I don't care much about races between the readers and writers on the counter.
43 *
44 * The problem: We observed that incrementing a counter with multiple threads is quite expensive.
45 * Here are some performance numbers:
46 * Machines: mork or mindy (Intel Xeon L5520 2.27GHz)
47 * bradley's 4-core laptop laptop (Intel Core i7-2640M 2.80GHz) sandy bridge
48 * alf 16-core server (xeon E5-2665 2.4GHz) sandybridge
49 *
50 * mork mindy bradley alf
51 * 1.22ns 1.07ns 1.27ns 0.61ns to do a ++, but it's got a race in it.
52 * 27.11ns 20.47ns 18.75ns 34.15ns to do a sync_fetch_and_add().
53 * 0.26ns 0.29ns 0.71ns 0.19ns to do with a single version of a counter
54 * 0.35ns 0.33ns 0.69ns 0.18ns pure thread-local variable (no way to add things up)
55 * 0.76ns 1.50ns 0.35ns partitioned_counter.c (using link-time optimization, otherwise the function all overwhelms everything)
56 * 2.21ns 3.32ns 0.70ns partitioned_counter.c (using gcc, the C version at r46097, not C++) This one is a little slower because it has an extra branch in it.
57 *
58 * Surprisingly, compiling this code without -fPIC doesn't make it any faster (even the pure thread-local variable is the same). -fPIC access to
59 * thread-local variables look slower since they have a function all, but they don't seem to be any slower in practice. In fact, even the puretl-ptr test
60 * which simply increments a thread-local pointer is basically the same speed as accessing thread_local variable.
61 *
62 * How it works. Each thread has a thread-local counter structure with an integer in it. To increment, we increment the thread-local structure.
63 * The other operation is to query the counters to get the sum of all the thread-local variables.
64 * The first time a pthread increments the variable we add the variable to a linked list.
65 * When a pthread ends, we use the pthread_key destructor to remove the variable from the linked list. We also have to remember the sum of everything.
66 * that has been removed from the list.
67 * To get the sum we add the sum of the destructed items, plus everything in the list.
68 *
69 */
70
71 #include <pthread.h>
72 #include <stdio.h>
73 #include <stdlib.h>
74 #include <string.h>
75 #include <sys/time.h>
76 #include <unistd.h>
77 #include <toku_race_tools.h>
78 #include <toku_assert.h>
79 #include <portability/toku_atomic.h>
80 #include <memory.h>
81 #include <util/partitioned_counter.h>
82 #include "test.h"
83
84 // The test code includes the fastest version I could figure out to make, implemented below.
85
// Per-thread piece of the hand-rolled "fast" counter benchmarked below.
// Each thread owns one of these (via __thread); live instances are kept on a
// doubly-linked list so getvals() can sum them.
struct counter_s {
    bool inited;                    // has this thread linked its struct into the global list yet?
    volatile int counter;           // this thread's tally; read racily by getvals() (tolerated, see file header)
    struct counter_s *prev, *next;  // links in the global list of live per-thread counters (protected by pc_mutex)
    int myid;                       // debugging id assigned from idcounter on first increment
};
static __thread struct counter_s counter = {false,0, NULL,NULL,0};
93
static int finished_counter=0; // counter for all threads that are done.

// We use a single mutex for anything complex.  We'd like to use a mutex per partitioned counter, but we must cope with the possibility of a race between
// a terminating pthread (which calls destroy_counter()), and a call to the counter destructor.  So we use a global mutex.
static pthread_mutex_t pc_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct counter_s *head=NULL;   // head of the list of live per-thread counter structs (protected by pc_mutex)
static pthread_key_t counter_key;     // key whose destructor (destroy_counter) runs at each thread's exit
101
pc_lock(void)102 static void pc_lock (void)
103 // Effect: Lock the pc mutex.
104 {
105 int r = pthread_mutex_lock(&pc_mutex);
106 assert(r==0);
107 }
108
pc_unlock(void)109 static void pc_unlock (void)
110 // Effect: Unlock the pc mutex.
111 {
112 int r = pthread_mutex_unlock(&pc_mutex);
113 assert(r==0);
114 }
115
static void destroy_counter (void *counterp)
// Effect: This is the function passed to pthread_key_create that is to run whenever a thread terminates.
//   The thread-local part of the counter must be copied into the shared state, and the thread-local part of the counter must be
//   removed from the linked list of all thread-local parts.
{
    assert((struct counter_s*)counterp==&counter); // the key's value was set to &counter in increment()
    pc_lock();
    // Unlink this thread's struct from the doubly-linked list headed by 'head'.
    if (counter.prev==NULL) {
        assert(head==&counter); // no predecessor means we must be the list head
        head = counter.next;
    } else {
        counter.prev->next = counter.next;
    }
    if (counter.next!=NULL) {
        counter.next->prev = counter.prev;
    }
    // Fold this thread's tally into the sum for all exited threads.
    finished_counter += counter.counter;
    TOKU_VALGRIND_HG_ENABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // stop ignoring races
    //printf("finished counter now %d\n", finished_counter);
    pc_unlock();
}
137
static int idcounter=0; // id for the next thread to register its counter (protected by pc_mutex)

static inline void increment (void) {
    if (!counter.inited) {
        // First increment on this thread: register the thread-local struct.
        pc_lock();
        struct counter_s *cp = &counter;
        // Arrange for destroy_counter(cp) to run when this thread exits.
        { int r = pthread_setspecific(counter_key, cp); assert(r==0); }
        // Push cp onto the front of the global list.
        cp->prev = NULL;
        cp->next = head;
        if (head!=NULL) {
            head->prev = cp;
        }
        head = cp;
        cp->counter = 0;
        cp->inited = true;
        cp->myid = idcounter++;
        TOKU_VALGRIND_HG_DISABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // the counter increment is kind of racy.
        pc_unlock();
    }
    counter.counter++; // unlocked: races with getvals() readers are tolerated (see file header)
}
159
getvals(void)160 static int getvals (void) {
161 pc_lock();
162 int sum=finished_counter;
163 for (struct counter_s *p=head; p; p=p->next) {
164 sum+=p->counter;
165 }
166 pc_unlock();
167 return sum;
168 }
169
170 /**********************************************************************************/
171 /* And now for some actual test code. */
172 /**********************************************************************************/
173
static const int N=10000000; // increments performed by each worker thread
static const int T=20;       // number of worker threads per timing run


// Shared partitioned counter exercised by the "pc" timing run.
PARTITIONED_COUNTER pc;
// Thread body: increment the shared partitioned counter N times.
static void *pc_doit (void *v) {
    for (int i=0; i<N; i++) {
        increment_partitioned_counter(pc, 1);
    }
    //printf("val=%ld\n", read_partitioned_counter(pc));
    return v;
}
186
// Thread body for the "fast" timing run: exercise the hand-rolled
// thread-local counter (increment() above) N times.
static void* new_doit (void* v) {
    for (int i=0; i<N; i++) {
        increment();
        //if (i%0x2000 == 0) sched_yield();
    }
    if (0) printf("done id=%d, getvals=%d\n", counter.myid, getvals());
    return v;
}
195
static int oldcounter=0; // shared counter for the "atomic++" timing run

// Thread body for the "atomic++" timing run: N atomic fetch-and-add
// increments on a single shared counter (high cache-line contention).
static void* old_doit (void* v) {
    for (int i=0; i<N; i++) {
        (void)toku_sync_fetch_and_add(&oldcounter, 1);
        //if (i%0x1000 == 0) sched_yield();
    }
    return v;
}
205
static volatile int oldcounter_nonatomic=0; // shared counter for the plain "++" timing run (deliberately racy)

// Thread body for the "++" timing run: N plain increments of a shared
// volatile int.  Racy on purpose — measures cost, not correctness.
static void* old_doit_nonatomic (void* v) {
    for (int i=0; i<N; i++) {
        oldcounter_nonatomic++;
        //if (i%0x1000 == 0) sched_yield();
    }
    return v;
}
215
static __thread volatile int thread_local_counter=0; // per-thread counter for the "puretl" run (no way to sum across threads)

// Thread body for the "puretl" timing run: N increments of a pure
// thread-local variable — the lower bound on increment cost.
static void* tl_doit (void *v) {
    for (int i=0; i<N; i++) {
        thread_local_counter++;
    }
    return v;
}
223
static float tdiff (struct timeval *start, struct timeval *end)
// Effect: Return the elapsed time from *start to *end, in seconds
//   (microsecond resolution).
{
    double secs  = (double)(end->tv_sec  - start->tv_sec);
    double usecs = (double)(end->tv_usec - start->tv_usec);
    return secs + usecs*1e-6;
}
227
static void pt_create (pthread_t *thread, void *(*f)(void*), void *extra)
// Effect: Start a thread running f(extra), storing its handle in *thread;
//   abort on failure.
{
    const int rc = pthread_create(thread, NULL, f, extra);
    assert(rc==0);
}
232
static void pt_join (pthread_t thread, void *expect_extra)
// Effect: Join the given thread and verify it returned expect_extra;
//   abort on failure or mismatch.
{
    void *got = NULL;
    const int rc = pthread_join(thread, &got);
    assert(rc==0);
    assert(got==expect_extra);
}
239
// Effect: Run T threads each executing f(NULL), time the whole run with
//   gettimeofday, and print the elapsed time plus the per-increment cost
//   (assumes each thread performs N increments).
static void timeit (const char *description, void* (*f)(void*)) {
    struct timeval start, end;
    pthread_t threads[T];
    gettimeofday(&start, 0);
    for (int i=0; i<T; i++) {
        pt_create(&threads[i], f, NULL);
    }
    for (int i=0; i<T; i++) {
        pt_join(threads[i], NULL);
    }
    gettimeofday(&end, 0);
    printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
}
253
254 // Do a measurement where it really is only a pointer dereference to increment the variable, which is thread local.
tl_doit_ptr(void * v)255 static void* tl_doit_ptr (void *v) {
256 volatile uint64_t *p = (uint64_t *)v;
257 for (int i=0; i<N; i++) {
258 (*p)++;
259 }
260 return v;
261 }
262
263
// Effect: Like timeit(), but hand each thread a pointer to its own
//   cache-line-aligned uint64_t so f can increment through the pointer.
//   Prints elapsed time and per-increment cost.
static void timeit_with_thread_local_pointer (const char *description, void* (*f)(void*)) {
    struct timeval start, end;
    pthread_t threads[T];
    struct { uint64_t values[8] __attribute__((__aligned__(64))); } values[T]; // pad to different cache lines.
    gettimeofday(&start, 0);
    for (int i=0; i<T; i++) {
        values[i].values[0]=0;
        pt_create(&threads[i], f, &values[i].values[0]);
    }
    for (int i=0; i<T; i++) {
        pt_join(threads[i], &values[i].values[0]);
    }
    gettimeofday(&end, 0);
    printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
}
279
static int verboseness_cmdarg=0;  // bumped once per -v flag
static bool time_cmdarg=false;    // true when --time given: run timings instead of tests

// Effect: Parse command-line flags into the *_cmdarg globals above.
//   Accepts any number of "-v" (verbosity) and "--time" flags; on any other
//   argument, prints a usage message and exits with status 1.
static void parse_args (int argc, const char *argv[]) {
    const char *progname = argv[0]; // bug fix: was argv[1], which is the first flag (or NULL), not the program name
    argc--; argv++;
    while (argc>0) {
        if (strcmp(argv[0], "-v")==0) verboseness_cmdarg++;
        else if (strcmp(argv[0], "--time")==0) time_cmdarg=true;
        else {
            printf("Usage: %s [-v] [--time]\n Default is to run tests. --time produces timing output.\n", progname);
            exit(1);
        }
        argc--; argv++;
    }
}
296
// Effect: Run every timing variant described in the file header and print
//   one line of results per variant.  Creates the pthread key needed by the
//   hand-rolled counter ("fast") before running.
static void do_timeit (void) {
    { int r = pthread_key_create(&counter_key, destroy_counter); assert(r==0); }
    printf("%d threads\n%d increments per thread\n", T, N);
    timeit("++",       old_doit_nonatomic);
    timeit("atomic++", old_doit);
    timeit("fast",     new_doit);
    timeit("puretl",   tl_doit);
    timeit_with_thread_local_pointer("puretl-ptr", tl_doit_ptr);
    pc = create_partitioned_counter();
    timeit("pc",       pc_doit);
    destroy_partitioned_counter(pc);
}
309
// Arguments shared by one reader thread and its group of writer threads.
struct test_arguments {
    PARTITIONED_COUNTER pc;               // the counter under test
    uint64_t limit;                       // total increments all writers together will perform
    uint64_t total_increment_per_writer;  // increments each writer performs (limit/n_writers)
    volatile uint64_t unfinished_count;   // writers still running; decremented atomically as each finishes
};
316
// Thread body: poll the partitioned counter while writers run, checking that
// the observed value never decreases and never exceeds limit+2 (the main
// thread performs two extra increments; see do_testit).  After all writers
// finish, verify the exact final value.
static void *reader_test_fun (void *ta_v) {
    struct test_arguments *ta = (struct test_arguments *)ta_v;
    uint64_t lastval = 0;
    while (ta->unfinished_count>0) {
        uint64_t thisval = read_partitioned_counter(ta->pc);
        assert(lastval <= thisval); // counter must be monotonically nondecreasing
        assert(thisval <= ta->limit+2);
        lastval = thisval;
        // When verbose, print at values that are powers of two (or zero).
        if (verboseness_cmdarg && (0==(thisval & (thisval-1)))) printf("ufc=%" PRIu64 " Thisval=%" PRIu64 "\n", ta->unfinished_count,thisval);
    }
    uint64_t thisval = read_partitioned_counter(ta->pc);
    assert(thisval==ta->limit+2); // we incremented two extra times in the test
    return ta_v;
}
331
// Thread body: perform this writer's share of increments on the partitioned
// counter (yielding periodically to shake out interleavings), then atomically
// decrement unfinished_count to tell the reader we are done.
static void *writer_test_fun (void *ta_v) {
    struct test_arguments *ta = (struct test_arguments *)ta_v;
    for (uint64_t i=0; i<ta->total_increment_per_writer; i++) {
        if (i%1000 == 0) sched_yield();
        increment_partitioned_counter(ta->pc, 1);
    }
    uint64_t c __attribute__((__unused__)) = toku_sync_fetch_and_sub(&ta->unfinished_count, 1);
    return ta_v;
}
341
342
// Effect: Correctness test.  Run NGROUPS independent groups, each with its own
//   partitioned counter, one reader thread, and n_writers[i] writer threads.
//   The reader checks monotonicity and the final value; the main thread also
//   increments each counter twice (hence the limit+2 in reader_test_fun).
static void do_testit (void) {
    const int NGROUPS = 2;
    uint64_t limits[NGROUPS];
    limits [0] = 2000000;
    limits [1] = 1000000;
    uint64_t n_writers[NGROUPS];
    n_writers[0] = 20;
    n_writers[1] = 40;
    struct test_arguments tas[NGROUPS];
    pthread_t reader_threads[NGROUPS];
    pthread_t *writer_threads[NGROUPS];
    for (int i=0; i<NGROUPS; i++) {
        tas[i].pc = create_partitioned_counter();
        tas[i].limit = limits[i];
        tas[i].unfinished_count = n_writers[i];
        tas[i].total_increment_per_writer = limits[i]/n_writers[i];
        // limit must divide evenly among the writers, or the final-value check would be wrong.
        assert(tas[i].total_increment_per_writer * n_writers[i] == limits[i]);
        pt_create(&reader_threads[i], reader_test_fun, &tas[i]);
        increment_partitioned_counter(tas[i].pc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
        MALLOC_N(n_writers[i], writer_threads[i]);
        for (uint64_t j=0; j<n_writers[i] ; j++) {
            pt_create(&writer_threads[i][j], writer_test_fun, &tas[i]);
        }
        increment_partitioned_counter(tas[i].pc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
    }
    // Join everything and release per-group resources.
    for (int i=0; i<NGROUPS; i++) {
        pt_join(reader_threads[i], &tas[i]);
        for (uint64_t j=0; j<n_writers[i] ; j++) {
            pt_join(writer_threads[i][j], &tas[i]);
        }
        toku_free(writer_threads[i]);
        destroy_partitioned_counter(tas[i].pc);
    }
}
377
volatile int spinwait=0; // handshake flag between do_testit2 and test2_fun (deliberately racy; valgrind checking disabled)

// Thread body for do_testit2: increment the counter by 3, signal the main
// thread via spinwait, then spin until the main thread has destroyed the
// counter (spinwait set to 2) before exiting.
static void* test2_fun (void* mypc_v) {
    PARTITIONED_COUNTER mypc = (PARTITIONED_COUNTER)mypc_v;
    increment_partitioned_counter(mypc, 3);
    spinwait=1;
    while (spinwait==1);
    // mypc no longer points at a valid data structure.
    return NULL;
}
387
static void do_testit2 (void)
// This test checks to see what happens if a thread is still live when we destruct a counter.
//   A thread increments the counter, then lets us know through a spin wait, then waits until we destroy the counter.
{
    pthread_t t;
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&spinwait, sizeof(spinwait)); // this is a racy volatile variable.
    {
        PARTITIONED_COUNTER mypc = create_partitioned_counter();
        increment_partitioned_counter(mypc, 1); // make sure that the long-lived thread also increments the partitioned counter, to test for #5321.
        pt_create(&t, test2_fun, mypc);
        while(spinwait==0); // wait until the thread has incremented the counter.
        increment_partitioned_counter(mypc, -1); // net +1 -1 from this thread, +3 from test2_fun
        assert(read_partitioned_counter(mypc)==3);
        destroy_partitioned_counter(mypc);
    } // leave scope, so the counter goes away.
    spinwait=2; // tell the thread to finish up.
    pt_join(t, NULL);
}
406
test_main(int argc,const char * argv[])407 int test_main (int argc, const char *argv[]) {
408 parse_args(argc, argv);
409 if (time_cmdarg) {
410 do_timeit();
411 } else {
412 do_testit();
413 do_testit2();
414 }
415 return 0;
416 }
417