/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
======= */

51 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
52
53 #include <toku_race_tools.h>
54 #include <sys/types.h>
55 #include <pthread.h>
56
57 #include "memory.h"
58 #include "partitioned_counter.h"
59 #include "doubly_linked_list.h"
60 #include "growable_array.h"
61 #include <portability/toku_atomic.h>
62
63 #ifdef __APPLE__
64 // TODO(leif): The __thread declspec is broken in ways I don't understand
65 // on Darwin. Partitioned counters use them and it would be prohibitive
66 // to tease them apart before a week after 6.5.0, so instead, we're just
67 // not going to use them in the most brutal way possible. This is a
68 // terrible implementation of the API in partitioned_counter.h but it
69 // should be correct enough to release a non-performant version on OSX for
70 // development. Soon, we need to either make portable partitioned
71 // counters, or we need to do this disabling in a portable way.
72
73 struct partitioned_counter {
74 uint64_t v;
75 };
76
create_partitioned_counter(void)77 PARTITIONED_COUNTER create_partitioned_counter(void) {
78 PARTITIONED_COUNTER XCALLOC(counter);
79 return counter;
80 }
81
destroy_partitioned_counter(PARTITIONED_COUNTER counter)82 void destroy_partitioned_counter(PARTITIONED_COUNTER counter) {
83 toku_free(counter);
84 }
85
increment_partitioned_counter(PARTITIONED_COUNTER counter,uint64_t delta)86 void increment_partitioned_counter(PARTITIONED_COUNTER counter, uint64_t delta) {
87 (void) toku_sync_fetch_and_add(&counter->v, delta);
88 }
89
read_partitioned_counter(PARTITIONED_COUNTER counter)90 uint64_t read_partitioned_counter(PARTITIONED_COUNTER counter) {
91 return counter->v;
92 }
93
partitioned_counters_init(void)94 void partitioned_counters_init(void) {}
partitioned_counters_destroy(void)95 void partitioned_counters_destroy(void) {}
96
97 #else // __APPLE__
98
99 //******************************************************************************
100 //
101 // Representation: The representation of a partitioned counter comprises a
102 // sum, called sum_of_dead; an index, called the ckey, which indexes into a
103 // thread-local array to find a thread-local part of the counter; and a
104 // linked list of thread-local parts.
105 //
106 // There is also a linked list, for each thread that has a thread-local part
107 // of any counter, of all the thread-local parts of all the counters.
108 //
109 // There is a pthread_key which gives us a hook to clean up thread-local
110 // state when a thread terminates. For each thread-local part of a counter
111 // that the thread has, we add in the thread-local sum into the sum_of_dead.
112 //
113 // Finally there is a list of all the thread-local arrays so that when we
114 // destroy the partitioned counter before the threads are done, we can find
115 // and destroy the thread_local_arrays before destroying the pthread_key.
116 //
117 // Abstraction function: The sum is represented by the sum of _sum and the
118 // sum's of the thread-local parts of the counter.
119 //
120 // Representation invariant: Every thread-local part is in the linked list of
121 // the thread-local parts of its counter, as well as in the linked list of
122 // the counters of a the thread.
123 //
124 //******************************************************************************
125
126 //******************************************************************************
127 // The mutex for the PARTITIONED_COUNTER
128 // We have a single mutex for all the counters because
129 // (a) the mutex is obtained infrequently, and
130 // (b) it helps us avoid race conditions when destroying the counters.
131 // The alternative that I couldn't make work is to have a mutex per counter.
132 // But the problem is that the counter can be destroyed before threads
133 // terminate, or maybe a thread terminates before the counter is destroyed.
134 // If the counter is destroyed first, then the mutex is no longer available.
135 //******************************************************************************
136
137 using namespace toku;
138
139 static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
140
pc_lock(void)141 static void pc_lock (void)
142 // Effect: Lock the mutex.
143 {
144 int r = pthread_mutex_lock(&partitioned_counter_mutex);
145 assert(r==0);
146 }
147
pc_unlock(void)148 static void pc_unlock (void)
149 // Effect: Unlock the mutex.
150 {
151 int r = pthread_mutex_unlock(&partitioned_counter_mutex);
152 assert(r==0);
153 }
154
//******************************************************************************
// Key creation primitives.  Thin wrappers that assert success, since the only
// failures (EAGAIN/ENOMEM) are unrecoverable here.
//******************************************************************************
static void pk_create (pthread_key_t *key, void (*destructor)(void*)) {
    int r = pthread_key_create(key, destructor);
    assert(r==0);
}

// Delete a pthread key, asserting success (EINVAL would mean a stale key).
static void pk_delete (pthread_key_t key) {
    int r = pthread_key_delete(key);
    assert(r==0);
}

// Set the calling thread's value for a pthread key, asserting success.
static void pk_setspecific (pthread_key_t key, const void *value) {
    int r = pthread_setspecific(key, value);
    assert(r==0);
}

173 //******************************************************************************
174 // The counter itself.
175 // The thread local part of a counter, comprising the thread-local sum a pointer
176 // to the partitioned_counter, a pointer to the thread_local list head, and two
177 // linked lists. One of the lists is all the thread-local parts that belong to
178 // the same counter, and the other is all the thread-local parts that belogn to
179 // the same thread.
180 //******************************************************************************
181
182 struct local_counter;
183
184 struct partitioned_counter {
185 uint64_t sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
186 uint64_t pc_key; // A unique integer among all counters that have been created but not yet destroyed.
187 DoublyLinkedList<struct local_counter *> ll_counter_head; // A linked list of all the thread-local information for this counter.
188 };
189
190 struct local_counter {
191 uint64_t sum; // The thread-local sum.
192 PARTITIONED_COUNTER owner_pc; // The partitioned counter that this is part of.
193 GrowableArray<struct local_counter *> *thread_local_array; // The thread local array for this thread holds this local_counter at offset owner_pc->pc_key.
194 LinkedListElement<struct local_counter *> ll_in_counter; // Element for the doubly-linked list of thread-local information for this PARTITIONED_COUNTER.
195 };
196
197 // Try to get it it into one cache line by aligning it.
198 static __thread GrowableArray<struct local_counter *> thread_local_array;
199 static __thread bool thread_local_array_inited = false;
200
201 static DoublyLinkedList<GrowableArray<struct local_counter *> *> all_thread_local_arrays;
202 static __thread LinkedListElement<GrowableArray<struct local_counter *> *> thread_local_ll_elt;
203
204 static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
destroy_thread_local_part_of_partitioned_counters(void * ignore_me)205 static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me __attribute__((__unused__)))
206 // Effect: This function is called whenever a thread terminates using the
207 // destructor of the thread_destructor_key (defined below). First grab the
208 // lock, then go through all the partitioned counters and removes the part that
209 // is local to this thread. We don't actually need the contents of the
210 // thread_destructor_key except to cause this function to run. The content of
211 // the key is a static string, so don't try to free it.
212 {
213 pc_lock();
214 for (size_t i=0; i<thread_local_array.get_size(); i++) {
215 struct local_counter *lc = thread_local_array.fetch_unchecked(i);
216 if (lc==NULL) continue;
217 PARTITIONED_COUNTER owner = lc->owner_pc;
218 owner->sum_of_dead += lc->sum;
219 owner->ll_counter_head.remove(&lc->ll_in_counter);
220 toku_free(lc);
221 }
222 all_thread_local_arrays.remove(&thread_local_ll_elt);
223 thread_local_array_inited = false;
224 thread_local_array.deinit();
225 pc_unlock();
226 }
227
//******************************************************************************
// We employ a system-wide pthread_key simply to get a notification when a
// thread terminates.  The key will simply contain a constant string (it's "dont
// care", but it doesn't matter what it is, as long as it's not NULL).  We need
// a constructor function to set up the pthread_key.  We used a constructor
// function instead of a C++ constructor because that's what we are used to,
// rather than because it's necessarily better.  Whenever a thread tries to
// increment a partitioned_counter for the first time, it sets the
// pthread_setspecific for the thread_destructor_key.  It's OK if the key gets
// setspecific multiple times, it's always the same value.  When a thread (that
// has created a thread-local part of any partitioned counter) terminates, the
// destroy_thread_local_part_of_partitioned_counters will run.  It may run
// before or after other pthread_key destructors, but the thread-local
// ll_thread_head variable is still present until the thread is completely done
// running.
//******************************************************************************

static pthread_key_t thread_destructor_key;

247 //******************************************************************************
248 // We don't like using up pthread_keys (macos provides only 128 of them),
249 // so we built our own. Also, looking at the source code for linux libc,
250 // it looks like pthread_keys get slower if there are a lot of them.
251 // So we use only one pthread_key.
252 //******************************************************************************
253
254 GrowableArray<bool> counters_in_use;
255
allocate_counter(void)256 static uint64_t allocate_counter (void)
257 // Effect: Find an unused counter number, and allocate it, returning the counter number.
258 // Grabs the pc_lock.
259 {
260 uint64_t ret;
261 pc_lock();
262 size_t size = counters_in_use.get_size();
263 for (uint64_t i=0; i<size; i++) {
264 if (!counters_in_use.fetch_unchecked(i)) {
265 counters_in_use.store_unchecked(i, true);
266 ret = i;
267 goto unlock;
268 }
269 }
270 counters_in_use.push(true);
271 ret = size;
272 unlock:
273 pc_unlock();
274 return ret;
275 }
276
277
free_counter(uint64_t counternum)278 static void free_counter(uint64_t counternum)
279 // Effect: Free a counter.
280 // Requires: The pc mutex is held before calling.
281 {
282 assert(counternum < counters_in_use.get_size());
283 assert(counters_in_use.fetch_unchecked(counternum));
284 counters_in_use.store_unchecked(counternum, false);
285 }
286
destroy_counters(void)287 static void destroy_counters (void) {
288 counters_in_use.deinit();
289 }
290
291
//******************************************************************************
// Now for the code that actually creates a counter.
//******************************************************************************

create_partitioned_counter(void)296 PARTITIONED_COUNTER create_partitioned_counter(void)
297 // Effect: Create a counter, initialized to zero.
298 {
299 PARTITIONED_COUNTER XMALLOC(result);
300 result->sum_of_dead = 0;
301 result->pc_key = allocate_counter();
302 result->ll_counter_head.init();
303 return result;
304 }
305
destroy_partitioned_counter(PARTITIONED_COUNTER pc)306 void destroy_partitioned_counter(PARTITIONED_COUNTER pc)
307 // Effect: Destroy the counter. No operations on this counter are permitted after.
308 // Implementation note: Since we have a global lock, we can destroy all the thread-local
309 // versions as well.
310 {
311 pc_lock();
312 uint64_t pc_key = pc->pc_key;
313 LinkedListElement<struct local_counter *> *first;
314 while (pc->ll_counter_head.pop(&first)) {
315 // We just removed first from the counter list, now we must remove it from the thread-local array.
316 struct local_counter *lc = first->get_container();
317 assert(pc == lc->owner_pc);
318 GrowableArray<struct local_counter *> *tla = lc->thread_local_array;
319 tla->store_unchecked(pc_key, NULL);
320 toku_free(lc);
321 }
322 toku_free(pc);
323 free_counter(pc_key);
324 pc_unlock();
325 }
326
get_thread_local_counter(uint64_t pc_key,GrowableArray<struct local_counter * > * a)327 static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, GrowableArray<struct local_counter *> *a)
328 {
329 if (pc_key >= a->get_size()) {
330 return NULL;
331 } else {
332 return a->fetch_unchecked(pc_key);
333 }
334 }
335
get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)336 static struct local_counter *get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)
337 {
338 // Only this thread is allowed to modify thread_local_array, except for setting tla->array[pc_key] to NULL
339 // when a counter is destroyed (and in that case there should be no race because no other thread should be
340 // trying to access the same local counter at the same time.
341 uint64_t pc_key = pc->pc_key;
342 struct local_counter *lc = get_thread_local_counter(pc->pc_key, &thread_local_array);
343 if (lc == NULL) {
344 XMALLOC(lc); // Might as well do the malloc without holding the pc lock. But most of the rest of this work needs the lock.
345 pc_lock();
346
347 // Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters.
348 if (!thread_local_array_inited) {
349 pk_setspecific(thread_destructor_key, "dont care");
350 thread_local_array_inited=true;
351 thread_local_array.init();
352 all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array);
353 }
354
355 lc->sum = 0;
356 TOKU_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy.
357 lc->owner_pc = pc;
358 lc->thread_local_array = &thread_local_array;
359
360 // Grow the array if needed, filling in NULLs
361 while (thread_local_array.get_size() <= pc_key) {
362 thread_local_array.push(NULL);
363 }
364 thread_local_array.store_unchecked(pc_key, lc);
365 pc->ll_counter_head.insert(&lc->ll_in_counter, lc);
366 pc_unlock();
367 }
368 return lc;
369 }
370
increment_partitioned_counter(PARTITIONED_COUNTER pc,uint64_t amount)371 void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
372 // Effect: Increment the counter by amount.
373 // Requires: No overflows. This is a 64-bit unsigned counter.
374 {
375 struct local_counter *lc = get_or_alloc_thread_local_counter(pc);
376 lc->sum += amount;
377 }
378
sumit(struct local_counter * lc,uint64_t * sum)379 static int sumit(struct local_counter *lc, uint64_t *sum) {
380 (*sum)+=lc->sum;
381 return 0;
382 }
383
read_partitioned_counter(PARTITIONED_COUNTER pc)384 uint64_t read_partitioned_counter(PARTITIONED_COUNTER pc)
385 // Effect: Return the current value of the counter.
386 // Implementation note: Sum all the thread-local counts along with the sum_of_the_dead.
387 {
388 pc_lock();
389 uint64_t sum = pc->sum_of_dead;
390 int r = pc->ll_counter_head.iterate<uint64_t *>(sumit, &sum);
391 assert(r==0);
392 pc_unlock();
393 return sum;
394 }
395
partitioned_counters_init(void)396 void partitioned_counters_init(void)
397 // Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
398 {
399 pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters);
400 all_thread_local_arrays.init();
401 }
402
partitioned_counters_destroy(void)403 void partitioned_counters_destroy(void)
404 // Effect: Destroy any partitioned counters data structures.
405 {
406 pc_lock();
407 LinkedListElement<GrowableArray<struct local_counter *> *> *a_ll;
408 while (all_thread_local_arrays.pop(&a_ll)) {
409 a_ll->get_container()->deinit();
410 }
411
412 pk_delete(thread_destructor_key);
413 destroy_counters();
414 pc_unlock();
415 }
416
417 #endif // __APPLE__
418