1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 
36 ----------------------------------------
37 
38    Licensed under the Apache License, Version 2.0 (the "License");
39    you may not use this file except in compliance with the License.
40    You may obtain a copy of the License at
41 
42        http://www.apache.org/licenses/LICENSE-2.0
43 
44    Unless required by applicable law or agreed to in writing, software
45    distributed under the License is distributed on an "AS IS" BASIS,
46    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
47    See the License for the specific language governing permissions and
48    limitations under the License.
49 ======= */
50 
51 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
52 
53 #include <toku_race_tools.h>
54 #include <sys/types.h>
55 #include <pthread.h>
56 
57 #include "memory.h"
58 #include "partitioned_counter.h"
59 #include "doubly_linked_list.h"
60 #include "growable_array.h"
61 #include <portability/toku_atomic.h>
62 
63 #ifdef __APPLE__
64 // TODO(leif): The __thread declspec is broken in ways I don't understand
65 // on Darwin.  Partitioned counters use them and it would be prohibitive
66 // to tease them apart before a week after 6.5.0, so instead, we're just
67 // not going to use them in the most brutal way possible.  This is a
68 // terrible implementation of the API in partitioned_counter.h but it
69 // should be correct enough to release a non-performant version on OSX for
70 // development.  Soon, we need to either make portable partitioned
71 // counters, or we need to do this disabling in a portable way.
72 
73 struct partitioned_counter {
74     uint64_t v;
75 };
76 
create_partitioned_counter(void)77 PARTITIONED_COUNTER create_partitioned_counter(void) {
78     PARTITIONED_COUNTER XCALLOC(counter);
79     return counter;
80 }
81 
destroy_partitioned_counter(PARTITIONED_COUNTER counter)82 void destroy_partitioned_counter(PARTITIONED_COUNTER counter) {
83     toku_free(counter);
84 }
85 
increment_partitioned_counter(PARTITIONED_COUNTER counter,uint64_t delta)86 void increment_partitioned_counter(PARTITIONED_COUNTER counter, uint64_t delta) {
87     (void) toku_sync_fetch_and_add(&counter->v, delta);
88 }
89 
read_partitioned_counter(PARTITIONED_COUNTER counter)90 uint64_t read_partitioned_counter(PARTITIONED_COUNTER counter) {
91     return counter->v;
92 }
93 
partitioned_counters_init(void)94 void partitioned_counters_init(void) {}
partitioned_counters_destroy(void)95 void partitioned_counters_destroy(void) {}
96 
97 #else // __APPLE__
98 
99 //******************************************************************************
100 //
101 // Representation: The representation of a partitioned counter comprises a
102 //  sum, called sum_of_dead; an index, called the ckey, which indexes into a
103 //  thread-local array to find a thread-local part of the counter; and a
104 //  linked list of thread-local parts.
105 //
106 //  There is also a linked list, for each thread that has a thread-local part
107 //  of any counter, of all the thread-local parts of all the counters.
108 //
109 //  There is a pthread_key which gives us a hook to clean up thread-local
110 //  state when a thread terminates.  For each thread-local part of a counter
111 //  that the thread has, we add in the thread-local sum into the sum_of_dead.
112 //
113 //  Finally there is a list of all the thread-local arrays so that when we
114 //  destroy the partitioned counter before the threads are done, we can find
115 //  and destroy the thread_local_arrays before destroying the pthread_key.
116 //
117 // Abstraction function: The sum is represented by the sum of _sum and the
118 //  sum's of the thread-local parts of the counter.
119 //
120 // Representation invariant: Every thread-local part is in the linked list of
121 //  the thread-local parts of its counter, as well as in the linked list of
122 //  the counters of a the thread.
123 //
124 //******************************************************************************
125 
126 //******************************************************************************
127 // The mutex for the PARTITIONED_COUNTER
128 // We have a single mutex for all the counters because
129 //  (a) the mutex is obtained infrequently, and
130 //  (b) it helps us avoid race conditions when destroying the counters.
131 // The alternative that I couldn't make work is to have a mutex per counter.
132 //   But the problem is that the counter can be destroyed before threads
133 //   terminate, or maybe a thread terminates before the counter is destroyed.
134 //   If the counter is destroyed first, then the mutex is no longer available.
135 //******************************************************************************
136 
137 using namespace toku;
138 
139 static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
140 
pc_lock(void)141 static void pc_lock (void)
142 // Effect: Lock the mutex.
143 {
144     int r = pthread_mutex_lock(&partitioned_counter_mutex);
145     assert(r==0);
146 }
147 
pc_unlock(void)148 static void pc_unlock (void)
149 // Effect: Unlock the mutex.
150 {
151     int r = pthread_mutex_unlock(&partitioned_counter_mutex);
152     assert(r==0);
153 }
154 
155 //******************************************************************************
156 // Key creation primitives
157 //******************************************************************************
pk_create(pthread_key_t * key,void (* destructor)(void *))158 static void pk_create (pthread_key_t *key, void (*destructor)(void*)) {
159     int r = pthread_key_create(key, destructor);
160     assert(r==0);
161 }
162 
pk_delete(pthread_key_t key)163 static void pk_delete (pthread_key_t key) {
164     int r = pthread_key_delete(key);
165     assert(r==0);
166 }
167 
pk_setspecific(pthread_key_t key,const void * value)168 static void pk_setspecific (pthread_key_t key, const void *value) {
169     int r = pthread_setspecific(key, value);
170     assert(r==0);
171 }
172 
173 //******************************************************************************
174 // The counter itself.
175 // The thread local part of a counter, comprising the thread-local sum a pointer
176 //  to the partitioned_counter, a pointer to the thread_local list head, and two
177 //  linked lists. One of the lists is all the thread-local parts that belong to
178 //  the same counter, and the other is all the thread-local parts that belogn to
179 //  the same thread.
180 //******************************************************************************
181 
182 struct local_counter;
183 
184 struct partitioned_counter {
185     uint64_t       sum_of_dead;                                     // The sum of all thread-local counts from threads that have terminated.
186     uint64_t       pc_key;                                          // A unique integer among all counters that have been created but not yet destroyed.
187     DoublyLinkedList<struct local_counter *> ll_counter_head; // A linked list of all the thread-local information for this counter.
188 };
189 
190 struct local_counter {
191     uint64_t                                   sum;                // The thread-local sum.
192     PARTITIONED_COUNTER                        owner_pc;           // The partitioned counter that this is part of.
193     GrowableArray<struct local_counter *>     *thread_local_array; // The thread local array for this thread holds this local_counter at offset owner_pc->pc_key.
194     LinkedListElement<struct local_counter *>  ll_in_counter;      // Element for the doubly-linked list of thread-local information for this PARTITIONED_COUNTER.
195 };
196 
197 // Try to get it it into one cache line by aligning it.
198 static __thread GrowableArray<struct local_counter *> thread_local_array;
199 static __thread bool                                  thread_local_array_inited = false;
200 
201 static DoublyLinkedList<GrowableArray<struct local_counter *> *> all_thread_local_arrays;
202 static __thread LinkedListElement<GrowableArray<struct local_counter *> *> thread_local_ll_elt;
203 
204 static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
destroy_thread_local_part_of_partitioned_counters(void * ignore_me)205 static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me __attribute__((__unused__)))
206 // Effect: This function is called whenever a thread terminates using the
207 //  destructor of the thread_destructor_key (defined below).  First grab the
208 //  lock, then go through all the partitioned counters and removes the part that
209 //  is local to this thread.  We don't actually need the contents of the
210 //  thread_destructor_key except to cause this function to run.  The content of
211 //  the key is a static string, so don't try to free it.
212 {
213     pc_lock();
214     for (size_t i=0; i<thread_local_array.get_size(); i++) {
215         struct local_counter *lc = thread_local_array.fetch_unchecked(i);
216         if (lc==NULL) continue;
217         PARTITIONED_COUNTER owner = lc->owner_pc;
218         owner->sum_of_dead += lc->sum;
219         owner->ll_counter_head.remove(&lc->ll_in_counter);
220         toku_free(lc);
221     }
222     all_thread_local_arrays.remove(&thread_local_ll_elt);
223     thread_local_array_inited = false;
224     thread_local_array.deinit();
225     pc_unlock();
226 }
227 
228 //******************************************************************************
229 // We employ a system-wide pthread_key simply to get a notification when a
230 //  thread terminates. The key will simply contain a constant string (it's "dont
231 //  care", but it doesn't matter what it is, as long as it's not NULL.  We need
232 //  a constructor function to set up the pthread_key.  We used a constructor
233 //  function intead of a C++ constructor because that's what we are used to,
234 //  rather than because it's necessarily better.  Whenever a thread tries to
235 //  increment a partitioned_counter for the first time, it sets the
236 //  pthread_setspecific for the thread_destructor_key.  It's OK if the key gets
237 //  setspecific multiple times, it's always the same value.  When a thread (that
238 //  has created a thread-local part of any partitioned counter) terminates, the
239 //  destroy_thread_local_part_of_partitioned_counters will run.  It may run
240 //  before or after other pthread_key destructors, but the thread-local
241 //  ll_thread_head variable is still present until the thread is completely done
242 //  running.
243 //******************************************************************************
244 
245 static pthread_key_t thread_destructor_key;
246 
247 //******************************************************************************
248 // We don't like using up pthread_keys (macos provides only 128 of them),
249 // so we built our own.   Also, looking at the source code for linux libc,
250 // it looks like pthread_keys get slower if there are a lot of them.
251 // So we use only one pthread_key.
252 //******************************************************************************
253 
254 GrowableArray<bool> counters_in_use;
255 
allocate_counter(void)256 static uint64_t allocate_counter (void)
257 // Effect: Find an unused counter number, and allocate it, returning the counter number.
258 //  Grabs the pc_lock.
259 {
260     uint64_t ret;
261     pc_lock();
262     size_t size = counters_in_use.get_size();
263     for (uint64_t i=0; i<size; i++) {
264         if (!counters_in_use.fetch_unchecked(i)) {
265             counters_in_use.store_unchecked(i, true);
266             ret = i;
267             goto unlock;
268         }
269     }
270     counters_in_use.push(true);
271     ret = size;
272 unlock:
273     pc_unlock();
274     return ret;
275 }
276 
277 
free_counter(uint64_t counternum)278 static void free_counter(uint64_t counternum)
279 // Effect: Free a counter.
280 // Requires: The pc mutex is held before calling.
281 {
282     assert(counternum < counters_in_use.get_size());
283     assert(counters_in_use.fetch_unchecked(counternum));
284     counters_in_use.store_unchecked(counternum, false);
285 }
286 
destroy_counters(void)287 static void destroy_counters (void) {
288     counters_in_use.deinit();
289 }
290 
291 
292 //******************************************************************************
293 // Now for the code that actually creates a counter.
294 //******************************************************************************
295 
create_partitioned_counter(void)296 PARTITIONED_COUNTER create_partitioned_counter(void)
297 // Effect: Create a counter, initialized to zero.
298 {
299     PARTITIONED_COUNTER XMALLOC(result);
300     result->sum_of_dead = 0;
301     result->pc_key = allocate_counter();
302     result->ll_counter_head.init();
303     return result;
304 }
305 
destroy_partitioned_counter(PARTITIONED_COUNTER pc)306 void destroy_partitioned_counter(PARTITIONED_COUNTER pc)
307 // Effect: Destroy the counter.  No operations on this counter are permitted after.
308 // Implementation note: Since we have a global lock, we can destroy all the thread-local
309 //  versions as well.
310 {
311     pc_lock();
312     uint64_t pc_key = pc->pc_key;
313     LinkedListElement<struct local_counter *> *first;
314     while (pc->ll_counter_head.pop(&first)) {
315         // We just removed first from the counter list, now we must remove it from the thread-local array.
316         struct local_counter *lc = first->get_container();
317         assert(pc == lc->owner_pc);
318         GrowableArray<struct local_counter *> *tla = lc->thread_local_array;
319         tla->store_unchecked(pc_key, NULL);
320         toku_free(lc);
321     }
322     toku_free(pc);
323     free_counter(pc_key);
324     pc_unlock();
325 }
326 
get_thread_local_counter(uint64_t pc_key,GrowableArray<struct local_counter * > * a)327 static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, GrowableArray<struct local_counter *> *a)
328 {
329     if (pc_key >= a->get_size()) {
330         return NULL;
331     } else {
332         return a->fetch_unchecked(pc_key);
333     }
334 }
335 
get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)336 static struct local_counter *get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)
337 {
338     // Only this thread is allowed to modify thread_local_array, except for setting tla->array[pc_key] to NULL
339     // when a counter is destroyed (and in that case there should be no race because no other thread should be
340     // trying to access the same local counter at the same time.
341     uint64_t pc_key = pc->pc_key;
342     struct local_counter *lc = get_thread_local_counter(pc->pc_key, &thread_local_array);
343     if (lc == NULL) {
344         XMALLOC(lc);    // Might as well do the malloc without holding the pc lock.  But most of the rest of this work needs the lock.
345         pc_lock();
346 
347         // Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters.
348         if (!thread_local_array_inited) {
349             pk_setspecific(thread_destructor_key, "dont care");
350             thread_local_array_inited=true;
351             thread_local_array.init();
352             all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array);
353         }
354 
355         lc->sum         = 0;
356         TOKU_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy.
357         lc->owner_pc    = pc;
358         lc->thread_local_array = &thread_local_array;
359 
360         // Grow the array if needed, filling in NULLs
361         while (thread_local_array.get_size() <= pc_key) {
362             thread_local_array.push(NULL);
363         }
364         thread_local_array.store_unchecked(pc_key, lc);
365         pc->ll_counter_head.insert(&lc->ll_in_counter, lc);
366         pc_unlock();
367     }
368     return lc;
369 }
370 
increment_partitioned_counter(PARTITIONED_COUNTER pc,uint64_t amount)371 void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
372 // Effect: Increment the counter by amount.
373 // Requires: No overflows.  This is a 64-bit unsigned counter.
374 {
375     struct local_counter *lc = get_or_alloc_thread_local_counter(pc);
376     lc->sum += amount;
377 }
378 
sumit(struct local_counter * lc,uint64_t * sum)379 static int sumit(struct local_counter *lc, uint64_t *sum) {
380     (*sum)+=lc->sum;
381     return 0;
382 }
383 
read_partitioned_counter(PARTITIONED_COUNTER pc)384 uint64_t read_partitioned_counter(PARTITIONED_COUNTER pc)
385 // Effect: Return the current value of the counter.
386 // Implementation note: Sum all the thread-local counts along with the sum_of_the_dead.
387 {
388     pc_lock();
389     uint64_t sum = pc->sum_of_dead;
390     int r = pc->ll_counter_head.iterate<uint64_t *>(sumit, &sum);
391     assert(r==0);
392     pc_unlock();
393     return sum;
394 }
395 
partitioned_counters_init(void)396 void partitioned_counters_init(void)
397 // Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
398 {
399     pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters);
400     all_thread_local_arrays.init();
401 }
402 
partitioned_counters_destroy(void)403 void partitioned_counters_destroy(void)
404 // Effect: Destroy any partitioned counters data structures.
405 {
406     pc_lock();
407     LinkedListElement<GrowableArray<struct local_counter *> *> *a_ll;
408     while (all_thread_local_arrays.pop(&a_ll)) {
409         a_ll->get_container()->deinit();
410     }
411 
412     pk_delete(thread_destructor_key);
413     destroy_counters();
414     pc_unlock();
415 }
416 
417 #endif // __APPLE__
418