/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <my_global.h>
#include <string.h>
#include <time.h>
#include <stdarg.h>

#include <portability/memory.h>
#include <portability/toku_race_tools.h>
#include <portability/toku_atomic.h>
#include <portability/toku_pthread.h>
#include <portability/toku_portability.h>
#include <portability/toku_stdlib.h>
#include <portability/toku_time.h>

#include "ft/cachetable/cachetable.h"
#include "ft/cachetable/cachetable-internal.h"
#include "ft/cachetable/checkpoint.h"
#include "ft/logger/log-internal.h"
#include "util/rwlock.h"
#include "util/scoped_malloc.h"
#include "util/status.h"
#include "util/context.h"

toku_instr_key *cachetable_m_mutex_key;
toku_instr_key *cachetable_ev_thread_lock_mutex_key;

toku_instr_key *cachetable_m_list_lock_key;
toku_instr_key *cachetable_m_pending_lock_expensive_key;
toku_instr_key *cachetable_m_pending_lock_cheap_key;
toku_instr_key *cachetable_m_lock_key;

toku_instr_key *cachetable_value_key;
toku_instr_key *cachetable_disk_nb_rwlock_key;

toku_instr_key *cachetable_p_refcount_wait_key;
toku_instr_key *cachetable_m_flow_control_cond_key;
toku_instr_key *cachetable_m_ev_thread_cond_key;

toku_instr_key *cachetable_disk_nb_mutex_key;
toku_instr_key *log_internal_lock_mutex_key;
toku_instr_key *eviction_thread_key;

///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.

// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
// They were left here after the engine status cleanup (#2949), rather than moved into the status struct,
// so they are still easily available to the debugger and to save lots of typing.
static uint64_t cachetable_miss;
static uint64_t cachetable_misstime;     // time spent waiting for disk read
static uint64_t cachetable_prefetches;   // how many times has a block been prefetched into the cachetable?
static uint64_t cachetable_evictions;
static uint64_t cleaner_executions;      // number of times the cleaner thread's loop has executed


// Note, toku_cachetable_get_status() is below, after the declaration of the cachetable.

static void * const zero_value = nullptr;
static PAIR_ATTR const zero_attr = {
    .size = 0,
    .nonleaf_size = 0,
    .leaf_size = 0,
    .rollback_size = 0,
    .cache_pressure_size = 0,
    .is_valid = true
};


static inline void ctpair_destroy(PAIR p) {
    p->value_rwlock.deinit();
    paranoid_invariant(p->refcount == 0);
    nb_mutex_destroy(&p->disk_nb_mutex);
    toku_cond_destroy(&p->refcount_wait);
    toku_free(p);
}

static inline void pair_lock(PAIR p) {
    toku_mutex_lock(p->mutex);
}

static inline void pair_unlock(PAIR p) {
    toku_mutex_unlock(p->mutex);
}

// adds a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_add_ref_unlocked(PAIR p) {
    p->refcount++;
}

// releases a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_release_ref_unlocked(PAIR p) {
    paranoid_invariant(p->refcount > 0);
    p->refcount--;
    if (p->refcount == 0 && p->num_waiting_on_refs > 0) {
        toku_cond_broadcast(&p->refcount_wait);
    }
}

static void pair_wait_for_ref_release_unlocked(PAIR p) {
    p->num_waiting_on_refs++;
    while (p->refcount > 0) {
        toku_cond_wait(&p->refcount_wait, p->mutex);
    }
    p->num_waiting_on_refs--;
}

bool toku_ctpair_is_write_locked(PAIR pair) {
    return pair->value_rwlock.writers() == 1;
}

void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
    ct_status.init();
    CT_STATUS_VAL(CT_MISS)                   = cachetable_miss;
    CT_STATUS_VAL(CT_MISSTIME)               = cachetable_misstime;
    CT_STATUS_VAL(CT_PREFETCHES)             = cachetable_prefetches;
    CT_STATUS_VAL(CT_EVICTIONS)              = cachetable_evictions;
    CT_STATUS_VAL(CT_CLEANER_EXECUTIONS)     = cleaner_executions;
    CT_STATUS_VAL(CT_CLEANER_PERIOD)         = toku_get_cleaner_period_unlocked(ct);
    CT_STATUS_VAL(CT_CLEANER_ITERATIONS)     = toku_get_cleaner_iterations_unlocked(ct);
    toku_kibbutz_get_status(ct->client_kibbutz,
                            &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS_ACTIVE),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_MAX_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME));
    toku_kibbutz_get_status(ct->ct_kibbutz,
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME));
    toku_kibbutz_get_status(ct->checkpointing_kibbutz,
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME));
    ct->ev.fill_engine_status();
    *statp = ct_status;
}

// FIXME global with no toku prefix
void remove_background_job_from_cf(CACHEFILE cf)
{
    bjm_remove_background_job(cf->bjm);
}

// FIXME global with no toku prefix
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
// The function f must call remove_background_job_from_cf when it completes
{
    int r = bjm_add_background_job(cf->bjm);
    // if the client is adding a background job, it must do so
    // at a time when the manager is accepting background jobs; otherwise
    // the client is screwing up
    assert_zero(r);
    toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra);
}
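
// Example usage of the contract above (an illustrative sketch only;
// flush_some_data and struct my_extra are hypothetical, not defined in this
// file):
//
//     static void flush_some_data(void *extra) {
//         struct my_extra *args = (struct my_extra *) extra;
//         // ... do the background work on args ...
//         remove_background_job_from_cf(args->cf);  // required on completion
//     }
//
//     cachefile_kibbutz_enq(cf, flush_some_data, args);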

static int
checkpoint_thread (void *checkpointer_v)
// Effect:  If checkpoint_period>0 then periodically run a checkpoint.
//  If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later.
//  If someone sets the checkpoint_shutdown boolean, then this thread exits.
// This thread notices those changes by waiting on a condition variable.
{
    CHECKPOINTER CAST_FROM_VOIDP(cp, checkpointer_v);
    int r = toku_checkpoint(cp, cp->get_logger(), NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT);
    invariant_zero(r);
    return r;
}

void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) {
    ct->cp.set_checkpoint_period(new_period);
}

uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) {
    return ct->cp.get_checkpoint_period();
}

void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) {
    if (force_recovery) {
        return;
    }
    ct->cl.set_period(new_period);
}

uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) {
    return ct->cl.get_period_unlocked();
}

void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations) {
    ct->cl.set_iterations(new_iterations);
}

uint32_t toku_get_cleaner_iterations (CACHETABLE ct) {
    return ct->cl.get_iterations();
}

uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) {
    return ct->cl.get_iterations();
}

void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled) {
    ct->ev.set_enable_partial_eviction(enabled);
}

bool toku_get_enable_partial_eviction (CACHETABLE ct) {
    return ct->ev.get_enable_partial_eviction();
}

// reserve 25% as "unreservable".  The loader cannot have it.
#define unreservable_memory(size) ((size)/4)
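// For example, with the default 128 MiB cachetable created below,
// unreservable_memory(128*1024*1024) is 32 MiB, so at most the remaining
// 96 MiB is available to the loader.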

int toku_cachetable_create_ex(CACHETABLE *ct_result, long size_limit,
                           unsigned long client_pool_threads,
                           unsigned long cachetable_pool_threads,
                           unsigned long checkpoint_pool_threads,
                           LSN UU(initial_lsn), TOKULOGGER logger) {
    int result = 0;
    int r;

    if (size_limit == 0) {
        size_limit = 128*1024*1024;
    }

    CACHETABLE XCALLOC(ct);
    ct->list.init();
    ct->cf_list.init();

    int num_processors = toku_os_get_number_active_processors();
    int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
    r = toku_kibbutz_create(client_pool_threads ? client_pool_threads : num_processors,
                            &ct->client_kibbutz);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = toku_kibbutz_create(cachetable_pool_threads ? cachetable_pool_threads : 2*num_processors,
                            &ct->ct_kibbutz);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = toku_kibbutz_create(checkpoint_pool_threads ? checkpoint_pool_threads : checkpointing_nworkers,
                            &ct->checkpointing_kibbutz);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    // must be done after creating ct_kibbutz
    r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    ct->env_dir = toku_xstrdup(".");
cleanup:
    if (result == 0) {
        *ct_result = ct;
    } else {
        toku_cachetable_close(&ct);
    }
    return result;
}

// Returns a pointer to the checkpointer contained within
// the given cachetable.
CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) {
    return &ct->cp;
}

uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) {
    uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound);
    return reserved_memory;
}

void toku_cachetable_release_reserved_memory(CACHETABLE ct, uint64_t reserved_memory) {
    ct->ev.release_reserved_memory(reserved_memory);
}
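
// Illustrative sketch of the reserve/release protocol (the 0.25 fraction and
// the 256 MiB cap are hypothetical client choices, not mandated by this API):
//
//     uint64_t reserved = toku_cachetable_reserve_memory(ct, 0.25, 256ULL << 20);
//     // ... use up to `reserved` bytes outside the cachetable's own accounting ...
//     toku_cachetable_release_reserved_memory(ct, reserved);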

void
toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) {
    toku_free(ct->env_dir);
    ct->env_dir = toku_xstrdup(env_dir);
}

// What cachefile goes with a particular iname (iname relative to env)?
// The transaction that is adding the reference might not have a reference
// to the ft, therefore the cachefile might be closing.
// If closing, we want to return that it is not there, but must wait till after
// the close has finished.
// Once the close has finished, there must not be a cachefile with that name
// in the cachetable.
int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) {
    return ct->cf_list.cachefile_of_iname_in_env(iname_in_env, cf);
}

// What cachefile goes with a particular fd?
// This function can only be called if the ft is still open, so the file must
// still be open
int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
    return ct->cf_list.cachefile_of_filenum(filenum, cf);
}

// TEST-ONLY function
// If something goes wrong, close the fd.  After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_in_env) {
    FILENUM filenum = toku_cachetable_reserve_filenum(ct);
    bool was_open;
    return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_in_env, filenum, &was_open);
}

// Get a unique filenum from the cachetable
FILENUM
toku_cachetable_reserve_filenum(CACHETABLE ct) {
    return ct->cf_list.reserve_filenum();
}

static void create_new_cachefile(
    CACHETABLE ct,
    FILENUM filenum,
    uint32_t hash_id,
    int fd,
    const char *fname_in_env,
    struct fileid fileid,
    CACHEFILE *cfptr
    ) {
    // File is not open.  Make a new cachefile.
    CACHEFILE newcf = NULL;
    XCALLOC(newcf);
    newcf->cachetable = ct;
    newcf->hash_id = hash_id;
    newcf->fileid = fileid;

    newcf->filenum = filenum;
    newcf->fd = fd;
    newcf->fname_in_env = toku_xstrdup(fname_in_env);
    bjm_init(&newcf->bjm);
    *cfptr = newcf;
}

int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd,
                                         const char *fname_in_env,
                                         FILENUM filenum, bool* was_open) {
    int r;
    CACHEFILE newcf;
    struct fileid fileid;

    assert(filenum.fileid != FILENUM_NONE.fileid);
    r = toku_os_get_unique_file_id(fd, &fileid);
    if (r != 0) {
        r = get_error_errno();
        close(fd);
        return r;
    }
    ct->cf_list.write_lock();
    CACHEFILE existing_cf = ct->cf_list.find_cachefile_unlocked(&fileid);
    if (existing_cf) {
        *was_open = true;
        // Reuse an existing cachefile and close the caller's fd, whose
        // responsibility has been passed to us.
        r = close(fd);
        assert(r == 0);
        *cfptr = existing_cf;
        r = 0;
        goto exit;
    }
    *was_open = false;
    ct->cf_list.verify_unused_filenum(filenum);
    // now let's try to find it in the stale cachefiles
    existing_cf = ct->cf_list.find_stale_cachefile_unlocked(&fileid);
    if (existing_cf) {
        // found the stale file: fix up the fields in the cachefile
        existing_cf->filenum = filenum;
        existing_cf->fd = fd;
        existing_cf->fname_in_env = toku_xstrdup(fname_in_env);
        bjm_init(&existing_cf->bjm);

        // now we need to move all the PAIRs in it back into the cachetable
        ct->list.write_list_lock();
        for (PAIR curr_pair = existing_cf->cf_head; curr_pair; curr_pair = curr_pair->cf_next) {
            pair_lock(curr_pair);
            ct->list.add_to_cachetable_only(curr_pair);
            pair_unlock(curr_pair);
        }
        ct->list.write_list_unlock();
        // move the cachefile back to the list of active cachefiles
        ct->cf_list.remove_stale_cf_unlocked(existing_cf);
        ct->cf_list.add_cf_unlocked(existing_cf);
        *cfptr = existing_cf;
        r = 0;
        goto exit;
    }

    create_new_cachefile(
        ct,
        filenum,
        ct->cf_list.get_new_hash_id_unlocked(),
        fd,
        fname_in_env,
        fileid,
        &newcf
        );

    ct->cf_list.add_cf_unlocked(newcf);

    *cfptr = newcf;
    r = 0;
 exit:
    ct->cf_list.write_unlock();
    return r;
}

static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, bool evict_completely);

// TEST-ONLY function
int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) {
    char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env);
    int fd = open(fname_in_cwd, flags+O_BINARY, mode);
    int r;
    if (fd < 0) {
        r = get_error_errno();
    } else {
        r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env);
    }
    toku_free(fname_in_cwd);
    return r;
}

char *
toku_cachefile_fname_in_env (CACHEFILE cf) {
    if (cf) {
        return cf->fname_in_env;
    }
    return nullptr;
}

void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env) {
    cf->fname_in_env = new_fname_in_env;
}

int
toku_cachefile_get_fd (CACHEFILE cf) {
    return cf->fd;
}

static void cachefile_destroy(CACHEFILE cf) {
    if (cf->free_userdata) {
        cf->free_userdata(cf, cf->userdata);
    }
    toku_free(cf);
}

void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
    CACHEFILE cf = *cfp;
    CACHETABLE ct = cf->cachetable;

    bjm_wait_for_jobs_to_finish(cf->bjm);

    // Clients should never attempt to close a cachefile that is being
    // checkpointed. We notify clients this is happening in the
    // note_pin_by_checkpoint callback.
    assert(!cf->for_checkpoint);

    // Flush the cachefile and remove all of its pairs from the cachetable,
    // but keep the PAIRs linked in the cachefile. We will store the cachefile
    // away in case it gets opened immediately.
    //
    // if we are unlinking on close, then we want to evict completely,
    // otherwise, we will keep the PAIRs and cachefile around in case
    // a subsequent open comes soon
    cachetable_flush_cachefile(ct, cf, cf->unlink_on_close);

    // Call the close userdata callback to notify the client this cachefile
    // and its underlying file are going to be closed
    if (cf->close_userdata) {
        cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn);
    }
    // fsync and close the fd.
    toku_file_fsync_without_accounting(cf->fd);
    int r = close(cf->fd);
    assert(r == 0);
    cf->fd = -1;

    // destroy the parts of the cachefile
    // that do not persist across opens/closes
    bjm_destroy(cf->bjm);
    cf->bjm = NULL;

    // remove the cf from the list of active cachefiles
    ct->cf_list.remove_cf(cf);
    cf->filenum = FILENUM_NONE;

    // Unlink the file if the bit was set
    if (cf->unlink_on_close) {
        char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(cf->cachetable, cf->fname_in_env);
        r = unlink(fname_in_cwd);
        assert_zero(r);
        toku_free(fname_in_cwd);
    }
    toku_free(cf->fname_in_env);
    cf->fname_in_env = NULL;

    // we destroy the cf if the unlink bit was set or if no PAIRs exist;
    // if no PAIRs exist, there is no sense in keeping the cachefile around
    bool destroy_cf = cf->unlink_on_close || (cf->cf_head == NULL);
    if (destroy_cf) {
        cachefile_destroy(cf);
    }
    else {
        ct->cf_list.add_stale_cf(cf);
    }
}

// This hash function comes from Jenkins:  http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan
static inline uint32_t rot(uint32_t x, uint32_t k) {
    return (x<<k) | (x>>(32-k));
}
static inline uint32_t final (uint32_t a, uint32_t b, uint32_t c) {
    c ^= b; c -= rot(b,14);
    a ^= c; a -= rot(c,11);
    b ^= a; b -= rot(a,25);
    c ^= b; c -= rot(b,16);
    a ^= c; a -= rot(c,4);
    b ^= a; b -= rot(a,14);
    c ^= b; c -= rot(b,24);
    return c;
}

uint32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
// Effect: Return a 32-bit hash key.  The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
{
    return final(cachefile->hash_id, (uint32_t)(key.b>>32), (uint32_t)key.b);
}
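
// Example of the intended use (illustrative only; `table_size` is a
// hypothetical power-of-two bucket count):
//
//     uint32_t fullhash = toku_cachetable_hash(cf, blocknum);
//     uint32_t bucket   = fullhash & (table_size - 1);  // bitmask, no modulo by a prime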

#define CLOCK_SATURATION 15
#define CLOCK_INITIAL_COUNT 3

// Requires the pair's mutex to be held
static void pair_touch (PAIR p) {
    p->count = (p->count < CLOCK_SATURATION) ? p->count+1 : CLOCK_SATURATION;
}
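
// The count acts as a saturating counter for CLOCK-style eviction: touches
// increment it, but it is clamped at CLOCK_SATURATION, so a burst of 100
// accesses and a burst of 15 look the same to the evictor (which decreases
// the count elsewhere, outside this file). For example:
//
//     // p->count == 14
//     pair_touch(p);  // p->count == 15
//     pair_touch(p);  // still 15, saturated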

// Remove a pair from the cachetable; requires the write list lock and p->mutex to be held.
// Effects: the pair is removed from the LRU list and from the cachetable's hash table.
// The size of the objects in the cachetable is adjusted by the size of the pair being
// removed.
static void cachetable_remove_pair (pair_list* list, evictor* ev, PAIR p) {
    list->evict_completely(p);
    ev->remove_pair_attr(p->attr);
}

static void cachetable_free_pair(PAIR p) {
    CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
    CACHEKEY key = p->key;
    void *value = p->value_data;
    void* disk_data = p->disk_data;
    void *write_extraargs = p->write_extraargs;
    PAIR_ATTR old_attr = p->attr;

    cachetable_evictions++;
    PAIR_ATTR new_attr = p->attr;
    // Note that flush_callback is called with write_me false, so the only purpose of this
    // call is to tell the ft layer to evict the node (keep_me is false).
    // Also, because we have already removed the PAIR from the cachetable in
    // cachetable_remove_pair, we cannot pass in p->cachefile and p->cachefile->fd
    // for the first two parameters, as these may be invalid (#5171), so we
    // pass in NULL and -1, dummy values
    flush_callback(NULL, -1, key, value, &disk_data, write_extraargs, old_attr, &new_attr, false, false, true, false);

    ctpair_destroy(p);
}

// assumes value_rwlock and disk_nb_mutex held on entry
// responsibility of this function is to only write a locked PAIR to disk
// and NOTHING else. We do not manipulate the state of the PAIR
// or the cachetable here (with the exception of ct->size_current for clones)
//
// No pair_list lock should be held, and the PAIR mutex should not be held
//
static void cachetable_only_write_locked_data(
    evictor* ev,
    PAIR p,
    bool for_checkpoint,
    PAIR_ATTR* new_attr,
    bool is_clone
    )
{
    CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
    CACHEFILE cachefile = p->cachefile;
    CACHEKEY key = p->key;
    void *value = is_clone ? p->cloned_value_data : p->value_data;
    void *disk_data = p->disk_data;
    void *write_extraargs = p->write_extraargs;
    PAIR_ATTR old_attr;
    // we do this for drd. If we are a cloned pair and only
    // have the disk_nb_mutex, it is a race to access p->attr.
    // Luckily, old_attr here is only used for some test applications,
    // so inaccurate non-size fields are ok.
    if (is_clone) {
        old_attr = make_pair_attr(p->cloned_value_size);
    }
    else {
        old_attr = p->attr;
    }
    bool dowrite = true;

    // write callback
    flush_callback(
        cachefile,
        cachefile->fd,
        key,
        value,
        &disk_data,
        write_extraargs,
        old_attr,
        new_attr,
        dowrite,
        is_clone ? false : true, // keep_me (only keep if this is not a cloned pointer)
        for_checkpoint,
        is_clone //is_clone
        );
    p->disk_data = disk_data;
    if (is_clone) {
        p->cloned_value_data = NULL;
        ev->remove_cloned_data_size(p->cloned_value_size);
        p->cloned_value_size = 0;
    }
}


//
// This function writes a PAIR's value out to disk. Currently, it is called
// by get_and_pin functions that write a PAIR out for checkpoint, by
// evictor threads that evict dirty PAIRS, and by the checkpoint thread
// that needs to write out a dirty node for checkpoint.
//
// Requires on entry for p->mutex to NOT be held, otherwise
// calling cachetable_only_write_locked_data will be very expensive
//
static void cachetable_write_locked_pair(
    evictor* ev,
    PAIR p,
    bool for_checkpoint
    )
{
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr = p->attr;
    // grabbing the disk_nb_mutex here ensures that
    // after this point, no one is writing out a cloned value
    // if we grab the disk_nb_mutex inside the if clause,
    // then we may try to evict a PAIR that is in the process
    // of having its clone be written out
    pair_lock(p);
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    // make sure that assumption about cloned_value_data is true
    // if we have grabbed the disk_nb_mutex, then that means that
    // there should be no cloned value data
    assert(p->cloned_value_data == NULL);
    if (p->dirty) {
        cachetable_only_write_locked_data(ev, p, for_checkpoint, &new_attr, false);
        //
        // now let's update variables
        //
        if (new_attr.is_valid) {
            p->attr = new_attr;
            ev->change_pair_attr(old_attr, new_attr);
        }
    }
    // the pair is no longer dirty once written
    p->dirty = CACHETABLE_CLEAN;
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    pair_unlock(p);
}

// Worker thread function that writes and evicts a pair from memory to its cachefile
static void cachetable_evicter(void* extra) {
    PAIR p = (PAIR)extra;
    pair_list* pl = p->list;
    CACHEFILE cf = p->cachefile;
    pl->read_pending_exp_lock();
    bool for_checkpoint = p->checkpoint_pending;
    p->checkpoint_pending = false;
    // per the contract of evictor::evict_pair,
    // the pair's mutex, p->mutex, must be held on entry
    pair_lock(p);
    p->ev->evict_pair(p, for_checkpoint);
    pl->read_pending_exp_unlock();
    bjm_remove_background_job(cf->bjm);
}

static void cachetable_partial_eviction(void* extra) {
    PAIR p = (PAIR)extra;
    CACHEFILE cf = p->cachefile;
    p->ev->do_partial_eviction(p);
    bjm_remove_background_job(cf->bjm);
}

void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) {
    void* old_value = old_pair->value_data;
    void* new_value = new_pair->value_data;
    old_pair->value_data = new_value;
    new_pair->value_data = old_value;
}

void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
    // TODO: <CER> Maybe move this...
    ct->ev.signal_eviction_thread();
}

// Initializes a pair's members.
//
void pair_init(PAIR p,
    CACHEFILE cachefile,
    CACHEKEY key,
    void *value,
    PAIR_ATTR attr,
    enum cachetable_dirty dirty,
    uint32_t fullhash,
    CACHETABLE_WRITE_CALLBACK write_callback,
    evictor *ev,
    pair_list *list)
{
    p->cachefile = cachefile;
    p->key = key;
    p->value_data = value;
    p->cloned_value_data = NULL;
    p->cloned_value_size = 0;
    p->disk_data = NULL;
    p->attr = attr;
    p->dirty = dirty;
    p->fullhash = fullhash;

    p->flush_callback = write_callback.flush_callback;
    p->pe_callback = write_callback.pe_callback;
    p->pe_est_callback = write_callback.pe_est_callback;
    p->cleaner_callback = write_callback.cleaner_callback;
    p->clone_callback = write_callback.clone_callback;
    p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback;
    p->write_extraargs = write_callback.write_extraargs;

    p->count = 0;  // <CER> Is zero the correct init value?
    p->refcount = 0;
    p->num_waiting_on_refs = 0;
    toku_cond_init(*cachetable_p_refcount_wait_key, &p->refcount_wait, nullptr);
    p->checkpoint_pending = false;

    p->mutex = list->get_mutex_for_pair(fullhash);
    assert(p->mutex);
    p->value_rwlock.init(p->mutex
#ifdef TOKU_MYSQL_WITH_PFS
                         ,
                         *cachetable_value_key
#endif
                         );
    nb_mutex_init(*cachetable_disk_nb_mutex_key,
                  *cachetable_disk_nb_rwlock_key,
                  &p->disk_nb_mutex);

    p->size_evicting_estimate = 0;  // <CER> Is zero the correct init value?

    p->ev = ev;
    p->list = list;

    p->clock_next = p->clock_prev = NULL;
    p->pending_next = p->pending_prev = NULL;
    p->cf_next = p->cf_prev = NULL;
    p->hash_chain = NULL;
}

// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
//
// Requires pair list's write lock to be held on entry.
// The pair's mutex must be held as well.
//
static PAIR cachetable_insert_at(CACHETABLE ct,
                                 CACHEFILE cachefile, CACHEKEY key, void *value,
                                 uint32_t fullhash,
                                 PAIR_ATTR attr,
                                 CACHETABLE_WRITE_CALLBACK write_callback,
                                 enum cachetable_dirty dirty) {
    PAIR MALLOC(p);
    assert(p);
    memset(p, 0, sizeof *p);
    pair_init(p,
        cachefile,
        key,
        value,
        attr,
        dirty,
        fullhash,
        write_callback,
        &ct->ev,
        &ct->list
        );

    ct->list.put(p);
    ct->ev.add_pair_attr(attr);
    return p;
}

// on input, the write list lock must be held AND
// the pair's mutex must be held as well
static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
    ct->list.put(p);
    ct->ev.add_pair_attr(attr);
}


// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
//
// Requires pair list's write lock to be held on entry
//
static void cachetable_put_internal(
    CACHEFILE cachefile,
    PAIR p,
    void *value,
    PAIR_ATTR attr,
    CACHETABLE_PUT_CALLBACK put_callback
    )
{
    CACHETABLE ct = cachefile->cachetable;
    //
    // TODO: (Zardosht) make this code run in debug builds only
    //
    //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash);
    //invariant_null(dummy_p);
    cachetable_insert_pair_at(ct, p, attr);
    invariant_notnull(put_callback);
    put_callback(p->key, value, p);
}

// Pair mutex (p->mutex) may or may not be held on entry;
// holding the pair mutex on entry is not important
// for performance or correctness.
// Pair is pinned on entry
static void
clone_pair(evictor* ev, PAIR p) {
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr;
    long clone_size = 0;

    // act of cloning should be fast,
    // not sure if we have to release
    // and regrab the cachetable lock,
    // but doing it for now
    p->clone_callback(
        p->value_data,
        &p->cloned_value_data,
        &clone_size,
        &new_attr,
        true,
        p->write_extraargs
        );

    // now we need to do the same actions we would do
    // if the PAIR had been written to disk
    //
    // because we hold the value_rwlock,
    // it doesn't matter whether we clear
    // the pending bit before the clone
    // or after the clone
    p->dirty = CACHETABLE_CLEAN;
    if (new_attr.is_valid) {
        p->attr = new_attr;
        ev->change_pair_attr(old_attr, new_attr);
    }
    p->cloned_value_size = clone_size;
    ev->add_cloned_data_size(p->cloned_value_size);
}

static void checkpoint_cloned_pair(void* extra) {
    PAIR p = (PAIR)extra;
    CACHETABLE ct = p->cachefile->cachetable;
    PAIR_ATTR new_attr;
    // note that pending lock is not needed here because
    // we KNOW we are in the middle of a checkpoint
    // and that a begin_checkpoint cannot happen
    cachetable_only_write_locked_data(
        p->ev,
        p,
        true, //for_checkpoint
        &new_attr,
        true //is_clone
        );
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    pair_unlock(p);
    ct->cp.remove_background_job();
}

static void
checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
    toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p);
}


//
// Given a PAIR p with the value_rwlock already held, do the following:
//  - If the PAIR needs to be written out to disk for checkpoint:
//   - If the PAIR is cloneable, clone the PAIR and place the work
//      of writing the PAIR on a background thread.
//   - If the PAIR is not cloneable, write the PAIR to disk for checkpoint
//      on the current thread
//
// On entry, pair's mutex is NOT held
//
static void
write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
{
    if (checkpoint_pending && p->checkpoint_complete_callback) {
        p->checkpoint_complete_callback(p->value_data);
    }
    if (p->dirty && checkpoint_pending) {
        if (p->clone_callback) {
            pair_lock(p);
            nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
            pair_unlock(p);
            assert(!p->cloned_value_data);
            clone_pair(&ct->ev, p);
            assert(p->cloned_value_data);
            // place it on the background thread and continue
            // responsibility of writer thread to release disk_nb_mutex
            ct->cp.add_background_job();
            checkpoint_cloned_pair_on_writer_thread(ct, p);
        }
        else {
            // The pair is not cloneable, just write the pair to disk
            // we already have p->value_rwlock and we just do the write in our own thread.
            cachetable_write_locked_pair(&ct->ev, p, true); // keeps the PAIR's write lock
        }
    }
}

// On entry and exit: hold the pair's mutex (p->mutex)
// Method:   take write lock
//           maybe write out the node
//           Else release write lock
//
static void
write_pair_for_checkpoint_thread (evictor* ev, PAIR p)
{
    // Grab an exclusive lock on the pair.
    // If we grab an expensive lock, then other threads will return
    // TRY_AGAIN rather than waiting.  In production, the only time
    // another thread will check if grabbing a lock is expensive is when
    // we have a clone_callback (FTNODEs), so the act of checkpointing
    // will be cheap.  Also, much of the time we'll just be clearing
    // pending bits and that's definitely cheap. (see #5427)
    p->value_rwlock.write_lock(false);
    if (p->checkpoint_pending && p->checkpoint_complete_callback) {
        p->checkpoint_complete_callback(p->value_data);
    }
    if (p->dirty && p->checkpoint_pending) {
        if (p->clone_callback) {
            nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
            assert(!p->cloned_value_data);
            clone_pair(ev, p);
            assert(p->cloned_value_data);
        }
        else {
            // The pair is not cloneable, just write the pair to disk.
            // We already have p->value_rwlock and we just do the write in our own thread.
            // This will grab and release disk_nb_mutex.
            pair_unlock(p);
            cachetable_write_locked_pair(ev, p, true); // keeps the PAIR's write lock
            pair_lock(p);
        }
        p->checkpoint_pending = false;

        // now release value_rwlock, before we write the PAIR out
        // so that the PAIR is available to client threads
        p->value_rwlock.write_unlock(); // didn't call cachetable_evict_pair so we have to unlock it ourselves.
        if (p->clone_callback) {
            // note that pending lock is not needed here because
            // we KNOW we are in the middle of a checkpoint
            // and that a begin_checkpoint cannot happen
            PAIR_ATTR attr;
            pair_unlock(p);
            cachetable_only_write_locked_data(
                ev,
                p,
                true, //for_checkpoint
                &attr,
                true //is_clone
                );
            pair_lock(p);
            nb_mutex_unlock(&p->disk_nb_mutex);
        }
    }
    else {
        //
        // we may clear the pending bit here because we have
        // both the cachetable lock and the PAIR lock.
        // The rule, as mentioned in toku_cachetable_begin_checkpoint,
        // is that to clear the bit, we must have both the PAIR lock
        // and the pending lock
        //
        p->checkpoint_pending = false;
        p->value_rwlock.write_unlock();
    }
}

//
// For each PAIR associated with these CACHEFILEs and CACHEKEYs,
// if the checkpoint_pending bit is set and the PAIR is dirty, write the PAIR
// to disk.
// We assume the PAIRs passed in have been locked by the client that made calls
// into the cachetable that eventually make it here.
//
static void checkpoint_dependent_pairs(
    CACHETABLE ct,
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    bool* checkpoint_pending,
    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
    )
{
    for (uint32_t i = 0; i < num_dependent_pairs; i++) {
        PAIR curr_dep_pair = dependent_pairs[i];
        // we need to update the dirtiness of the dependent pair,
        // because the client may have dirtied it while holding its lock,
        // and if the pair is pending a checkpoint, it needs to be written out
        if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY;
        if (checkpoint_pending[i]) {
            write_locked_pair_for_checkpoint(ct, curr_dep_pair, checkpoint_pending[i]);
        }
    }
}

void toku_cachetable_put_with_dep_pairs(
    CACHEFILE cachefile,
    CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
    void *value,
    PAIR_ATTR attr,
    CACHETABLE_WRITE_CALLBACK write_callback,
    void *get_key_and_fullhash_extra,
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
    CACHEKEY* key,
    uint32_t* fullhash,
    CACHETABLE_PUT_CALLBACK put_callback
    )
{
    //
    // need to get the key and fullhash
    //
    CACHETABLE ct = cachefile->cachetable;
    if (ct->ev.should_client_thread_sleep()) {
        ct->ev.wait_for_cache_pressure_to_subside();
    }
    if (ct->ev.should_client_wake_eviction_thread()) {
        ct->ev.signal_eviction_thread();
    }

    PAIR p = NULL;
    XMALLOC(p);
    memset(p, 0, sizeof *p);

    ct->list.write_list_lock();
    get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
    pair_init(
        p,
        cachefile,
        *key,
        value,
        attr,
        CACHETABLE_DIRTY,
        *fullhash,
        write_callback,
        &ct->ev,
        &ct->list
        );
    pair_lock(p);
    p->value_rwlock.write_lock(true);
    cachetable_put_internal(
        cachefile,
        p,
        value,
        attr,
        put_callback
        );
    pair_unlock(p);
    bool checkpoint_pending[num_dependent_pairs];
    ct->list.write_pending_cheap_lock();
    for (uint32_t i = 0; i < num_dependent_pairs; i++) {
        checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
        dependent_pairs[i]->checkpoint_pending = false;
    }
    ct->list.write_pending_cheap_unlock();
    ct->list.write_list_unlock();

    //
    // now that we have inserted the row, let's checkpoint the
    // dependent nodes, if they need checkpointing
    //
    checkpoint_dependent_pairs(
        ct,
        num_dependent_pairs,
        dependent_pairs,
        checkpoint_pending,
        dependent_dirty
        );
}

void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void *value, PAIR_ATTR attr,
                        CACHETABLE_WRITE_CALLBACK write_callback,
                        CACHETABLE_PUT_CALLBACK put_callback
                        ) {
    CACHETABLE ct = cachefile->cachetable;
    if (ct->ev.should_client_thread_sleep()) {
        ct->ev.wait_for_cache_pressure_to_subside();
    }
    if (ct->ev.should_client_wake_eviction_thread()) {
        ct->ev.signal_eviction_thread();
    }

    PAIR p = NULL;
    XMALLOC(p);
    memset(p, 0, sizeof *p);

    ct->list.write_list_lock();
    pair_init(
        p,
        cachefile,
        key,
        value,
        attr,
        CACHETABLE_DIRTY,
        fullhash,
        write_callback,
        &ct->ev,
        &ct->list
        );
    pair_lock(p);
    p->value_rwlock.write_lock(true);
    cachetable_put_internal(
        cachefile,
        p,
        value,
        attr,
        put_callback
        );
    pair_unlock(p);
    ct->list.write_list_unlock();
}

static uint64_t get_tnow(void) {
    struct timeval tv;
    int r = gettimeofday(&tv, NULL); assert(r == 0);
    return tv.tv_sec * 1000000ULL + tv.tv_usec;
}

//
// No locks are held on entry (besides the value_rwlock write lock of the PAIR).
// On exit, the PAIR's value_rwlock is released unless keep_pair_locked is true.
//
static void
do_partial_fetch(
    CACHETABLE ct,
    CACHEFILE cachefile,
    PAIR p,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    void *read_extraargs,
    bool keep_pair_locked
    )
{
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr = zero_attr;
    // As of Dr. No, only clean PAIRs may have pieces missing,
    // so we do a sanity check here.
    assert(!p->dirty);

    pair_lock(p);
    invariant(p->value_rwlock.writers());
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
    lazy_assert_zero(r);
    p->attr = new_attr;
    ct->ev.change_pair_attr(old_attr, new_attr);
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    if (!keep_pair_locked) {
        p->value_rwlock.write_unlock();
    }
    pair_unlock(p);
}

void toku_cachetable_pf_pinned_pair(
    void* value,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    void* read_extraargs,
    CACHEFILE cf,
    CACHEKEY key,
    uint32_t fullhash
    )
{
    PAIR_ATTR attr;
    PAIR p = NULL;
    CACHETABLE ct = cf->cachetable;
    ct->list.pair_lock_by_fullhash(fullhash);
    p = ct->list.find_pair(cf, key, fullhash);
    assert(p != NULL);
    assert(p->value_data == value);
    assert(p->value_rwlock.writers());
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);

    int fd = cf->fd;
    pf_callback(value, p->disk_data, read_extraargs, fd, &attr);

    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    pair_unlock(p);
}

int toku_cachetable_get_and_pin (
    CACHEFILE cachefile,
    CACHEKEY key,
    uint32_t fullhash,
    void **value,
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    bool may_modify_value,
    void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback
    )
{
    pair_lock_type lock_type = may_modify_value ? PL_WRITE_EXPENSIVE : PL_READ;
    // We have separate parameters of read_extraargs and write_extraargs because
    // the lifetimes of the two parameters are different. write_extraargs may be used
    // long after this function call (e.g. after a flush to disk), whereas read_extraargs
    // will not be used after this function returns. As a result, the caller may allocate
    // read_extraargs on the stack, whereas write_extraargs must be allocated
    // on the heap.
    return toku_cachetable_get_and_pin_with_dep_pairs (
        cachefile,
        key,
        fullhash,
        value,
        write_callback,
        fetch_callback,
        pf_req_callback,
        pf_callback,
        lock_type,
        read_extraargs,
        0, // number of dependent pairs that we may need to checkpoint
        NULL, // array of dependent pairs
        NULL // array stating dirty/cleanness of dependent pairs
        );
}
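
// Illustrative sketch of the lifetime rule in the comment above (the names
// fetch_extra, my_fetch_cb, my_pf_req_cb, and my_pf_cb are hypothetical
// stand-ins for a real client):
//
//     struct fetch_extra fe = ...;           // read_extraargs: stack allocation
//                                            // is fine, unused after return
//     CACHETABLE_WRITE_CALLBACK wc = ...;    // wc.write_extraargs may be used
//                                            // during a later flush, so it must
//                                            // point to heap-allocated state
//     void *node;
//     int r = toku_cachetable_get_and_pin(cf, key, fullhash, &node, wc,
//                                         my_fetch_cb, my_pf_req_cb, my_pf_cb,
//                                         true /* may_modify_value */, &fe);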

// Read a pair from a cachefile into memory using the pair's fetch callback
// on entry, pair mutex (p->mutex) is NOT held, but pair is pinned
static void cachetable_fetch_pair(
    CACHETABLE ct,
    CACHEFILE cf,
    PAIR p,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    void* read_extraargs,
    bool keep_pair_locked
    )
{
    // helgrind
    CACHEKEY key = p->key;
    uint32_t fullhash = p->fullhash;

    void *toku_value = NULL;
    void *disk_data = NULL;
    PAIR_ATTR attr;

    // FIXME this should be enum cachetable_dirty, right?
    int dirty = 0;

    pair_lock(p);
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);

    int r;
    r = fetch_callback(cf, p, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs);
    if (dirty) {
        p->dirty = CACHETABLE_DIRTY;
    }
    assert(r == 0);

    p->value_data = toku_value;
    p->disk_data = disk_data;
    p->attr = attr;
    ct->ev.add_pair_attr(attr);
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    if (!keep_pair_locked) {
        p->value_rwlock.write_unlock();
    }
    pair_unlock(p);
}

static bool get_checkpoint_pending(PAIR p, pair_list* pl) {
    bool checkpoint_pending = false;
    pl->read_pending_cheap_lock();
    checkpoint_pending = p->checkpoint_pending;
    p->checkpoint_pending = false;
    pl->read_pending_cheap_unlock();
    return checkpoint_pending;
}

static void checkpoint_pair_and_dependent_pairs(
    CACHETABLE ct,
    PAIR p,
    bool p_is_pending_checkpoint,
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    bool* dependent_pairs_pending_checkpoint,
    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
    )
{

    //
    // A checkpoint must not begin while we are checking dependent pairs or pending bits.
    // Here is why.
    //
    // Now that we have all of the locks on the pairs we
    // care about, we can take care of the necessary checkpointing.
    // For each pair, we simply need to write the pair if it is
    // pending a checkpoint. If no pair is pending a checkpoint,
    // then all of this work will be done with the cachetable lock held,
    // so we don't need to worry about a checkpoint beginning
    // in the middle of any operation below. If some pair
    // is pending a checkpoint, then the checkpoint thread
    // will not complete its current checkpoint until it can
    // successfully grab a lock on the pending pair and
    // remove it from its list of pairs pending a checkpoint.
    // This cannot be done until we release the lock
    // that we have, which is not done in this function.
    // So, the point is, it is impossible for a checkpoint
    // to begin while we write any of these locked pairs
    // for checkpoint, even though writing a pair releases
    // the cachetable lock.
    //
    write_locked_pair_for_checkpoint(ct, p, p_is_pending_checkpoint);

    checkpoint_dependent_pairs(
        ct,
        num_dependent_pairs,
        dependent_pairs,
        dependent_pairs_pending_checkpoint,
        dependent_dirty
        );
}

static void unpin_pair(PAIR p, bool read_lock_grabbed) {
    if (read_lock_grabbed) {
        p->value_rwlock.read_unlock();
    }
    else {
        p->value_rwlock.write_unlock();
    }
}


// on input, the pair's mutex is held,
// on output, the pair's mutex is not held.
// if true, we must try again, and pair is not pinned
// if false, we succeeded, the pair is pinned
static bool try_pin_pair(
    PAIR p,
    CACHETABLE ct,
    CACHEFILE cachefile,
    pair_lock_type lock_type,
    uint32_t num_dependent_pairs,
    PAIR* dependent_pairs,
    enum cachetable_dirty* dependent_dirty,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    void* read_extraargs,
    bool already_slept
    )
{
    bool dep_checkpoint_pending[num_dependent_pairs];
    bool try_again = true;
    bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
    if (lock_type != PL_READ) {
        p->value_rwlock.write_lock(expensive);
    }
    else {
        p->value_rwlock.read_lock();
    }
    pair_touch(p);
    pair_unlock(p);

    bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);

    if (partial_fetch_required) {
        toku::context pf_ctx(CTX_PARTIAL_FETCH);

        if (ct->ev.should_client_thread_sleep() && !already_slept) {
            pair_lock(p);
            unpin_pair(p, (lock_type == PL_READ));
            pair_unlock(p);
            try_again = true;
            goto exit;
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }
        //
        // Just because the PAIR exists does not necessarily mean that all the data
        // the caller requires is in memory. A partial fetch may be required, and
        // that is what was evaluated above. If the variable is true, a partial
        // fetch is required, so we must grab the PAIR's write lock and then call
        // a callback to retrieve what we need.
        //
        assert(partial_fetch_required);
        // As of Dr. No, only clean PAIRs may have pieces missing,
        // so we do a sanity check here.
        assert(!p->dirty);

        if (lock_type == PL_READ) {
            pair_lock(p);
            p->value_rwlock.read_unlock();
            p->value_rwlock.write_lock(true);
            pair_unlock(p);
        }
        else if (lock_type == PL_WRITE_CHEAP) {
            pair_lock(p);
            p->value_rwlock.write_unlock();
            p->value_rwlock.write_lock(true);
            pair_unlock(p);
        }

        partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
        if (partial_fetch_required) {
            do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, true);
        }
        if (lock_type == PL_READ) {
            //
            // TODO: Zardosht, somehow ensure that a partial eviction cannot happen
            // between these two calls
            //
            pair_lock(p);
            p->value_rwlock.write_unlock();
            p->value_rwlock.read_lock();
            pair_unlock(p);
        }
        else if (lock_type == PL_WRITE_CHEAP) {
            pair_lock(p);
            p->value_rwlock.write_unlock();
            p->value_rwlock.write_lock(false);
            pair_unlock(p);
        }
        // small hack here for #5439,
        // for queries, pf_req_callback does some work for the caller,
        // that information may be out of date after a write_unlock
        // followed by a relock, so we do it again.
        bool pf_required = pf_req_callback(p->value_data,read_extraargs);
        assert(!pf_required);
    }

    if (lock_type != PL_READ) {
        ct->list.read_pending_cheap_lock();
        bool p_checkpoint_pending = p->checkpoint_pending;
        p->checkpoint_pending = false;
        for (uint32_t i = 0; i < num_dependent_pairs; i++) {
            dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
            dependent_pairs[i]->checkpoint_pending = false;
        }
        ct->list.read_pending_cheap_unlock();
        checkpoint_pair_and_dependent_pairs(
            ct,
            p,
            p_checkpoint_pending,
            num_dependent_pairs,
            dependent_pairs,
            dep_checkpoint_pending,
            dependent_dirty
            );
    }

    try_again = false;
exit:
    return try_again;
}

int toku_cachetable_get_and_pin_with_dep_pairs (
    CACHEFILE cachefile,
    CACHEKEY key,
    uint32_t fullhash,
    void**value,
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    pair_lock_type lock_type,
    void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
    )
// See cachetable/cachetable.h
{
    CACHETABLE ct = cachefile->cachetable;
    bool wait = false;
    bool already_slept = false;
    bool dep_checkpoint_pending[num_dependent_pairs];

    //
    // If in the process of pinning the node we add data to the cachetable via a partial fetch
    // or a full fetch, we may need to first sleep because there is too much data in the
    // cachetable. In those cases, we set the bool wait to true and goto try_again, so that
    // we can do our sleep and then restart the function.
    //
beginning:
    if (wait) {
        // We shouldn't be holding the read list lock while
        // waiting for the evictor to remove pairs.
        already_slept = true;
        ct->ev.wait_for_cache_pressure_to_subside();
    }

    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        // on entry, holds p->mutex (which is locked via pair_lock_by_fullhash)
        // on exit, does not hold p->mutex
        bool try_again = try_pin_pair(
            p,
            ct,
            cachefile,
            lock_type,
            num_dependent_pairs,
            dependent_pairs,
            dependent_dirty,
            pf_req_callback,
            pf_callback,
            read_extraargs,
            already_slept
            );
        if (try_again) {
            wait = true;
            goto beginning;
        }
        else {
            goto got_value;
        }
    }
    else {
        toku::context fetch_ctx(CTX_FULL_FETCH);

        ct->list.pair_unlock_by_fullhash(fullhash);
        // We only want to sleep once per call to get_and_pin. If we have already
        // slept and there is still cache pressure, then we might as
        // well just complete the call, because the sleep did not help.
        // By sleeping only once per get_and_pin, we prevent starvation and ensure
        // that we make progress (however slow) on each thread, which allows
        // assumptions of the form 'x will eventually happen'.
        // This happens in extreme scenarios.
        if (ct->ev.should_client_thread_sleep() && !already_slept) {
            wait = true;
            goto beginning;
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }
        // Since the pair was not found, we need the write list
        // lock to add it.  So, we have to release the read list lock
        // first.
        ct->list.write_list_lock();
        ct->list.pair_lock_by_fullhash(fullhash);
        p = ct->list.find_pair(cachefile, key, fullhash);
        if (p != NULL) {
            ct->list.write_list_unlock();
            // on entry, holds p->mutex,
            // on exit, does not hold p->mutex
            bool try_again = try_pin_pair(
                p,
                ct,
                cachefile,
                lock_type,
                num_dependent_pairs,
                dependent_pairs,
                dependent_dirty,
                pf_req_callback,
                pf_callback,
                read_extraargs,
                already_slept
                );
            if (try_again) {
                wait = true;
                goto beginning;
            }
            else {
                goto got_value;
            }
        }
        assert(p == NULL);

        // Insert a PAIR into the cachetable
        // NOTE: At this point we still have the write list lock held.
        p = cachetable_insert_at(
            ct,
            cachefile,
            key,
            zero_value,
            fullhash,
            zero_attr,
            write_callback,
            CACHETABLE_CLEAN
            );
        invariant_notnull(p);

        // Pin the pair.
        p->value_rwlock.write_lock(true);
        pair_unlock(p);


        if (lock_type != PL_READ) {
            ct->list.read_pending_cheap_lock();
            invariant(!p->checkpoint_pending);
            for (uint32_t i = 0; i < num_dependent_pairs; i++) {
                dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
                dependent_pairs[i]->checkpoint_pending = false;
            }
            ct->list.read_pending_cheap_unlock();
        }
        // We should release the lock before we perform
        // these expensive operations.
        ct->list.write_list_unlock();

        if (lock_type != PL_READ) {
            checkpoint_dependent_pairs(
                ct,
                num_dependent_pairs,
                dependent_pairs,
                dep_checkpoint_pending,
                dependent_dirty
                );
        }
        uint64_t t0 = get_tnow();

        // Retrieve the value of the PAIR from disk.
        // The pair being fetched will be marked as pending if a checkpoint happens during the
        // fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean.
        cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true);
        cachetable_miss++;
        cachetable_misstime += get_tnow() - t0;

        // If the lock_type requested was a PL_READ, we downgrade to PL_READ,
        // but if the request was for a PL_WRITE_CHEAP, we don't bother
        // downgrading, because we would have to possibly resolve the
        // checkpointing again, and that would just make this function even
        // messier.
        //
        // TODO(yoni): in case of PL_WRITE_CHEAP, write and use
        // p->value_rwlock.write_change_status_to_not_expensive(); (Also name it better)
        // to downgrade from an expensive write lock to a cheap one
        if (lock_type == PL_READ) {
            pair_lock(p);
            p->value_rwlock.write_unlock();
            p->value_rwlock.read_lock();
            pair_unlock(p);
            // small hack here for #5439,
            // for queries, pf_req_callback does some work for the caller,
            // that information may be out of date after a write_unlock
            // followed by a read_lock, so we do it again.
            bool pf_required = pf_req_callback(p->value_data,read_extraargs);
            assert(!pf_required);
        }
        goto got_value;
    }
got_value:
    *value = p->value_data;
    return 0;
}

// Lookup a key in the cachetable.  If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return success.
//
// However, if the page is clean or has a checkpoint pending, don't return success.
// This will minimize the number of dirty nodes.
// Rationale:  maybe_get_and_pin is used when the system has an alternative to modifying a node.
//  In the context of checkpointing, we don't want to gratuitously dirty a page, because that causes an I/O.
//  For example, if we can modify a bit in a dirty parent, or modify a bit in a clean child, then we should modify
//  the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit.
//  Similarly, if the checkpoint is actually pending, we don't want to block on it.
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
    CACHETABLE ct = cachefile->cachetable;
    int r = -1;
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
        bool got_lock = false;
        switch (lock_type) {
        case PL_READ:
            if (p->value_rwlock.try_read_lock()) {
                got_lock = p->dirty;

                if (!got_lock) {
                    p->value_rwlock.read_unlock();
                }
            }
            break;
        case PL_WRITE_CHEAP:
        case PL_WRITE_EXPENSIVE:
            if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
                // we got the lock fast, so continue
                ct->list.read_pending_cheap_lock();

                // if pending a checkpoint, then we don't want to return
                // the value to the user, because we are responsible for
                // handling the checkpointing, which we do not want to do,
                // because it is expensive
                got_lock = p->dirty && !p->checkpoint_pending;

                ct->list.read_pending_cheap_unlock();
                if (!got_lock) {
                    p->value_rwlock.write_unlock();
                }
            }
            break;
        }
        if (got_lock) {
            pair_touch(p);
            *value = p->value_data;
            r = 0;
        }
    }
    ct->list.pair_unlock_by_fullhash(fullhash);
    return r;
}
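
// Sketch of the intended calling pattern (the my_* helpers are hypothetical
// caller code, not part of this API): callers that merely prefer to modify a
// node that is already dirty and in memory try maybe_get_and_pin first and
// simply skip the optimization when it fails, rather than forcing I/O:
//
//     void *node;
//     if (toku_cachetable_maybe_get_and_pin(cf, key, fullhash,
//                                           PL_WRITE_CHEAP, &node) == 0) {
//         my_apply_update(node);    // node was already dirty, so no new I/O
//         toku_cachetable_unpin(cf, my_pair_of(node),
//                               CACHETABLE_DIRTY, my_new_attr(node));
//     }
//     // else: the node is clean, absent, locked, or checkpoint-pending; fall
//     // back to updating some other (dirty) node or to a full get_and_pin.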

// Used by flusher threads to possibly pin a child on the client thread if pinning is cheap.
// Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless).
// All other conditions remain the same.
int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void**value) {
    CACHETABLE ct = cachefile->cachetable;
    int r = -1;
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
        bool got_lock = false;
        switch (lock_type) {
        case PL_READ:
            if (p->value_rwlock.try_read_lock()) {
                got_lock = true;
            } else if (!p->value_rwlock.read_lock_is_expensive()) {
                p->value_rwlock.write_lock(lock_is_expensive);
                got_lock = true;
            }
            if (got_lock) {
                pair_touch(p);
            }
            pair_unlock(p);
            break;
        case PL_WRITE_CHEAP:
        case PL_WRITE_EXPENSIVE:
            if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
                got_lock = true;
            } else if (!p->value_rwlock.write_lock_is_expensive()) {
                p->value_rwlock.write_lock(lock_is_expensive);
                got_lock = true;
            }
            if (got_lock) {
                pair_touch(p);
            }
            pair_unlock(p);
            if (got_lock) {
                bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
                write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
            }
            break;
        }
        if (got_lock) {
            *value = p->value_data;
            r = 0;
        }
    } else {
        ct->list.pair_unlock_by_fullhash(fullhash);
    }
    return r;
}

int toku_cachetable_get_attr (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, PAIR_ATTR *attr) {
    CACHETABLE ct = cachefile->cachetable;
    int r;
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        // Assumes pair lock and full hash lock are the same mutex
        *attr = p->attr;
        r = 0;
    } else {
        r = -1;
    }
    ct->list.pair_unlock_by_fullhash(fullhash);
    return r;
}

//
// internal function to unpin a PAIR.
// As of Clayface, this may be called in two ways:
//  - with flush false
//  - with flush true
// The first is for when this is run during run_unlockers in
// toku_cachetable_get_and_pin_nonblocking, the second is during
// normal operations. Only during normal operations do we want to possibly
// induce evictions or sleep.
//
static int
cachetable_unpin_internal(
    CACHEFILE cachefile,
    PAIR p,
    enum cachetable_dirty dirty,
    PAIR_ATTR attr,
    bool flush
    )
{
    invariant_notnull(p);

    CACHETABLE ct = cachefile->cachetable;
    bool added_data_to_cachetable = false;

    // hack for #3969, only exists in case where we run unlockers
    pair_lock(p);
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr = attr;
    if (dirty) {
        p->dirty = CACHETABLE_DIRTY;
    }
    if (attr.is_valid) {
        p->attr = attr;
    }
    bool read_lock_grabbed = p->value_rwlock.readers() != 0;
    unpin_pair(p, read_lock_grabbed);
    pair_unlock(p);

    if (attr.is_valid) {
        if (new_attr.size > old_attr.size) {
            added_data_to_cachetable = true;
        }
        ct->ev.change_pair_attr(old_attr, new_attr);
    }

    // see comments above this function to understand this code
    if (flush && added_data_to_cachetable) {
        if (ct->ev.should_client_thread_sleep()) {
            ct->ev.wait_for_cache_pressure_to_subside();
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }
    }
    return 0;
}

int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    return cachetable_unpin_internal(cachefile, p, dirty, attr, true);
}
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    return cachetable_unpin_internal(cachefile, p, dirty, attr, false);
}
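
// A short sketch of when each flavor applies (my_attr_after_update is a
// hypothetical caller helper): growing a pair on unpin is what can push the
// cachetable over its size limit, so only the flushing variant may block the
// client thread.
//
//     PAIR_ATTR new_attr = my_attr_after_update(node);  // larger than before
//     toku_cachetable_unpin(cf, p, CACHETABLE_DIRTY, new_attr);
//         // normal path: may sleep on cache pressure or wake the evictor
//     toku_cachetable_unpin_ct_prelocked_no_flush(cf, p, CACHETABLE_DIRTY, new_attr);
//         // run_unlockers path: never sleeps and never signals the evictor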

static void
run_unlockers (UNLOCKERS unlockers) {
    while (unlockers) {
        assert(unlockers->locked);
        unlockers->locked = false;
        unlockers->f(unlockers->extra);
        unlockers = unlockers->next;
    }
}
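
// Callers build the UNLOCKERS argument as a singly linked chain of
// struct unlockers nodes (declared in cachetable/cachetable.h), usually on the
// stack, one node per lock they are willing to give up. A sketch, where
// my_unlock_parent and parent are hypothetical caller state:
//
//     static void my_unlock_parent(void *extra) {
//         FTNODE parent = static_cast<FTNODE>(extra);
//         // ... release whatever lock the caller holds on parent ...
//     }
//
//     struct unlockers unlockers = { true, my_unlock_parent,
//                                    static_cast<void *>(parent), NULL };
//     // pass &unlockers to toku_cachetable_get_and_pin_nonblocking below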

//
// This function tries to pin the pair without running the unlockers.
// If it can pin the pair cheaply, it does so, and returns 0.
// If the pin will be expensive, it runs unlockers,
// pins the pair, then releases the pin,
// and then returns TOKUDB_TRY_AGAIN
//
// on entry, pair mutex is held,
// on exit, pair mutex is NOT held
static int
maybe_pin_pair(
    PAIR p,
    pair_lock_type lock_type,
    UNLOCKERS unlockers
    )
{
    int retval = 0;
    bool expensive = (lock_type == PL_WRITE_EXPENSIVE);

    // we can pin the PAIR. In each case, we check to see
    // if acquiring the pin is expensive. If so, we run the unlockers, set the
    // retval to TOKUDB_TRY_AGAIN, and pin AND release the PAIR.
    // If not, then we pin the PAIR, keep retval at 0, and do not
    // run the unlockers, as we intend to return the value to the user.
    if (lock_type == PL_READ) {
        if (p->value_rwlock.read_lock_is_expensive()) {
            pair_add_ref_unlocked(p);
            pair_unlock(p);
            run_unlockers(unlockers);
            retval = TOKUDB_TRY_AGAIN;
            pair_lock(p);
            pair_release_ref_unlocked(p);
        }
        p->value_rwlock.read_lock();
    }
    else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){
        if (p->value_rwlock.write_lock_is_expensive()) {
            pair_add_ref_unlocked(p);
            pair_unlock(p);
            run_unlockers(unlockers);
            // change expensive to false because
            // we will unpin the pair immediately
            // after pinning it
            expensive = false;
            retval = TOKUDB_TRY_AGAIN;
            pair_lock(p);
            pair_release_ref_unlocked(p);
        }
        p->value_rwlock.write_lock(expensive);
    }
    else {
        abort();
    }

    if (retval == TOKUDB_TRY_AGAIN) {
        unpin_pair(p, (lock_type == PL_READ));
    }
    pair_touch(p);
    pair_unlock(p);
    return retval;
}

int toku_cachetable_get_and_pin_nonblocking(
    CACHEFILE cf,
    CACHEKEY key,
    uint32_t fullhash,
    void**value,
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    pair_lock_type lock_type,
    void *read_extraargs,
    UNLOCKERS unlockers
    )
// See cachetable/cachetable.h.
{
    CACHETABLE ct = cf->cachetable;
    assert(lock_type == PL_READ ||
        lock_type == PL_WRITE_CHEAP ||
        lock_type == PL_WRITE_EXPENSIVE
        );
try_again:
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cf, key, fullhash);
    if (p == NULL) {
        toku::context fetch_ctx(CTX_FULL_FETCH);

        // Not found
        ct->list.pair_unlock_by_fullhash(fullhash);
        ct->list.write_list_lock();
        ct->list.pair_lock_by_fullhash(fullhash);
        p = ct->list.find_pair(cf, key, fullhash);
        if (p != NULL) {
            // We just did another search with the write list lock and
            // found the pair. This means that in between our
            // releasing the read list lock and grabbing the write list lock,
            // another thread snuck in and inserted the PAIR into
            // the cachetable. For simplicity, we just return
            // to the top and restart the function.
            ct->list.write_list_unlock();
            ct->list.pair_unlock_by_fullhash(fullhash);
            goto try_again;
        }

        p = cachetable_insert_at(
            ct,
            cf,
            key,
            zero_value,
            fullhash,
            zero_attr,
            write_callback,
            CACHETABLE_CLEAN
            );
        assert(p);
        // grab expensive write lock, because we are about to do a fetch
        // off disk
        // No one can access this pair because
        // we hold the write list lock and we just injected
        // the pair into the cachetable. Therefore, this lock acquisition
        // will not block.
        p->value_rwlock.write_lock(true);
        pair_unlock(p);
        run_unlockers(unlockers); // we hold the write list_lock.
        ct->list.write_list_unlock();

        // at this point, only the pair is pinned,
        // and no pair mutex held, and
        // no list lock is held
        uint64_t t0 = get_tnow();
        cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false);
        cachetable_miss++;
        cachetable_misstime += get_tnow() - t0;

        if (ct->ev.should_client_thread_sleep()) {
            ct->ev.wait_for_cache_pressure_to_subside();
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }

        return TOKUDB_TRY_AGAIN;
    }
    else {
        int r = maybe_pin_pair(p, lock_type, unlockers);
        if (r == TOKUDB_TRY_AGAIN) {
            return TOKUDB_TRY_AGAIN;
        }
        assert_zero(r);

        if (lock_type != PL_READ) {
            bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
            write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
        }

        // At this point, we have pinned the PAIR
        // and resolved its checkpointing. The pair's
        // mutex is not held. The read list lock IS held. Before
        // returning the PAIR to the user, we must
        // still check for partial fetch
        bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
        if (partial_fetch_required) {
            toku::context fetch_ctx(CTX_PARTIAL_FETCH);

            run_unlockers(unlockers);

            // we are now getting an expensive write lock, because we
            // are doing a partial fetch. So, if we previously have
            // either a read lock or a cheap write lock, we need to
            // release and reacquire the correct lock type
            if (lock_type == PL_READ) {
                pair_lock(p);
                p->value_rwlock.read_unlock();
                p->value_rwlock.write_lock(true);
                pair_unlock(p);
            }
            else if (lock_type == PL_WRITE_CHEAP) {
                pair_lock(p);
                p->value_rwlock.write_unlock();
                p->value_rwlock.write_lock(true);
                pair_unlock(p);
            }

            // Now wait for the I/O to occur.
            partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
            if (partial_fetch_required) {
                do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, false);
            }
            else {
                pair_lock(p);
                p->value_rwlock.write_unlock();
                pair_unlock(p);
            }

            if (ct->ev.should_client_thread_sleep()) {
                ct->ev.wait_for_cache_pressure_to_subside();
            }
            if (ct->ev.should_client_wake_eviction_thread()) {
                ct->ev.signal_eviction_thread();
            }

            return TOKUDB_TRY_AGAIN;
        }
        else {
            *value = p->value_data;
            return 0;
        }
    }
    // We should not get here. Above code should hit a return in all cases.
    abort();
}
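
// Typical calling pattern (sketch; my_lock_parent / my_unlock_parent are
// hypothetical caller state): because this function may run the unlockers and
// return TOKUDB_TRY_AGAIN, callers loop, reacquiring their own locks each time.
// On success the unlockers did NOT run, so the caller's locks are still held.
//
//     void *value;
//     while (true) {
//         my_lock_parent(parent);
//         struct unlockers unlockers = { true, my_unlock_parent, parent, NULL };
//         int r = toku_cachetable_get_and_pin_nonblocking(
//             cf, key, fullhash, &value, write_callback,
//             fetch_callback, pf_req_callback, pf_callback,
//             PL_READ, read_extraargs, &unlockers);
//         if (r == 0) break;              // pinned; parent is still locked
//         assert(r == TOKUDB_TRY_AGAIN);  // unlockers ran; parent lock is gone
//     }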

struct cachefile_prefetch_args {
    PAIR p;
    CACHETABLE_FETCH_CALLBACK fetch_callback;
    void* read_extraargs;
};

struct cachefile_partial_prefetch_args {
    PAIR p;
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback;
    void *read_extraargs;
};

// Worker thread function to read a pair from a cachefile to memory
static void cachetable_reader(void* extra) {
    struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra;
    CACHEFILE cf = cpargs->p->cachefile;
    CACHETABLE ct = cf->cachetable;
    cachetable_fetch_pair(
        ct,
        cpargs->p->cachefile,
        cpargs->p,
        cpargs->fetch_callback,
        cpargs->read_extraargs,
        false
        );
    bjm_remove_background_job(cf->bjm);
    toku_free(cpargs);
}

static void cachetable_partial_reader(void* extra) {
    struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra;
    CACHEFILE cf = cpargs->p->cachefile;
    CACHETABLE ct = cf->cachetable;
    do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, false);
    bjm_remove_background_job(cf->bjm);
    toku_free(cpargs);
}

int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
                            CACHETABLE_WRITE_CALLBACK write_callback,
                            CACHETABLE_FETCH_CALLBACK fetch_callback,
                            CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
                            CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
                            void *read_extraargs,
                            bool *doing_prefetch)
// Effect: See the documentation for this function in cachetable/cachetable.h
{
    int r = 0;
    PAIR p = NULL;
    if (doing_prefetch) {
        *doing_prefetch = false;
    }
    CACHETABLE ct = cf->cachetable;
    // if cachetable has too much data, don't bother prefetching
    if (ct->ev.should_client_thread_sleep()) {
        goto exit;
    }
    ct->list.pair_lock_by_fullhash(fullhash);
    // lookup
    p = ct->list.find_pair(cf, key, fullhash);
    // if not found then create a pair and fetch it
    if (p == NULL) {
        cachetable_prefetches++;
        ct->list.pair_unlock_by_fullhash(fullhash);
        ct->list.write_list_lock();
        ct->list.pair_lock_by_fullhash(fullhash);
        p = ct->list.find_pair(cf, key, fullhash);
        if (p != NULL) {
            ct->list.write_list_unlock();
            goto found_pair;
        }

        r = bjm_add_background_job(cf->bjm);
        assert_zero(r);
        p = cachetable_insert_at(
            ct,
            cf,
            key,
            zero_value,
            fullhash,
            zero_attr,
            write_callback,
            CACHETABLE_CLEAN
            );
        assert(p);
        p->value_rwlock.write_lock(true);
        pair_unlock(p);
        ct->list.write_list_unlock();

        struct cachefile_prefetch_args *MALLOC(cpargs);
        cpargs->p = p;
        cpargs->fetch_callback = fetch_callback;
        cpargs->read_extraargs = read_extraargs;
        toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs);
        if (doing_prefetch) {
            *doing_prefetch = true;
        }
        goto exit;
    }

found_pair:
    // at this point, p is found, pair's mutex is grabbed, and
    // no list lock is held
    // TODO(leif): should this also just go ahead and wait if all there
    // are to wait for are readers?
    if (p->value_rwlock.try_write_lock(true)) {
        // nobody else is using the node, so we should go ahead and prefetch
        pair_touch(p);
        pair_unlock(p);
        bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);

        if (partial_fetch_required) {
            r = bjm_add_background_job(cf->bjm);
            assert_zero(r);
            struct cachefile_partial_prefetch_args *MALLOC(cpargs);
            cpargs->p = p;
            cpargs->pf_callback = pf_callback;
            cpargs->read_extraargs = read_extraargs;
            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs);
            if (doing_prefetch) {
                *doing_prefetch = true;
            }
        }
        else {
            pair_lock(p);
            p->value_rwlock.write_unlock();
            pair_unlock(p);
        }
    }
    else {
        // Couldn't get the write lock cheaply
        pair_unlock(p);
    }
exit:
    return 0;
}
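
// Usage sketch: prefetch is fire-and-forget. The out-parameter reports whether
// a background job was actually scheduled; none is scheduled when the pair is
// already fully in memory, busy, or the cachetable is under pressure.
//
//     bool doing_prefetch;
//     int r = toku_cachefile_prefetch(cf, key, fullhash, write_callback,
//                                     fetch_callback, pf_req_callback,
//                                     pf_callback, read_extraargs,
//                                     &doing_prefetch);
//     assert_zero(r);
//     if (doing_prefetch) {
//         // a reader or partial reader is now queued on ct->ct_kibbutz, so a
//         // later get_and_pin for this key may hit without blocking on disk
//     }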

void toku_cachefile_verify (CACHEFILE cf) {
    toku_cachetable_verify(cf->cachetable);
}

void toku_cachetable_verify (CACHETABLE ct) {
    ct->list.verify();
}



struct pair_flush_for_close {
    PAIR p;
    BACKGROUND_JOB_MANAGER bjm;
};

static void cachetable_flush_pair_for_close(void* extra) {
    struct pair_flush_for_close *CAST_FROM_VOIDP(args, extra);
    PAIR p = args->p;
    CACHEFILE cf = p->cachefile;
    CACHETABLE ct = cf->cachetable;
    PAIR_ATTR attr;
    cachetable_only_write_locked_data(
        &ct->ev,
        p,
        false, // not for a checkpoint, as we assert above
        &attr,
        false // not a clone
        );
    p->dirty = CACHETABLE_CLEAN;
    bjm_remove_background_job(args->bjm);
    toku_free(args);
}


static void flush_pair_for_close_on_background_thread(
    PAIR p,
    BACKGROUND_JOB_MANAGER bjm,
    CACHETABLE ct
    )
{
    pair_lock(p);
    assert(p->value_rwlock.users() == 0);
    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
    assert(!p->cloned_value_data);
    if (p->dirty == CACHETABLE_DIRTY) {
        int r = bjm_add_background_job(bjm);
        assert_zero(r);
        struct pair_flush_for_close *XMALLOC(args);
        args->p = p;
        args->bjm = bjm;
        toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
    }
    pair_unlock(p);
}

static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) {
    pair_lock(p);
    assert(p->value_rwlock.users() == 0);
    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
    assert(!p->cloned_value_data);
    assert(p->dirty == CACHETABLE_CLEAN);
    assert(p->refcount == 0);
    if (completely) {
        cachetable_remove_pair(&ct->list, &ct->ev, p);
        pair_unlock(p);
        // TODO: Eventually, we should not hold the write list lock during free
        cachetable_free_pair(p);
    }
    else {
        // if we are not evicting completely,
        // we only want to remove the PAIR from the cachetable,
        // that is, remove it from the hashtable and the various linked
        // lists, but we will keep the PAIRs and the linked list
        // in the cachefile intact, as they will be cached away
        // in case an open comes soon.
        ct->list.evict_from_cachetable(p);
        pair_unlock(p);
    }
}

// helper function for cachetable_flush_cachefile, which happens on a close.
// writes out the dirty pairs on background threads and returns when
// the writing is done
static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
    BACKGROUND_JOB_MANAGER bjm = NULL;
    bjm_init(&bjm);
    ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here
    PAIR p = NULL;
    // write out dirty PAIRs
    uint32_t i;
    if (cf) {
        for (i = 0, p = cf->cf_head;
            i < cf->num_pairs;
            i++, p = p->cf_next)
        {
            flush_pair_for_close_on_background_thread(p, bjm, ct);
        }
    }
    else {
        for (i = 0, p = ct->list.m_checkpoint_head;
            i < ct->list.m_n_in_table;
            i++, p = p->clock_next)
        {
            flush_pair_for_close_on_background_thread(p, bjm, ct);
        }
    }
    ct->list.write_list_unlock();
    bjm_wait_for_jobs_to_finish(bjm);
    bjm_destroy(bjm);
}

static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
    ct->list.write_list_lock();
    if (cf) {
        if (evict_completely) {
            // if we are evicting completely, then the PAIRs will
            // be removed from the linked list managed by the
            // cachefile, so this while loop works
            while (cf->num_pairs > 0) {
                PAIR p = cf->cf_head;
                remove_pair_for_close(p, ct, evict_completely);
            }
        }
        else {
            // on the other hand, if we are not evicting completely,
            // then the cachefile's linked list stays intact, and we must
            // iterate like this.
            for (PAIR p = cf->cf_head; p; p = p->cf_next) {
                remove_pair_for_close(p, ct, evict_completely);
            }
        }
    }
    else {
        while (ct->list.m_n_in_table > 0) {
            PAIR p = ct->list.m_checkpoint_head;
            // if there is no cachefile, then we had better
            // be evicting completely, because we have no
            // cachefile to save the PAIRs to. At least,
            // we have no guarantees that the cachefile
            // will remain good
            invariant(evict_completely);
            remove_pair_for_close(p, ct, true);
        }
    }
    ct->list.write_list_unlock();
}

static void verify_cachefile_flushed(CACHETABLE ct UU(), CACHEFILE cf UU()) {
#ifdef TOKU_DEBUG_PARANOID
    // assert here that the cachefile is flushed by walking the
    // pair_list and finding no pairs belonging to this cachefile
    if (cf) {
        ct->list.write_list_lock();
        uint32_t i;
        PAIR p = NULL;
        for (i = 0, p = ct->list.m_checkpoint_head;
             i < ct->list.m_n_in_table;
             i++, p = p->clock_next)
         {
             assert(p->cachefile != cf);
         }
         ct->list.write_list_unlock();
    }
#endif
}

// Flush (write to disk) all of the pairs that belong to a cachefile (or all
// pairs if the cachefile is NULL).
// Must be holding cachetable lock on entry.
//
// This function assumes that no client thread is accessing or
// trying to access the cachefile while this function is executing.
// This implies no client thread will be trying to lock any nodes
// belonging to the cachefile.
//
// This function also assumes that the cachefile is not in the process
// of being used by a checkpoint. If a checkpoint is currently happening,
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
    //
    // Because work on a kibbutz is always done by the client thread,
    // and this function assumes that no client thread is doing any work
    // on the cachefile, we assume that no client thread will be adding jobs
    // to this cachefile's kibbutz.
    //
    // The caller of this function must ensure that there are
    // no jobs added to the kibbutz. This implies that the only work other
    // threads may be doing is work by the writer threads.
    //
    // first write out dirty PAIRs
    write_dirty_pairs_for_close(ct, cf);

    // now that everything is clean, get rid of everything
    remove_all_pairs_for_close(ct, cf, evict_completely);

    verify_cachefile_flushed(ct, cf);
}

/* Requires that no locks be held that are used by the checkpoint logic */
void
toku_cachetable_minicron_shutdown(CACHETABLE ct) {
    int  r = ct->cp.shutdown();
    assert(r==0);
    ct->cl.destroy();
}

void toku_cachetable_prepare_close(CACHETABLE ct UU()) {
    extern bool toku_serialize_in_parallel;
    toku_unsafe_set(&toku_serialize_in_parallel, true);
}

/* Requires that it all be flushed. */
void toku_cachetable_close (CACHETABLE *ctp) {
    CACHETABLE ct = *ctp;
    ct->cp.destroy();
    ct->cl.destroy();
    ct->cf_list.free_stale_data(&ct->ev);
    cachetable_flush_cachefile(ct, NULL, true);
    ct->ev.destroy();
    ct->list.destroy();
    ct->cf_list.destroy();

    if (ct->client_kibbutz)
        toku_kibbutz_destroy(ct->client_kibbutz);
    if (ct->ct_kibbutz)
        toku_kibbutz_destroy(ct->ct_kibbutz);
    if (ct->checkpointing_kibbutz)
        toku_kibbutz_destroy(ct->checkpointing_kibbutz);
    toku_free(ct->env_dir);
    toku_free(ct);
    *ctp = 0;
}

static PAIR test_get_pair(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, bool have_ct_lock) {
    CACHETABLE ct = cachefile->cachetable;

    if (!have_ct_lock) {
        ct->list.read_list_lock();
    }

    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    assert(p != NULL);
    if (!have_ct_lock) {
        ct->list.read_list_unlock();
    }
    return p;
}

// test-only wrapper
int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    // By default we don't have the lock
    PAIR p = test_get_pair(cachefile, key, fullhash, false);
    return toku_cachetable_unpin(cachefile, p, dirty, attr); // assume read lock is not grabbed, and that it is a write lock
}

// test-only wrapper
int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    // We hold the cachetable mutex.
    PAIR p = test_get_pair(cachefile, key, fullhash, true);
    return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr);
}

// test-only wrapper
int toku_test_cachetable_unpin_and_remove (
    CACHEFILE cachefile,
    CACHEKEY key,
    CACHETABLE_REMOVE_KEY remove_key,
    void* remove_key_extra)
{
    uint32_t fullhash = toku_cachetable_hash(cachefile, key);
    PAIR p = test_get_pair(cachefile, key, fullhash, false);
    return toku_cachetable_unpin_and_remove(cachefile, p, remove_key, remove_key_extra);
}

int toku_cachetable_unpin_and_remove (
    CACHEFILE cachefile,
    PAIR p,
    CACHETABLE_REMOVE_KEY remove_key,
    void* remove_key_extra
    )
{
    invariant_notnull(p);
    int r = ENOENT;
    CACHETABLE ct = cachefile->cachetable;

    p->dirty = CACHETABLE_CLEAN; // clear the dirty bit.  We're just supposed to remove it.
    // grab disk_nb_mutex to ensure that any background thread writing
    // out a cloned value completes
    pair_lock(p);
    assert(p->value_rwlock.writers());
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    assert(p->cloned_value_data == NULL);

    //
    // take care of key removal
    //
    ct->list.write_list_lock();
    ct->list.read_pending_cheap_lock();
    bool for_checkpoint = p->checkpoint_pending;
    // now let's wipe out the pending bit, because we are
    // removing the PAIR
    p->checkpoint_pending = false;

    // So that the PAIR is not picked by the
    // cleaner thread, we set the cachepressure_size to 0.
    // (This is redundant since we have the write_list_lock.)
    // This should not be an issue because we call
    // cachetable_remove_pair before
    // releasing the cachetable lock.
    //
    CACHEKEY key_to_remove = p->key;
    p->attr.cache_pressure_size = 0;
    //
    // callback for removing the key
    // for FTNODEs, this leads to calling
    // toku_free_blocknum
    //
    if (remove_key) {
        remove_key(
            &key_to_remove,
            for_checkpoint,
            remove_key_extra
            );
    }
    ct->list.read_pending_cheap_unlock();

    pair_lock(p);
    p->value_rwlock.write_unlock();
    nb_mutex_unlock(&p->disk_nb_mutex);
    //
    // As of Clayface (6.5), only these threads may be
    // blocked waiting to lock this PAIR:
    //  - the checkpoint thread (because a checkpoint is in progress
    //     and the PAIR was in the list of pending pairs)
    //  - a client thread running get_and_pin_nonblocking, who
    //     ran unlockers, then waited on the PAIR lock.
    //     While waiting on a PAIR lock, another thread comes in,
    //     locks the PAIR, and ends up calling unpin_and_remove,
    //     all while get_and_pin_nonblocking is waiting on the PAIR lock.
    //     We did not realize this at first, which caused bug #4357
    // The following threads CANNOT be blocked waiting on
    // the PAIR lock:
    //  - a thread trying to run eviction via run_eviction.
    //     That cannot happen because run_eviction only
    //     attempts to lock PAIRS that are not locked, and this PAIR
    //     is locked.
    //  - cleaner thread, for the same reason as a thread running
    //     eviction
    //  - client thread doing a normal get_and_pin. The client is smart
    //     enough to not try to lock a PAIR that another client thread
    //     is trying to unpin and remove. Note that this includes work
    //     done on kibbutzes.
    //  - writer thread. Writer threads do not grab PAIR locks. They
    //     get PAIR locks transferred to them by client threads.
    //

    // first thing we do is remove the PAIR from the various
    // cachetable data structures, so no other thread can possibly
    // access it. We do not want to risk some other thread
    // trying to lock this PAIR if we release the write list lock
    // below. If some thread is already waiting on the lock,
    // then we let that thread grab the lock and finish, but
    // we don't want any NEW threads to try to grab the PAIR
    // lock.
    //
    // Because we call cachetable_remove_pair and wait,
    // the threads that may be waiting
    // on this PAIR lock must be careful to do NOTHING with the PAIR
    // As per our analysis above, we only need
    // to make sure the checkpoint thread and get_and_pin_nonblocking do
    // nothing, and looking at those functions, it is clear they do nothing.
    //
    cachetable_remove_pair(&ct->list, &ct->ev, p);
    ct->list.write_list_unlock();
    if (p->refcount > 0) {
        pair_wait_for_ref_release_unlocked(p);
    }
    if (p->value_rwlock.users() > 0) {
        // Need to wait for everyone else to leave
        // This write lock will be granted only after all waiting
        // threads are done.
        p->value_rwlock.write_lock(true);
        assert(p->refcount == 0);
        assert(p->value_rwlock.users() == 1);  // us
        assert(!p->checkpoint_pending);
        assert(p->attr.cache_pressure_size == 0);
        p->value_rwlock.write_unlock();
    }
    // just a sanity check
    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
    assert(p->cloned_value_data == NULL);
    // Remove pair.
    pair_unlock(p);
    cachetable_free_pair(p);
    r = 0;
    return r;
}

int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array);
int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) {
    array[index] = toku_cachefile_filenum(ft->cf);
    return 0;
}

static int log_open_txn (TOKUTXN txn, void* extra) {
    int r;
    checkpointer* cp = (checkpointer *)extra;
    TOKULOGGER logger = txn->logger;
    FILENUMS open_filenums;
    uint32_t num_filenums = txn->open_fts.size();
    FILENUM array[num_filenums];
    if (toku_txn_is_read_only(txn)) {
        goto cleanup;
    }
    else {
        cp->increment_num_txns();
    }

    open_filenums.num      = num_filenums;
    open_filenums.filenums = array;
    // Fill in open_filenums
    r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array);
    invariant(r==0);
    switch (toku_txn_get_state(txn)) {
    case TOKUTXN_LIVE: {
        toku_log_xstillopen(logger, NULL, 0, txn,
                            toku_txn_get_txnid(txn),
                            toku_txn_get_txnid(toku_logger_txn_parent(txn)),
                            txn->roll_info.rollentry_raw_count,
                            open_filenums,
                            txn->force_fsync_on_commit,
                            txn->roll_info.num_rollback_nodes,
                            txn->roll_info.num_rollentries,
                            txn->roll_info.spilled_rollback_head,
                            txn->roll_info.spilled_rollback_tail,
                            txn->roll_info.current_rollback);
        goto cleanup;
    }
    case TOKUTXN_PREPARING: {
        TOKU_XA_XID xa_xid;
        toku_txn_get_prepared_xa_xid(txn, &xa_xid);
        toku_log_xstillopenprepared(logger, NULL, 0, txn,
                                    toku_txn_get_txnid(txn),
                                    &xa_xid,
                                    txn->roll_info.rollentry_raw_count,
                                    open_filenums,
                                    txn->force_fsync_on_commit,
                                    txn->roll_info.num_rollback_nodes,
                                    txn->roll_info.num_rollentries,
                                    txn->roll_info.spilled_rollback_head,
                                    txn->roll_info.spilled_rollback_tail,
                                    txn->roll_info.current_rollback);
        goto cleanup;
    }
    case TOKUTXN_RETIRED:
    case TOKUTXN_COMMITTING:
    case TOKUTXN_ABORTING: {
        assert(0);
    }
    }
    // default is an error
    assert(0);
cleanup:
    return 0;
}

// Requires:   All three checkpoint-relevant locks must be held (see checkpoint.c).
// Algorithm:  Write a checkpoint record to the log, noting the LSN of that record.
//             Use the begin_checkpoint callback to take necessary snapshots (header, btt)
//             Mark every dirty node as "pending."  ("Pending" means that the node must be
//                                                    written to disk before it can be modified.)
void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, TOKULOGGER UU(logger)) {
    cp->begin_checkpoint();
}


// This is used by the cachetable_race test.
static volatile int toku_checkpointing_user_data_status = 0;
static void toku_cachetable_set_checkpointing_user_data_status (int v) {
    toku_checkpointing_user_data_status = v;
}
int toku_cachetable_get_checkpointing_user_data_status (void) {
    return toku_checkpointing_user_data_status;
}

// Requires:   The big checkpoint lock must be held (see checkpoint.c).
// Algorithm:  Write all pending nodes to disk
//             Use checkpoint callback to write snapshot information to disk (header, btt)
//             Use end_checkpoint callback to fsync dictionary and log, and to free unused blocks
// Note:       If testcallback_f is non-null (for testing purposes only), it is called after
//             writing the dictionary but before writing the log.
void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger),
                               void (*testcallback_f)(void*),  void* testextra) {
    cp->end_checkpoint(testcallback_f, testextra);
}
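
// Illustrative call sequence (not part of the build), assuming the caller
// holds the checkpoint locks described above:
//     toku_cachetable_begin_checkpoint(cp, logger);   // log begin, mark dirty pairs pending
//     /* clients keep working; pending pairs are written out before being modified */
//     toku_cachetable_end_checkpoint(cp, logger, NULL, NULL);  // write pending pairs, fsync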

TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
    return cf->cachetable->cp.get_logger();
}

FILENUM toku_cachefile_filenum (CACHEFILE cf) {
    return cf->filenum;
}

// debug functions

int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
    uint32_t i;
    int some_pinned=0;
    ct->list.read_list_lock();
    for (i=0; i<ct->list.m_table_size; i++) {
        PAIR p;
        for (p=ct->list.m_table[i]; p; p=p->hash_chain) {
            pair_lock(p);
            if (p->value_rwlock.users()) {
                //printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data);
                some_pinned=1;
            }
            pair_unlock(p);
        }
    }
    ct->list.read_list_unlock();
    return some_pinned;
}

int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
    assert(cf != NULL);
    int n_pinned=0;
    CACHETABLE ct = cf->cachetable;
    ct->list.read_list_lock();

    // Iterate over all the pairs to find pairs specific to the
    // given cachefile.
    for (uint32_t i = 0; i < ct->list.m_table_size; i++) {
        for (PAIR p = ct->list.m_table[i]; p; p = p->hash_chain) {
            if (p->cachefile == cf) {
                pair_lock(p);
                if (p->value_rwlock.users()) {
                    if (print_them) {
                        printf("%s:%d pinned: %" PRId64 " (%p)\n",
                                __FILE__,
                                __LINE__,
                                p->key.b,
                                p->value_data);
                    }
                    n_pinned++;
                }
                pair_unlock(p);
            }
        }
    }

    ct->list.read_list_unlock();
    return n_pinned;
}

void toku_cachetable_print_state (CACHETABLE ct) {
    uint32_t i;
    ct->list.read_list_lock();
    for (i=0; i<ct->list.m_table_size; i++) {
        PAIR head = ct->list.m_table[i];
        if (head != 0) {
            // lock the bucket's head (all pairs in a bucket share its mutex,
            // since the pair mutexes are striped by the same hash bits);
            // unlock the head rather than the iterator, which is NULL after
            // the inner loop finishes.
            pair_lock(head);
            printf("t[%u]=", i);
            for (PAIR p = head; p; p = p->hash_chain) {
                printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, (int) p->dirty, p->value_rwlock.users(), p->attr.size);
            }
            printf("\n");
            pair_unlock(head);
        }
    }
    ct->list.read_list_unlock();
}

void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
    ct->list.get_state(num_entries_ptr, hash_size_ptr);
    ct->ev.get_state(size_current_ptr, size_limit_ptr);
}

int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
                                   int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
    int r = -1;
    uint32_t fullhash = toku_cachetable_hash(cf, key);
    ct->list.read_list_lock();
    PAIR p = ct->list.find_pair(cf, key, fullhash);
    if (p) {
        pair_lock(p);
        if (value_ptr)
            *value_ptr = p->value_data;
        if (dirty_ptr)
            *dirty_ptr = p->dirty;
        if (pin_ptr)
            *pin_ptr = p->value_rwlock.users();
        if (size_ptr)
            *size_ptr = p->attr.size;
        r = 0;
        pair_unlock(p);
    }
    ct->list.read_list_unlock();
    return r;
}

void
toku_cachefile_set_userdata (CACHEFILE cf,
                             void *userdata,
                             void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
                             void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
                             void (*free_userdata)(CACHEFILE, void*),
                             void (*checkpoint_userdata)(CACHEFILE, int, void*),
                             void (*begin_checkpoint_userdata)(LSN, void*),
                             void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
                             void (*note_pin_by_checkpoint)(CACHEFILE, void*),
                             void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
    cf->userdata = userdata;
    cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
    cf->close_userdata = close_userdata;
    cf->free_userdata = free_userdata;
    cf->checkpoint_userdata = checkpoint_userdata;
    cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
    cf->end_checkpoint_userdata = end_checkpoint_userdata;
    cf->note_pin_by_checkpoint = note_pin_by_checkpoint;
    cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint;
}

void *toku_cachefile_get_userdata(CACHEFILE cf) {
    return cf->userdata;
}

CACHETABLE
toku_cachefile_get_cachetable(CACHEFILE cf) {
    return cf->cachetable;
}

CACHEFILE toku_pair_get_cachefile(PAIR pair) {
    return pair->cachefile;
}

// Only called by ft_end_checkpoint.
// Must have access to cf->fd (the fd must be protected).
void toku_cachefile_fsync(CACHEFILE cf) {
    toku_file_fsync(cf->fd);
}

// Make it so when the cachefile closes, the underlying file is unlinked
void toku_cachefile_unlink_on_close(CACHEFILE cf) {
    assert(!cf->unlink_on_close);
    cf->unlink_on_close = true;
}

// is this cachefile marked as unlink on close?
bool toku_cachefile_is_unlink_on_close(CACHEFILE cf) {
    return cf->unlink_on_close;
}

void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf) {
    cf->skip_log_recover_on_close = true;
}

void toku_cachefile_do_log_recover_on_close(CACHEFILE cf) {
    cf->skip_log_recover_on_close = false;
}

bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf) {
    return cf->skip_log_recover_on_close;
}

uint64_t toku_cachefile_size(CACHEFILE cf) {
    int64_t file_size;
    int fd = toku_cachefile_get_fd(cf);
    int r = toku_os_get_file_size(fd, &file_size);
    assert_zero(r);
    return file_size;
}

char *
toku_construct_full_name(int count, ...) {
    va_list ap;
    char *name = NULL;
    size_t n = 0;
    int i;
    va_start(ap, count);
    for (i=0; i<count; i++) {
        char *arg = va_arg(ap, char *);
        if (arg) {
            n += 1 + strlen(arg) + 1;
            char *XMALLOC_N(n, newname);
            if (name && !toku_os_is_absolute_name(arg))
                snprintf(newname, n, "%s/%s", name, arg);
            else
                snprintf(newname, n, "%s", arg);
            toku_free(name);
            name = newname;
        }
    }
    va_end(ap);

    return name;
}

char *
toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) {
    return toku_construct_full_name(2, ct->env_dir, fname_in_env);
}
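
// Illustrative usage (not part of the build): with ct->env_dir == "/db/env"
// and fname_in_env == "foo.tokudb", the call above builds "/db/env/foo.tokudb";
// the caller owns the returned string and must toku_free() it.  If an argument
// is an absolute path, toku_construct_full_name starts over from that argument
// rather than appending it.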

static long
cleaner_thread_rate_pair(PAIR p)
{
    return p->attr.cache_pressure_size;
}

static int const CLEANER_N_TO_CHECK = 8;

int toku_cleaner_thread_for_test (CACHETABLE ct) {
    return ct->cl.run_cleaner();
}

int toku_cleaner_thread (void *cleaner_v) {
    cleaner* cl = (cleaner *) cleaner_v;
    assert(cl);
    return cl->run_cleaner();
}

/////////////////////////////////////////////////////////////////////////
//
// cleaner methods
//
ENSURE_POD(cleaner);

extern uint force_recovery;

int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
    // default is no cleaner, for now
    m_cleaner_cron_init = false;
    if (force_recovery) return 0;
    int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
    if (r == 0) {
        m_cleaner_cron_init = true;
    }
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations);
    m_cleaner_iterations = _cleaner_iterations;
    m_pl = _pl;
    m_ct = _ct;
    m_cleaner_init = true;
    return r;
}

// this function is allowed to be called multiple times
void cleaner::destroy(void) {
    if (!m_cleaner_init) {
        return;
    }
    if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
        // for test code only, production code uses toku_cachetable_minicron_shutdown()
        int r = toku_minicron_shutdown(&m_cleaner_cron);
        assert(r==0);
    }
}

uint32_t cleaner::get_iterations(void) {
    return m_cleaner_iterations;
}

void cleaner::set_iterations(uint32_t new_iterations) {
    m_cleaner_iterations = new_iterations;
}

uint32_t cleaner::get_period_unlocked(void) {
    return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron);
}

//
// Sets how often the cleaner thread will run, in seconds
//
void cleaner::set_period(uint32_t new_period) {
    toku_minicron_change_period(&m_cleaner_cron, new_period*1000);
}

// Effect:  runs a cleaner.
//
// We look through some number of nodes (at most CLEANER_N_TO_CHECK), and of
// the first N that we see which are unlocked and are not involved in a
// cachefile flush, we pick one and call the cleaner callback.  While we're
// picking a node, we hold the pair list's read lock the whole time, so we
// don't need any extra synchronization.  Once we have one we want, we lock
// it and notify the cachefile that we're doing some background work (so a
// flush won't start).  At this point, we can safely unlock the pair list,
// do the work (callback), and unlock/release our claim to the cachefile.
int cleaner::run_cleaner(void) {
    toku::context cleaner_ctx(CTX_CLEANER);

    int r;
    uint32_t num_iterations = this->get_iterations();
    for (uint32_t i = 0; i < num_iterations; ++i) {
        cleaner_executions++;
        m_pl->read_list_lock();
        PAIR best_pair = NULL;
        int n_seen = 0;
        long best_score = 0;
        const PAIR first_pair = m_pl->m_cleaner_head;
        if (first_pair == NULL) {
            // nothing in the cachetable, just get out now
            m_pl->read_list_unlock();
            break;
        }
        // here we select a PAIR for cleaning
        // look at some number of PAIRS, and
        // pick what we think is the best one for cleaning
        //***** IMPORTANT ******
        // we MUST not pick a PAIR whose rating is 0. We have
        // numerous assumptions in other parts of the code that
        // this is the case:
        //  - this is how rollback nodes and leaf nodes are not selected for cleaning
        //  - this is how a thread that is calling unpin_and_remove will prevent
        //     the cleaner thread from picking its PAIR (see comments in that function)
        do {
            //
            // We are already holding onto best_pair; if we run across a pair
            // that shares best_pair's mutex due to a collision in the hash
            // table, we must not try to lock it again.
            //
            if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) {
                // Advance the cleaner head.
                long score = 0;
                // only bother with this pair if it has no current users
                if (m_pl->m_cleaner_head->value_rwlock.users() == 0) {
                    score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
                    if (score > best_score) {
                        best_score = score;
                        best_pair = m_pl->m_cleaner_head;
                    }
                }
                m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
                continue;
            }
            pair_lock(m_pl->m_cleaner_head);
            if (m_pl->m_cleaner_head->value_rwlock.users() > 0) {
                pair_unlock(m_pl->m_cleaner_head);
            }
            else {
                n_seen++;
                long score = 0;
                score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
                if (score > best_score) {
                    best_score = score;
                    // Since we found a new best pair, we need to
                    // unlock the old best pair.
                    if (best_pair) {
                        pair_unlock(best_pair);
                    }
                    best_pair = m_pl->m_cleaner_head;
                }
                else {
                    pair_unlock(m_pl->m_cleaner_head);
                }
            }
            // Advance the cleaner head.
            m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
        } while (m_pl->m_cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK);
        m_pl->read_list_unlock();

        //
        // at this point, if we have found a PAIR for cleaning,
        // that is, best_pair != NULL, we do the clean
        //
        // if best_pair != NULL, then best_pair->mutex is held
        // no list lock is held
        //
        if (best_pair) {
            CACHEFILE cf = best_pair->cachefile;
            // try to add a background job to the manager
            // if we can't, that means the cachefile is flushing, so
            // we simply continue the for loop and this iteration
            // becomes a no-op
            r = bjm_add_background_job(cf->bjm);
            if (r) {
                pair_unlock(best_pair);
                continue;
            }
            best_pair->value_rwlock.write_lock(true);
            pair_unlock(best_pair);
            // verify a key assumption.
            assert(cleaner_thread_rate_pair(best_pair) > 0);
            // check the checkpoint_pending bit
            m_pl->read_pending_cheap_lock();
            bool checkpoint_pending = best_pair->checkpoint_pending;
            best_pair->checkpoint_pending = false;
            m_pl->read_pending_cheap_unlock();
            if (checkpoint_pending) {
                write_locked_pair_for_checkpoint(m_ct, best_pair, true);
            }

            bool cleaner_callback_called = false;

            // it's theoretically possible that after writing a PAIR for checkpoint, the
            // PAIR's heuristic tells us nothing needs to be done. It is not possible
            // in Dr. Noga, but unit tests verify this behavior works properly.
            if (cleaner_thread_rate_pair(best_pair) > 0) {
                r = best_pair->cleaner_callback(best_pair->value_data,
                                                    best_pair->key,
                                                    best_pair->fullhash,
                                                    best_pair->write_extraargs);
                assert_zero(r);
                cleaner_callback_called = true;
            }

            // The cleaner callback must have unlocked the pair, so we
            // don't need to unlock it if the cleaner callback is called.
            if (!cleaner_callback_called) {
                pair_lock(best_pair);
                best_pair->value_rwlock.write_unlock();
                pair_unlock(best_pair);
            }
            // We need to make sure the cachefile sticks around so a close
            // can't come destroy it.  That's the purpose of this
            // "add/remove_background_job" business, which means the
            // cachefile is still valid here, even though the cleaner
            // callback unlocks the pair.
            bjm_remove_background_job(cf->bjm);
        }
        else {
            // If we didn't find anything this time around the cachetable,
            // we probably won't find anything if we run around again, so
            // just break out from the for-loop now and
            // we'll try again when the cleaner thread runs again.
            break;
        }
    }
    return 0;
}
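
// Illustrative walk of the selection loop above (not part of the build): with
// CLEANER_N_TO_CHECK == 8 and a clock ring whose cache_pressure_size scores
// are {0, 5, 0, 9, 2}, the loop examines unlocked pairs starting at
// m_cleaner_head, keeps the pair scoring 9 locked as best_pair, and calls its
// cleaner_callback.  Pairs scoring 0 (e.g. rollback and leaf nodes) can never
// be selected, because only score > best_score (initialized to 0) replaces
// best_pair.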

static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD");

const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20;
uint32_t PAIR_LOCK_SIZE = 1<<20;

void toku_pair_list_set_lock_size(uint32_t num_locks) {
    PAIR_LOCK_SIZE = num_locks;
}

static void evict_pair_from_cachefile(PAIR p) {
    CACHEFILE cf = p->cachefile;
    if (p->cf_next) {
        p->cf_next->cf_prev = p->cf_prev;
    }
    if (p->cf_prev) {
        p->cf_prev->cf_next = p->cf_next;
    }
    else if (cf->cf_head == p) {
        cf->cf_head = p->cf_next;
    }
    p->cf_prev = p->cf_next = NULL;
    cf->num_pairs--;
}

// Allocates the hash table of pairs inside this pair list.
//
void pair_list::init() {
    m_table_size = INITIAL_PAIR_LIST_SIZE;
    m_num_locks = PAIR_LOCK_SIZE;
    m_n_in_table = 0;
    m_clock_head = NULL;
    m_cleaner_head = NULL;
    m_checkpoint_head = NULL;
    m_pending_head = NULL;
    m_table = NULL;

    pthread_rwlockattr_t attr;
    pthread_rwlockattr_init(&attr);
#if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP)
    pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
#else
// TODO: need to figure out how to make writer-preferential rwlocks
// happen on osx
#endif
    toku_pthread_rwlock_init(*cachetable_m_list_lock_key, &m_list_lock, &attr);
    toku_pthread_rwlock_init(*cachetable_m_pending_lock_expensive_key,
                             &m_pending_lock_expensive,
                             &attr);
    toku_pthread_rwlock_init(
        *cachetable_m_pending_lock_cheap_key, &m_pending_lock_cheap, &attr);
    XCALLOC_N(m_table_size, m_table);
    XCALLOC_N(m_num_locks, m_mutexes);
    for (uint64_t i = 0; i < m_num_locks; i++) {
        toku_mutex_init(
#ifdef TOKU_PFS_MUTEX_EXTENDED_CACHETABLEMMUTEX
            *cachetable_m_mutex_key,
#else
            toku_uninstrumented,
#endif
            &m_mutexes[i].aligned_mutex,
            nullptr);
    }
}

// Frees the pair_list hash table.  It is expected to be empty by
// the time this is called; asserts (via invariant) that every
// hash table slot is empty.
void pair_list::destroy() {
    // Check if any entries exist in the hash table.
    for (uint32_t i = 0; i < m_table_size; ++i) {
        invariant_null(m_table[i]);
    }
    for (uint64_t i = 0; i < m_num_locks; i++) {
        toku_mutex_destroy(&m_mutexes[i].aligned_mutex);
    }
    toku_pthread_rwlock_destroy(&m_list_lock);
    toku_pthread_rwlock_destroy(&m_pending_lock_expensive);
    toku_pthread_rwlock_destroy(&m_pending_lock_cheap);
    toku_free(m_table);
    toku_free(m_mutexes);
}

// adds a PAIR to the cachetable's structures,
// but does NOT add it to the list maintained by
// the cachefile
void pair_list::add_to_cachetable_only(PAIR p) {
    // sanity check to make sure that the PAIR does not already exist
    PAIR pp = this->find_pair(p->cachefile, p->key, p->fullhash);
    assert(pp == NULL);

    this->add_to_clock(p);
    this->add_to_hash_chain(p);
    m_n_in_table++;
}

// This places the given pair inside of the pair list.
//
// requires caller to have grabbed write lock on list.
// requires caller to have p->mutex held as well
//
void pair_list::put(PAIR p) {
    this->add_to_cachetable_only(p);
    this->add_to_cf_list(p);
}

// This removes the given pair completely from the pair list.
//
// requires caller to have grabbed write lock on list, and p->mutex held
//
void pair_list::evict_completely(PAIR p) {
    this->evict_from_cachetable(p);
    this->evict_from_cachefile(p);
}

// Removes the PAIR from the cachetable's lists,
// but does NOT impact the list maintained by the cachefile
void pair_list::evict_from_cachetable(PAIR p) {
    this->pair_remove(p);
    this->pending_pairs_remove(p);
    this->remove_from_hash_chain(p);

    assert(m_n_in_table > 0);
    m_n_in_table--;
}

// Removes the PAIR from the cachefile's list of PAIRs
void pair_list::evict_from_cachefile(PAIR p) {
    evict_pair_from_cachefile(p);
}

//
// Remove pair from linked list for cleaner/clock
//
// requires caller to have grabbed write lock on list.
//
void pair_list::pair_remove (PAIR p) {
    if (p->clock_prev == p) {
        invariant(m_clock_head == p);
        invariant(p->clock_next == p);
        invariant(m_cleaner_head == p);
        invariant(m_checkpoint_head == p);
        m_clock_head = NULL;
        m_cleaner_head = NULL;
        m_checkpoint_head = NULL;
    }
    else {
        if (p == m_clock_head) {
            m_clock_head = m_clock_head->clock_next;
        }
        if (p == m_cleaner_head) {
            m_cleaner_head = m_cleaner_head->clock_next;
        }
        if (p == m_checkpoint_head) {
            m_checkpoint_head = m_checkpoint_head->clock_next;
        }
        p->clock_prev->clock_next = p->clock_next;
        p->clock_next->clock_prev = p->clock_prev;
    }
    p->clock_prev = p->clock_next = NULL;
}

// Remove a pair from the list of pairs that were marked with the
// pending bit for the in-progress checkpoint.
//
// requires that if the caller is the checkpoint thread, then a read lock
// is grabbed on the list. Otherwise, must have write lock on list.
//
void pair_list::pending_pairs_remove (PAIR p) {
    if (p->pending_next) {
        p->pending_next->pending_prev = p->pending_prev;
    }
    if (p->pending_prev) {
        p->pending_prev->pending_next = p->pending_next;
    }
    else if (m_pending_head==p) {
        m_pending_head = p->pending_next;
    }
    p->pending_prev = p->pending_next = NULL;
}

void pair_list::remove_from_hash_chain(PAIR p) {
    // Remove it from the hash chain.
    unsigned int h = p->fullhash&(m_table_size - 1);
    paranoid_invariant(m_table[h] != NULL);
    if (m_table[h] == p) {
        m_table[h] = p->hash_chain;
    }
    else {
        PAIR curr = m_table[h];
        while (curr->hash_chain != p) {
            curr = curr->hash_chain;
        }
        // remove p from the singly linked list
        curr->hash_chain = p->hash_chain;
    }
    p->hash_chain = NULL;
}

// Returns the pair matching the given cachefile, key, and hash
// from the pair list, or null if no such pair is present.
//
// requires caller to have grabbed either a read lock on the list or
// bucket's mutex.
//
PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) {
    PAIR found_pair = nullptr;
    for (PAIR p = m_table[fullhash&(m_table_size - 1)]; p; p = p->hash_chain) {
        if (p->key.b == key.b && p->cachefile == file) {
            found_pair = p;
            break;
        }
    }
    return found_pair;
}

// Add PAIR to linked list shared by cleaner thread and clock
//
// requires caller to have grabbed write lock on list.
//
void pair_list::add_to_clock (PAIR p) {
    // requires that p is not currently in the table.
    // inserts p into the clock list at the tail.

    p->count = CLOCK_INITIAL_COUNT;
    // either the clock head is set, along with the cleaner and checkpoint
    // heads, or all three are NULL
    if (m_clock_head) {
        assert(m_cleaner_head);
        assert(m_checkpoint_head);
        // insert right before the head
        p->clock_next = m_clock_head;
        p->clock_prev = m_clock_head->clock_prev;

        p->clock_prev->clock_next = p;
        p->clock_next->clock_prev = p;

    }
    // this is the first element in the list
    else {
        m_clock_head = p;
        p->clock_next = p->clock_prev = m_clock_head;
        m_cleaner_head = p;
        m_checkpoint_head = p;
    }
}
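
// Illustrative (not part of the build): inserting D "right before the head"
// of the circular ring A <-> B <-> C (head A) yields A <-> B <-> C <-> D <-> A,
// so D sits at the tail position that the clock hand reaches last.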

// add the pair to the linked list of PAIRs belonging
// to the same cachefile. This linked list is used
// in cachetable_flush_cachefile.
void pair_list::add_to_cf_list(PAIR p) {
    CACHEFILE cf = p->cachefile;
    if (cf->cf_head) {
        cf->cf_head->cf_prev = p;
    }
    p->cf_next = cf->cf_head;
    p->cf_prev = NULL;
    cf->cf_head = p;
    cf->num_pairs++;
}

// Add PAIR to the hashtable
//
// requires caller to have grabbed write lock on list
// and to have grabbed the p->mutex.
void pair_list::add_to_hash_chain(PAIR p) {
    uint32_t h = p->fullhash & (m_table_size - 1);
    p->hash_chain = m_table[h];
    m_table[h] = p;
}
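
// Illustrative (not part of the build): m_table_size is a power of two
// (initially 1<<20), so "fullhash & (m_table_size - 1)" keeps the low 20 bits
// of the hash; e.g. fullhash 0xDEADBEEF lands in bucket 0xDBEEF.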

// test function
//
// grabs and releases write list lock
//
void pair_list::verify() {
    this->write_list_lock();
    uint32_t num_found = 0;

    // First count all the pairs by walking the hash chains
    {
        uint32_t i;
        for (i = 0; i < m_table_size; i++) {
            PAIR p;
            for (p = m_table[i]; p; p = p->hash_chain) {
                num_found++;
            }
        }
    }
    assert(num_found == m_n_in_table);
    num_found = 0;
    // Now go through the clock chain, make sure everything in the clock chain is hashed.
    {
        PAIR p;
        bool is_first = true;
        for (p = m_clock_head; m_clock_head != NULL && (p != m_clock_head || is_first); p=p->clock_next) {
            is_first=false;
            PAIR p2;
            uint32_t fullhash = p->fullhash;
            //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
            for (p2 = m_table[fullhash&(m_table_size-1)]; p2; p2=p2->hash_chain) {
                if (p2==p) {
                    /* found it */
                    num_found++;
                    goto next;
                }
            }
            fprintf(stderr, "Something in the clock chain is not hashed\n");
            assert(0);
        next:;
        }
        assert (num_found == m_n_in_table);
    }
    this->write_list_unlock();
}

// If given pointers are not null, assign the hash table size of
// this pair list and the number of pairs in this pair list.
//
// grabs and releases read list lock
//
void pair_list::get_state(int *num_entries, int *hash_size) {
    this->read_list_lock();
    if (num_entries) {
        *num_entries = m_n_in_table;
    }
    if (hash_size) {
        *hash_size = m_table_size;
    }
    this->read_list_unlock();
}

void pair_list::read_list_lock() {
    toku_pthread_rwlock_rdlock(&m_list_lock);
}

void pair_list::read_list_unlock() {
    toku_pthread_rwlock_rdunlock(&m_list_lock);
}

void pair_list::write_list_lock() {
    toku_pthread_rwlock_wrlock(&m_list_lock);
}

void pair_list::write_list_unlock() {
    toku_pthread_rwlock_wrunlock(&m_list_lock);
}

void pair_list::read_pending_exp_lock() {
    toku_pthread_rwlock_rdlock(&m_pending_lock_expensive);
}

void pair_list::read_pending_exp_unlock() {
    toku_pthread_rwlock_rdunlock(&m_pending_lock_expensive);
}

void pair_list::write_pending_exp_lock() {
    toku_pthread_rwlock_wrlock(&m_pending_lock_expensive);
}

void pair_list::write_pending_exp_unlock() {
    toku_pthread_rwlock_wrunlock(&m_pending_lock_expensive);
}

void pair_list::read_pending_cheap_lock() {
    toku_pthread_rwlock_rdlock(&m_pending_lock_cheap);
}

void pair_list::read_pending_cheap_unlock() {
    toku_pthread_rwlock_rdunlock(&m_pending_lock_cheap);
}

void pair_list::write_pending_cheap_lock() {
    toku_pthread_rwlock_wrlock(&m_pending_lock_cheap);
}

void pair_list::write_pending_cheap_unlock() {
    toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap);
}

toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) {
    return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex;
}

void pair_list::pair_lock_by_fullhash(uint32_t fullhash) {
    toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
}

void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) {
    toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
}
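
// Illustrative (not part of the build): PAIR mutexes are striped by fullhash,
// so with m_num_locks == 1<<20 two pairs whose hashes agree in the low 20
// bits share a single mutex.  This sharing is why cleaner::run_cleaner checks
// whether a candidate shares best_pair's mutex before trying to lock it.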


ENSURE_POD(evictor);

//
// This is the function that runs eviction on its own thread.
//
static void *eviction_thread(void *evictor_v) {
    evictor *CAST_FROM_VOIDP(evictor, evictor_v);
    evictor->run_eviction_thread();
    return toku_pthread_done(evictor_v);
}

//
// Starts the eviction thread, assigns external object references,
// and initializes all counters and condition variables.
//
int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting);

    // cap the gap between successive thresholds at 512MB
    int64_t max_diff = (1 << 29);

    m_low_size_watermark = _size_limit;
    // these values are selected somewhat arbitrarily right now as
    // being a percentage more than low_size_watermark, which is provided
    // by the caller.
    m_low_size_hysteresis = (11 * _size_limit)/10; // 10% more
    if ((m_low_size_hysteresis - m_low_size_watermark) > max_diff) {
        m_low_size_hysteresis = m_low_size_watermark + max_diff;
    }
    m_high_size_hysteresis = (5 * _size_limit)/4; // 25% more
    if ((m_high_size_hysteresis - m_low_size_hysteresis) > max_diff) {
        m_high_size_hysteresis = m_low_size_hysteresis + max_diff;
    }
    m_high_size_watermark = (3 * _size_limit)/2; // 50% more
    if ((m_high_size_watermark - m_high_size_hysteresis) > max_diff) {
        m_high_size_watermark = m_high_size_hysteresis + max_diff;
    }
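
    // Worked example (illustrative): with _size_limit = 1GB,
    //   m_low_size_watermark   = 1.0GB
    //   m_low_size_hysteresis  = 1.1GB
    //   m_high_size_hysteresis = 1.25GB
    //   m_high_size_watermark  = 1.5GB
    // None of these gaps exceeds max_diff; with a much larger _size_limit,
    // each threshold would instead be capped at 512MB above the previous one.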

    m_enable_partial_eviction = true;

    m_size_reserved = unreservable_memory(_size_limit);
    m_size_current = 0;
    m_size_cloned_data = 0;
    m_size_evicting = 0;

    m_size_nonleaf = create_partitioned_counter();
    m_size_leaf = create_partitioned_counter();
    m_size_rollback = create_partitioned_counter();
    m_size_cachepressure = create_partitioned_counter();
    m_wait_pressure_count = create_partitioned_counter();
    m_wait_pressure_time = create_partitioned_counter();
    m_long_wait_pressure_count = create_partitioned_counter();
    m_long_wait_pressure_time = create_partitioned_counter();

    m_pl = _pl;
    m_cf_list = _cf_list;
    m_kibbutz = _kibbutz;
    toku_mutex_init(
        *cachetable_ev_thread_lock_mutex_key, &m_ev_thread_lock, nullptr);
    toku_cond_init(
        *cachetable_m_flow_control_cond_key, &m_flow_control_cond, nullptr);
    toku_cond_init(
        *cachetable_m_ev_thread_cond_key, &m_ev_thread_cond, nullptr);
    m_num_sleepers = 0;
    m_ev_thread_is_running = false;
    m_period_in_seconds = eviction_period;

    unsigned int seed = (unsigned int) time(NULL);
    int r = myinitstate_r(seed, m_random_statebuf, sizeof m_random_statebuf, &m_random_data);
    assert_zero(r);

    // start the background thread
    m_run_thread = true;
    m_num_eviction_thread_runs = 0;
    m_ev_thread_init = false;
    r = toku_pthread_create(
        *eviction_thread_key, &m_ev_thread, nullptr, eviction_thread, this);
    if (r == 0) {
        m_ev_thread_init = true;
    }
    m_evictor_init = true;
    return r;
}

//
// This stops the eviction thread and clears the condition variable.
//
// NOTE: This should only be called if there are no evictions in progress.
//
void evictor::destroy() {
    if (!m_evictor_init) {
        return;
    }
    assert(m_size_evicting == 0);
    //
    // commented out in Ming, because we could not finish
    // #5672. Once #5672 is solved, we should restore this:
    //
    //assert(m_size_current == 0);

    // Stop the eviction thread.
    if (m_ev_thread_init) {
        toku_mutex_lock(&m_ev_thread_lock);
        m_run_thread = false;
        this->signal_eviction_thread_locked();
        toku_mutex_unlock(&m_ev_thread_lock);
        void *ret;
        int r = toku_pthread_join(m_ev_thread, &ret);
        assert_zero(r);
        assert(!m_ev_thread_is_running);
    }
    destroy_partitioned_counter(m_size_nonleaf);
    m_size_nonleaf = NULL;
    destroy_partitioned_counter(m_size_leaf);
    m_size_leaf = NULL;
    destroy_partitioned_counter(m_size_rollback);
    m_size_rollback = NULL;
    destroy_partitioned_counter(m_size_cachepressure);
    m_size_cachepressure = NULL;

    destroy_partitioned_counter(m_wait_pressure_count); m_wait_pressure_count = NULL;
    destroy_partitioned_counter(m_wait_pressure_time); m_wait_pressure_time = NULL;
    destroy_partitioned_counter(m_long_wait_pressure_count); m_long_wait_pressure_count = NULL;
    destroy_partitioned_counter(m_long_wait_pressure_time); m_long_wait_pressure_time = NULL;

    toku_cond_destroy(&m_flow_control_cond);
    toku_cond_destroy(&m_ev_thread_cond);
    toku_mutex_destroy(&m_ev_thread_lock);
}

//
// Increases status variables and the current size variable
// of the evictor based on the given pair attribute.
//
void evictor::add_pair_attr(PAIR_ATTR attr) {
    assert(attr.is_valid);
    add_to_size_current(attr.size);
    increment_partitioned_counter(m_size_nonleaf, attr.nonleaf_size);
    increment_partitioned_counter(m_size_leaf, attr.leaf_size);
    increment_partitioned_counter(m_size_rollback, attr.rollback_size);
    increment_partitioned_counter(m_size_cachepressure, attr.cache_pressure_size);
}

//
// Decreases status variables and the current size variable
// of the evictor based on the given pair attribute.
//
void evictor::remove_pair_attr(PAIR_ATTR attr) {
    assert(attr.is_valid);
    remove_from_size_current(attr.size);
    increment_partitioned_counter(m_size_nonleaf, 0 - attr.nonleaf_size);
    increment_partitioned_counter(m_size_leaf, 0 - attr.leaf_size);
    increment_partitioned_counter(m_size_rollback, 0 - attr.rollback_size);
    increment_partitioned_counter(m_size_cachepressure, 0 - attr.cache_pressure_size);
}

//
// Updates this evictor's stats to match the "new" pair attribute given
// while also removing the given "old" pair attribute.
//
void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) {
    this->add_pair_attr(new_attr);
    this->remove_pair_attr(old_attr);
}

//
// Adds the given size to the evictor's estimation of
// the size of the cachetable.
//
void evictor::add_to_size_current(long size) {
    (void) toku_sync_fetch_and_add(&m_size_current, size);
}

//
// Subtracts the given size from the evictor's current
// approximation of the cachetable size.
//
void evictor::remove_from_size_current(long size) {
    (void) toku_sync_fetch_and_sub(&m_size_current, size);
}

//
// Adds the size of cloned data to necessary variables in the evictor
//
void evictor::add_cloned_data_size(long size) {
    (void) toku_sync_fetch_and_add(&m_size_cloned_data, size);
    add_to_size_current(size);
}

//
// Removes the size of cloned data from the necessary variables in the evictor
//
void evictor::remove_cloned_data_size(long size) {
    (void) toku_sync_fetch_and_sub(&m_size_cloned_data, size);
    remove_from_size_current(size);
}

//
// Reserves a portion of the cachetable's memory budget for a client:
// takes the given fraction of the not-yet-reserved part of the low size
// watermark, capped at upper_bound if upper_bound is nonzero, accounts
// for it in m_size_current, and wakes the evictor.  If the reservation
// creates cache pressure, the calling thread waits for that pressure to
// subside before returning.
//
uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) {
    toku_mutex_lock(&m_ev_thread_lock);
    uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved);
    if (0) { // debug
        fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n", __PRETTY_FUNCTION__, reserved_memory, upper_bound);
    }
    if (upper_bound > 0 && reserved_memory > upper_bound) {
        reserved_memory = upper_bound;
    }
    m_size_reserved += reserved_memory;
    (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory);
    this->signal_eviction_thread_locked();
    toku_mutex_unlock(&m_ev_thread_lock);

    if (this->should_client_thread_sleep()) {
        this->wait_for_cache_pressure_to_subside();
    }
    return reserved_memory;
}

//
// Releases memory previously obtained via reserve_memory: subtracts the
// bytes from m_size_current and m_size_reserved, and signals the eviction
// thread so that any sleeping clients may be woken now that pressure has
// dropped.
//
void evictor::release_reserved_memory(uint64_t reserved_memory){
    (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory);
    toku_mutex_lock(&m_ev_thread_lock);
    m_size_reserved -= reserved_memory;
    // signal the eviction thread in order to possibly wake up sleeping clients
    if (m_num_sleepers  > 0) {
        this->signal_eviction_thread_locked();
    }
    toku_mutex_unlock(&m_ev_thread_lock);
}

//
// This function is the eviction thread. It runs for the lifetime of
// the evictor. Goes to sleep for period_in_seconds
// by waiting on m_ev_thread_cond.
//
void evictor::run_eviction_thread(){
    toku_mutex_lock(&m_ev_thread_lock);
    while (m_run_thread) {
        m_num_eviction_thread_runs++; // for test purposes only
        m_ev_thread_is_running = true;
        // responsibility of run_eviction to release and
        // regrab ev_thread_lock as it sees fit
        this->run_eviction();
        m_ev_thread_is_running = false;

        if (m_run_thread) {
            //
            // sleep until either we are signaled
            // via signal_eviction_thread or
            // m_period_in_seconds amount of time has passed
            //
            if (m_period_in_seconds) {
                toku_timespec_t wakeup_time;
                struct timeval tv;
                gettimeofday(&tv, 0);
                wakeup_time.tv_sec  = tv.tv_sec;
                wakeup_time.tv_nsec = tv.tv_usec * 1000LL;
                wakeup_time.tv_sec += m_period_in_seconds;
                toku_cond_timedwait(
                    &m_ev_thread_cond,
                    &m_ev_thread_lock,
                    &wakeup_time
                    );
            }
            // for test purposes, we have an option of
            // not waiting on a period, but rather sleeping indefinitely
            else {
                toku_cond_wait(&m_ev_thread_cond, &m_ev_thread_lock);
            }
        }
    }
    toku_mutex_unlock(&m_ev_thread_lock);
}

//
// runs eviction.
// on entry, ev_thread_lock is grabbed, on exit, ev_thread_lock must still be grabbed
// it is the responsibility of this function to release and reacquire ev_thread_lock as it sees fit.
//
void evictor::run_eviction(){
    //
    // These variables will help us detect if everything in the clock is currently being accessed.
    // We must detect this case otherwise we will end up in an infinite loop below.
    //
    bool exited_early = false;
    uint32_t num_pairs_examined_without_evicting = 0;

    while (this->eviction_needed()) {
        if (m_num_sleepers > 0 && this->should_sleeping_clients_wakeup()) {
            toku_cond_broadcast(&m_flow_control_cond);
        }
        // release ev_thread_lock so that eviction may run without holding mutex
        toku_mutex_unlock(&m_ev_thread_lock);

        // first try to do an eviction from stale cachefiles
        bool some_eviction_ran = m_cf_list->evict_some_stale_pair(this);
        if (!some_eviction_ran) {
            m_pl->read_list_lock();
            PAIR curr_in_clock = m_pl->m_clock_head;
            // if nothing to evict, we need to exit
            if (!curr_in_clock) {
                m_pl->read_list_unlock();
                toku_mutex_lock(&m_ev_thread_lock);
                exited_early = true;
                goto exit;
            }
            if (num_pairs_examined_without_evicting > m_pl->m_n_in_table) {
                // we have a cycle where everything in the clock is in use
                // do not return an error
                // just let memory be overfull
                m_pl->read_list_unlock();
                toku_mutex_lock(&m_ev_thread_lock);
                exited_early = true;
                goto exit;
            }
            bool eviction_run = run_eviction_on_pair(curr_in_clock);
            if (eviction_run) {
                // reset the count
                num_pairs_examined_without_evicting = 0;
            }
            else {
                num_pairs_examined_without_evicting++;
            }
            // At this point, either curr_in_clock is still in the list because it has
            // not been fully evicted, and we need to advance m_pl->m_clock_head; or
            // curr_in_clock has been fully evicted, and we do NOT need to advance
            // m_pl->m_clock_head, because the removal of curr_in_clock already moved it.
            if (m_pl->m_clock_head && (m_pl->m_clock_head == curr_in_clock)) {
                m_pl->m_clock_head = m_pl->m_clock_head->clock_next;
            }
            m_pl->read_list_unlock();
        }
        toku_mutex_lock(&m_ev_thread_lock);
    }

exit:
    if (m_num_sleepers > 0 && (exited_early || this->should_sleeping_clients_wakeup())) {
        toku_cond_broadcast(&m_flow_control_cond);
    }
    return;
}

//
// Runs eviction on the given PAIR.  This may be a
// partial eviction or full eviction.
//
// on entry, pair mutex is NOT held, but pair list's read list lock
// IS held
// on exit, the same conditions must apply
//
bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
    uint32_t n_in_table;
    int64_t size_current;
    bool ret_val = false;
    // function meant to be called on PAIR that is not being accessed right now
    CACHEFILE cf = curr_in_clock->cachefile;
    int r = bjm_add_background_job(cf->bjm);
    if (r) {
        goto exit;
    }
    pair_lock(curr_in_clock);
    // these are the circumstances under which we don't run eviction on a pair:
    //  - if other users are waiting on the lock
    //  - if the PAIR is referenced by users
    //  - if the PAIR's disk_nb_mutex is in use, implying that it is
    //    undergoing a checkpoint
    if (curr_in_clock->value_rwlock.users() ||
        curr_in_clock->refcount > 0 ||
        nb_mutex_users(&curr_in_clock->disk_nb_mutex))
    {
        pair_unlock(curr_in_clock);
        bjm_remove_background_job(cf->bjm);
        goto exit;
    }

    // extract and use these values so that we don't risk them changing
    // out from underneath us in calculations below.
    n_in_table = m_pl->m_n_in_table;
    size_current = m_size_current;

    // now that we have the pair mutex we care about, we can
    // release the read list lock and reacquire it at the end of the function
    m_pl->read_list_unlock();
    ret_val = true;
    if (curr_in_clock->count > 0) {
        toku::context pe_ctx(CTX_PARTIAL_EVICTION);

        uint32_t curr_size = curr_in_clock->attr.size;
        // if the size of this PAIR is greater than the average size of PAIRs
        // in the cachetable, then decrement it, otherwise, decrement
        // probabilistically
        if (curr_size*n_in_table >= size_current) {
            curr_in_clock->count--;
        } else {
            // generate a random number in [0, 2^16)
            assert(size_current <= (INT64_MAX / ((1<<16)-1))); // to protect against possible overflows
            int32_t rnd = myrandom_r(&m_random_data) % (1<<16);
            // The if-statement below will be true with probability of
            // curr_size/(average size of PAIR in cachetable).
            // Here is how the math is done:
            //   average_size = size_current/n_in_table
            //   curr_size/average_size = curr_size*n_in_table/size_current
            //   we evaluate whether a random number from 0 to 2^16 is less
            //   than curr_size/average_size * 2^16. So, our if-clause should be
            //    if (2^16*curr_size/average_size > rnd)
            //    this evaluates to:
            //    if (2^16*curr_size*n_in_table/size_current > rnd)
            //    by multiplying each side of the inequality by size_current, we get
            //    if (2^16*curr_size*n_in_table > rnd*size_current)
            //    and dividing each side by 2^16,
            //    we get the if-clause below
            //
            if ((((int64_t)curr_size) * n_in_table) >= (((int64_t)rnd) * size_current)>>16) {
                curr_in_clock->count--;
            }
        }
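
        // Worked example (illustrative): with curr_size = 1MB, n_in_table =
        // 1024, and size_current = 4GB, the average pair size is 4MB, so the
        // count should drop with probability 1MB/4MB = 1/4.  Indeed,
        // curr_size*n_in_table = 2^30 and (rnd*size_current)>>16 = rnd*2^16,
        // so the test passes exactly when rnd <= 2^14, i.e. for roughly a
        // quarter of the 2^16 possible values of rnd.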
4051 
4052         if (m_enable_partial_eviction) {
4053             // call the partial eviction callback
4054             curr_in_clock->value_rwlock.write_lock(true);
4055 
4056             void *value = curr_in_clock->value_data;
4057             void* disk_data = curr_in_clock->disk_data;
4058             void *write_extraargs = curr_in_clock->write_extraargs;
4059             enum partial_eviction_cost cost;
4060             long bytes_freed_estimate = 0;
4061             curr_in_clock->pe_est_callback(value, disk_data,
4062                                            &bytes_freed_estimate, &cost,
4063                                            write_extraargs);
4064             if (cost == PE_CHEAP) {
4065                 pair_unlock(curr_in_clock);
4066                 curr_in_clock->size_evicting_estimate = 0;
4067                 this->do_partial_eviction(curr_in_clock);
4068                 bjm_remove_background_job(cf->bjm);
4069             } else if (cost == PE_EXPENSIVE) {
4070                 // only bother running an expensive partial eviction
4071                 // if it is expected to free space
4072                 if (bytes_freed_estimate > 0) {
4073                     pair_unlock(curr_in_clock);
4074                     curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
4075                     toku_mutex_lock(&m_ev_thread_lock);
4076                     m_size_evicting += bytes_freed_estimate;
4077                     toku_mutex_unlock(&m_ev_thread_lock);
4078                     toku_kibbutz_enq(m_kibbutz, cachetable_partial_eviction,
4079                                      curr_in_clock);
4080                 } else {
4081                     curr_in_clock->value_rwlock.write_unlock();
4082                     pair_unlock(curr_in_clock);
4083                     bjm_remove_background_job(cf->bjm);
4084                 }
4085             } else {
4086                 assert(false);
4087             }
4088         } else {
4089             pair_unlock(curr_in_clock);
4090             bjm_remove_background_job(cf->bjm);
4091         }
4092     } else {
4093         toku::context pe_ctx(CTX_FULL_EVICTION);
4094 
4095         // responsibility of try_evict_pair to eventually remove background job
4096         // pair's mutex is still grabbed here
4097         this->try_evict_pair(curr_in_clock);
4098     }
4099     // regrab the read list lock, because the caller assumes
4100     // that it is held. The contract requires this.
4101     m_pl->read_list_lock();
4102 exit:
4103     return ret_val;
4104 }
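
//
// Note on background-job accounting in run_eviction_on_pair above: every path
// either removes the background job it added (via bjm_remove_background_job)
// or hands the PAIR off to code that takes over that responsibility
// (try_evict_pair, or a kibbutz worker such as cachetable_partial_eviction),
// so the job is always released exactly once.
//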

struct pair_unpin_with_new_attr_extra {
    pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
        ev(e), pair(p) {
    }
    evictor *ev;
    PAIR pair;
};

static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
    struct pair_unpin_with_new_attr_extra *info =
        reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra);
    PAIR p = info->pair;
    evictor *ev = info->ev;

    // change the attr in the evictor, then update the value in the pair
    ev->change_pair_attr(p->attr, new_attr);
    p->attr = new_attr;

    // unpin
    pair_lock(p);
    p->value_rwlock.write_unlock();
    pair_unlock(p);
}

//
// on entry and exit, pair's mutex is not held
// on exit, PAIR is unpinned
//
void evictor::do_partial_eviction(PAIR p) {
    // Copy the old attr
    PAIR_ATTR old_attr = p->attr;
    long long size_evicting_estimate = p->size_evicting_estimate;

    struct pair_unpin_with_new_attr_extra extra(this, p);
    p->pe_callback(p->value_data, old_attr, p->write_extraargs,
                   // passed as the finalize continuation, which allows the
                   // pe_callback to unpin the node before doing expensive cleanup
                   pair_unpin_with_new_attr, &extra);

    // now that the pe_callback (and its pair_unpin_with_new_attr continuation)
    // have finished, we can safely decrease size_evicting
    this->decrease_size_evicting(size_evicting_estimate);
}
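
//
// Illustrative sketch (comment only, not part of the build): a partial
// eviction callback is expected to shrink its in-memory value, report the
// new attr through the finalize continuation (which unpins the PAIR via
// pair_unpin_with_new_attr above), and only then do any expensive cleanup.
// The name example_pe_callback and the halving of the size are hypothetical.
//
//   static int example_pe_callback(void *value, PAIR_ATTR old_attr,
//                                  void *write_extraargs,
//                                  void (*finalize)(PAIR_ATTR new_attr, void *extra),
//                                  void *finalize_extra) {
//       PAIR_ATTR new_attr = old_attr;
//       new_attr.size = old_attr.size / 2;   // pretend half the memory was freed
//       finalize(new_attr, finalize_extra);  // unpin before expensive cleanup
//       return 0;
//   }
//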

//
// CT lock held on entry
// background job has been added for p->cachefile on entry
// responsibility of this function to make sure that background job is removed
//
// on entry, pair's mutex is held, on exit, the pair's mutex is NOT held
//
void evictor::try_evict_pair(PAIR p) {
    CACHEFILE cf = p->cachefile;
    // evictions without a write, or of unpinned pairs that are clean,
    // can be run in the current thread

    // the only caller, run_eviction_on_pair, should call this function
    // only if no one else is trying to use it
    assert(!p->value_rwlock.users());
    p->value_rwlock.write_lock(true);
    // if the PAIR is dirty, the running eviction requires writing the
    // PAIR out. if the disk_nb_mutex is grabbed, then running
    // eviction requires waiting for the disk_nb_mutex to become available,
    // which may be expensive. Hence, if either is true, we
    // do the eviction on a writer thread
    if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) {
        p->size_evicting_estimate = 0;
        //
        // This method will unpin PAIR and release PAIR mutex
        //
        // because the PAIR is not dirty, we can safely pass
        // false for the for_checkpoint parameter
        this->evict_pair(p, false);
        bjm_remove_background_job(cf->bjm);
    }
    else {
        pair_unlock(p);
        toku_mutex_lock(&m_ev_thread_lock);
        assert(m_size_evicting >= 0);
        p->size_evicting_estimate = p->attr.size;
        m_size_evicting += p->size_evicting_estimate;
        assert(m_size_evicting >= 0);
        toku_mutex_unlock(&m_ev_thread_lock);
        toku_kibbutz_enq(m_kibbutz, cachetable_evicter, p);
    }
}

//
// Requires: This thread must hold the write lock (nb_mutex) for the pair.
//                The pair's mutex (p->mutex) is also held.
//                on exit, neither is held
//
void evictor::evict_pair(PAIR p, bool for_checkpoint) {
    if (p->dirty) {
        pair_unlock(p);
        cachetable_write_locked_pair(this, p, for_checkpoint);
        pair_lock(p);
    }
    // one thing we can do here is extract the size_evicting estimate,
    // have decrease_size_evicting take the estimate and not the pair,
    // and do this work after we have called
    // cachetable_maybe_remove_and_free_pair
    this->decrease_size_evicting(p->size_evicting_estimate);
    // if we are to remove this pair, we need the write list lock;
    // to get it in a way that avoids deadlocks, we must first release
    // the pair's mutex, then grab the write list lock, then regrab the
    // pair's mutex. The pair cannot go anywhere because
    // the pair is still pinned
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    m_pl->write_list_lock();
    pair_lock(p);
    p->value_rwlock.write_unlock();
    nb_mutex_unlock(&p->disk_nb_mutex);
    // at this point, we have the pair list's write list lock
    // and we have the pair's mutex (p->mutex) held

    // this ensures that a clone running in the background first completes
    bool removed = false;
    if (p->value_rwlock.users() == 0 && p->refcount == 0) {
        // assumption is that if we are about to remove the pair,
        // no one has grabbed the disk_nb_mutex,
        // and there is no cloned_value_data, because
        // no one is writing a cloned value out.
        assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
        assert(p->cloned_value_data == NULL);
        cachetable_remove_pair(m_pl, this, p);
        removed = true;
    }
    pair_unlock(p);
    m_pl->write_list_unlock();
    // do not want to hold the write list lock while freeing a pair
    if (removed) {
        cachetable_free_pair(p);
    }
}

//
// this function handles the responsibilities for writer threads when they
// decrease size_evicting. The responsibilities are:
//  - decrease m_size_evicting in a thread-safe manner
//  - in some circumstances, signal the eviction thread
//
void evictor::decrease_size_evicting(long size_evicting_estimate) {
    if (size_evicting_estimate > 0) {
        toku_mutex_lock(&m_ev_thread_lock);
        int64_t buffer = m_high_size_hysteresis - m_low_size_watermark;
        // if size_evicting is transitioning from above buffer to below buffer, and
        // some client threads are sleeping, we need to wake up the eviction thread.
        // Here is why. In this scenario, we are in one of two cases:
        //  - size_current - size_evicting < low_size_watermark,
        //     in which case size_current < high_size_hysteresis, which
        //     means we need to wake up sleeping clients
        //  - size_current - size_evicting > low_size_watermark,
        //       which means more evictions must be run.
        //  The consequences of both cases are the responsibility
        //  of the eviction thread.
        //
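        // Worked example (illustrative numbers): with low_size_watermark = 8GB
        // and high_size_hysteresis = 9GB, buffer = 1GB. If m_size_evicting
        // drops from 1.2GB to 0.9GB while clients sleep and the eviction
        // thread is idle, we signal it so it can wake the sleeping clients
        // or schedule further evictions, whichever the sizes now call for.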
        bool need_to_signal_ev_thread =
            (m_num_sleepers > 0) &&
            !m_ev_thread_is_running &&
            (m_size_evicting > buffer) &&
            ((m_size_evicting - size_evicting_estimate) <= buffer);
        m_size_evicting -= size_evicting_estimate;
        assert(m_size_evicting >= 0);
        if (need_to_signal_ev_thread) {
            this->signal_eviction_thread_locked();
        }
        toku_mutex_unlock(&m_ev_thread_lock);
    }
}

//
// Wait for cachetable space to become available.
// size_current is the number of bytes currently occupied by data (referred to by pairs);
// size_evicting is the number of bytes queued up to be evicted.
//
void evictor::wait_for_cache_pressure_to_subside() {
    uint64_t t0 = toku_current_time_microsec();
    toku_mutex_lock(&m_ev_thread_lock);
    m_num_sleepers++;
    this->signal_eviction_thread_locked();
    toku_cond_wait(&m_flow_control_cond, &m_ev_thread_lock);
    m_num_sleepers--;
    toku_mutex_unlock(&m_ev_thread_lock);
    uint64_t t1 = toku_current_time_microsec();
    increment_partitioned_counter(m_wait_pressure_count, 1);
    uint64_t tdelta = t1 - t0;
    increment_partitioned_counter(m_wait_pressure_time, tdelta);
    // waits longer than one second (1000000 microseconds) are also counted
    // in the separate long-wait counters
    if (tdelta > 1000000) {
        increment_partitioned_counter(m_long_wait_pressure_count, 1);
        increment_partitioned_counter(m_long_wait_pressure_time, tdelta);
    }
}

//
// Get the current estimated size of the cachetable
// and the evictor's set limit.
//
void evictor::get_state(long *size_current_ptr, long *size_limit_ptr) {
    if (size_current_ptr) {
        *size_current_ptr = m_size_current;
    }
    if (size_limit_ptr) {
        *size_limit_ptr = m_low_size_watermark;
    }
}

//
// Force the eviction thread to do some work.
//
// This function does not require any mutex to be held.
// As a result, scheduling is not guaranteed, but that is tolerable.
//
void evictor::signal_eviction_thread() {
    toku_mutex_lock(&m_ev_thread_lock);
    toku_cond_signal(&m_ev_thread_cond);
    toku_mutex_unlock(&m_ev_thread_lock);
}

void evictor::signal_eviction_thread_locked() {
    toku_cond_signal(&m_ev_thread_cond);
}

//
// Returns true if the cachetable is so oversubscribed that a client thread should sleep
//
// This function may be called in a thread-unsafe manner. Locks are not
// required to read size_current. The result is that
// the values may be a little off, but we think that is tolerable.
//
bool evictor::should_client_thread_sleep() {
    return unsafe_read_size_current() > m_high_size_watermark;
}

//
// Returns true if a sleeping client should be woken up because
// the cachetable is no longer overly subscribed
//
// This function may be called in a thread-unsafe manner. Locks are not
// required to read size_current. The result is that
// the values may be a little off, but we think that is tolerable.
//
bool evictor::should_sleeping_clients_wakeup() {
    return unsafe_read_size_current() <= m_high_size_hysteresis;
}

//
// Returns true if a client thread should try to wake up the eviction
// thread because the client thread has noticed too much data taken
// up in the cachetable.
//
// This function may be called in a thread-unsafe manner. Locks are not
// required to read size_current or size_evicting. The result is that
// the values may be a little off, but we think that is tolerable.
// If the caller wants to ensure that ev_thread_is_running and size_evicting
// are accurate, then the caller must hold ev_thread_lock before
// calling this function.
//
bool evictor::should_client_wake_eviction_thread() {
    return
        !m_ev_thread_is_running &&
        ((unsafe_read_size_current() - m_size_evicting) > m_low_size_hysteresis);
}

//
// Determines if eviction is needed. If the current size of
// the cachetable exceeds the sum of our fixed size limit and
// the amount of data currently being evicted, then eviction is needed
//
bool evictor::eviction_needed() {
    return (m_size_current - m_size_evicting) > m_low_size_watermark;
}
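
//
// Relationship between the thresholds used by the predicates above (a summary
// sketch; the actual values are computed when the evictor is created,
// elsewhere in this file, and this ordering is assumed rather than restated
// there):
//
//   m_low_size_watermark <= m_low_size_hysteresis
//                        <= m_high_size_hysteresis <= m_high_size_watermark
//
// Clients sleep above the high watermark, wake at or below the high
// hysteresis, and eviction keeps running until the size not already queued
// for eviction drops to the low watermark.
//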

inline int64_t evictor::unsafe_read_size_current(void) const {
    return m_size_current;
}

void evictor::fill_engine_status() {
    CT_STATUS_VAL(CT_SIZE_CURRENT)           = m_size_current;
    CT_STATUS_VAL(CT_SIZE_LIMIT)             = m_low_size_hysteresis;
    CT_STATUS_VAL(CT_SIZE_WRITING)           = m_size_evicting;
    CT_STATUS_VAL(CT_SIZE_NONLEAF) = read_partitioned_counter(m_size_nonleaf);
    CT_STATUS_VAL(CT_SIZE_LEAF) = read_partitioned_counter(m_size_leaf);
    CT_STATUS_VAL(CT_SIZE_ROLLBACK) = read_partitioned_counter(m_size_rollback);
    CT_STATUS_VAL(CT_SIZE_CACHEPRESSURE) = read_partitioned_counter(m_size_cachepressure);
    CT_STATUS_VAL(CT_SIZE_CLONED) = m_size_cloned_data;
    CT_STATUS_VAL(CT_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_wait_pressure_count);
    CT_STATUS_VAL(CT_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_wait_pressure_time);
    CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_long_wait_pressure_count);
    CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_long_wait_pressure_time);
}

void evictor::set_enable_partial_eviction(bool enabled) {
    m_enable_partial_eviction = enabled;
}

bool evictor::get_enable_partial_eviction(void) const {
    return m_enable_partial_eviction;
}

////////////////////////////////////////////////////////////////////////////////

ENSURE_POD(checkpointer);

//
// Sets the cachetable references in this checkpointer class; this is temporary.
//
int checkpointer::init(pair_list *_pl,
                       TOKULOGGER _logger,
                       evictor *_ev,
                       cachefile_list *files) {
    m_list = _pl;
    m_logger = _logger;
    m_ev = _ev;
    m_cf_list = files;
    bjm_init(&m_checkpoint_clones_bjm);

    // Default is no checkpointing.
    m_checkpointer_cron_init = false;
    int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
    if (r == 0) {
        m_checkpointer_cron_init = true;
    }
    m_checkpointer_init = true;
    return r;
}

void checkpointer::destroy() {
    if (!m_checkpointer_init) {
        return;
    }
    if (m_checkpointer_cron_init && !this->has_been_shutdown()) {
        // for test code only; production code uses toku_cachetable_minicron_shutdown()
        int r = this->shutdown();
        assert(r == 0);
    }
    bjm_destroy(m_checkpoint_clones_bjm);
}

//
// Sets how often the checkpoint thread will run, in seconds.
// (The minicron period is expressed in milliseconds, hence the *1000.)
//
void checkpointer::set_checkpoint_period(uint32_t new_period) {
    toku_minicron_change_period(&m_checkpointer_cron, new_period*1000);
}

//
// Returns how often the checkpoint thread will run, in seconds.
//
uint32_t checkpointer::get_checkpoint_period() {
    return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron);
}

//
// Stops the checkpoint thread.
//
int checkpointer::shutdown() {
    return toku_minicron_shutdown(&m_checkpointer_cron);
}

//
// Returns true if the checkpoint thread has been shut down;
// if checkpointing is still running, this returns false.
//
bool checkpointer::has_been_shutdown() {
    return toku_minicron_has_been_shutdown(&m_checkpointer_cron);
}

TOKULOGGER checkpointer::get_logger() {
    return m_logger;
}

void checkpointer::increment_num_txns() {
    m_checkpoint_num_txns++;
}

struct iterate_begin_checkpoint {
    LSN lsn_of_checkpoint_in_progress;
    iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { }
    static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) {
        assert(cf->begin_checkpoint_userdata);
        if (cf->for_checkpoint) {
            cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata);
        }
        return 0;
    }
};

//
// Update the user data in any cachefiles in our checkpoint list.
//
void checkpointer::update_cachefiles() {
    struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress);
    int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint,
                                               iterate_begin_checkpoint::fn>(&iterate);
    assert_zero(r);
}

struct iterate_note_pin {
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
        assert(cf->note_pin_by_checkpoint);
        cf->note_pin_by_checkpoint(cf, cf->userdata);
        cf->for_checkpoint = true;
        return 0;
    }
};

//
// Sets up and kicks off a checkpoint.
//
void checkpointer::begin_checkpoint() {
    // 1. Initialize the accountability counters.
    m_checkpoint_num_txns = 0;

    // 2. Make a list of cachefiles to be included in the checkpoint.
    m_cf_list->read_lock();
    m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr);
    m_checkpoint_num_files = m_cf_list->m_active_fileid.size();
    m_cf_list->read_unlock();

    // 3. Create log entries for this checkpoint.
    if (m_logger) {
        this->log_begin_checkpoint();
    }

    bjm_reset(m_checkpoint_clones_bjm);

    m_list->write_pending_exp_lock();
    m_list->read_list_lock();
    m_cf_list->read_lock(); // needed for update_cachefiles
    m_list->write_pending_cheap_lock();
    // 4. Turn on all the relevant checkpoint pending bits.
    this->turn_on_pending_bits();

    // 5. Update the user data in the cachefiles included in the checkpoint.
    this->update_cachefiles();
    m_list->write_pending_cheap_unlock();
    m_cf_list->read_unlock();
    m_list->read_list_unlock();
    m_list->write_pending_exp_unlock();
}
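
//
// Illustrative call sequence (a sketch only; the real driver is
// toku_checkpoint() in checkpoint.cc, which also takes the checkpoint-safe
// lock and updates status counters):
//
//   cp->begin_checkpoint();                // pin cachefiles, set pending bits,
//                                          // log BEGIN_CHECKPOINT
//   ...                                    // clients proceed; pending pairs
//                                          // are written as they are touched
//   cp->end_checkpoint(nullptr, nullptr);  // write remaining pending pairs,
//                                          // fsync, log END_CHECKPOINT
//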

struct iterate_log_fassociate {
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
        assert(cf->log_fassociate_during_checkpoint);
        cf->log_fassociate_during_checkpoint(cf, cf->userdata);
        return 0;
    }
};

//
// Assuming the logger exists, this will write out the following
// information to the log.
//
// 1. Writes the BEGIN_CHECKPOINT to the log.
// 2. Writes the list of open dictionaries to the log.
// 3. Writes the list of open transactions to the log.
// 4. Writes the list of dictionaries that have had rollback logs suppressed.
//
// NOTE: This also has the side effect of setting the LSN
// of the checkpoint in progress.
//
void checkpointer::log_begin_checkpoint() {
    int r = 0;

    // Write the BEGIN_CHECKPOINT to the log.
    LSN begin_lsn={ .lsn = (uint64_t) -1 }; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
    TXN_MANAGER mgr = toku_logger_get_txn_manager(m_logger);
    TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
    toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid);
    m_lsn_of_checkpoint_in_progress = begin_lsn;

    // Log the list of open dictionaries.
    m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr);

    // Write open transactions to the log.
    r = toku_txn_manager_iter_over_live_txns(
        m_logger->txn_manager,
        log_open_txn,
        this
        );
    assert(r == 0);
}

//
// Sets the pending bits of EVERY PAIR in the cachetable, regardless of
// whether the PAIR is clean or not. It will be the responsibility of
// end_checkpoint or client threads to simply clear the pending bit
// if the PAIR is clean.
//
// On entry and exit, the pair list's read list lock is grabbed, and
// both pending locks are grabbed
//
void checkpointer::turn_on_pending_bits() {
    PAIR p = NULL;
    uint32_t i;
    for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) {
        assert(!p->checkpoint_pending);
        // Only include pairs belonging to cachefiles in the checkpoint
        if (!p->cachefile->for_checkpoint) {
            continue;
        }
        // Mark everything as pending a checkpoint
        //
        // The rule for the checkpoint_pending bit is as follows:
        //  - begin_checkpoint may set checkpoint_pending to true
        //    even though the pair lock on the node is not held.
        //  - any thread that wants to clear the pending bit must own
        //     the PAIR lock. Otherwise,
        //     we may end up clearing the pending bit before the
        //     current lock is ever released.
        p->checkpoint_pending = true;
        if (m_list->m_pending_head) {
            m_list->m_pending_head->pending_prev = p;
        }
        p->pending_next = m_list->m_pending_head;
        p->pending_prev = NULL;
        m_list->m_pending_head = p;
    }
    invariant(p == m_list->m_checkpoint_head);
}
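
//
// Note: turn_on_pending_bits prepends each pair to m_pending_head, so
// checkpoint_pending_pairs (below) drains the list in roughly the reverse of
// the clock order in which pairs were marked; the order is not relied upon
// for correctness, since every pending pair is written out before the
// checkpoint completes.
//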

void checkpointer::add_background_job() {
    int r = bjm_add_background_job(m_checkpoint_clones_bjm);
    assert_zero(r);
}
void checkpointer::remove_background_job() {
    bjm_remove_background_job(m_checkpoint_clones_bjm);
}

void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextra) {
    toku::scoped_malloc checkpoint_cfs_buf(m_checkpoint_num_files * sizeof(CACHEFILE));
    CACHEFILE *checkpoint_cfs = reinterpret_cast<CACHEFILE *>(checkpoint_cfs_buf.get());

    this->fill_checkpoint_cfs(checkpoint_cfs);
    this->checkpoint_pending_pairs();
    this->checkpoint_userdata(checkpoint_cfs);
    // For testing purposes only. Dictionary has been fsync-ed to disk but log has not yet been written.
    if (testcallback_f) {
        testcallback_f(testextra);
    }
    this->log_end_checkpoint();
    this->end_checkpoint_userdata(checkpoint_cfs);

    // Delete the list of cachefiles in the checkpoint.
    this->remove_cachefiles(checkpoint_cfs);
}

struct iterate_checkpoint_cfs {
    CACHEFILE *checkpoint_cfs;
    uint32_t checkpoint_num_files;
    uint32_t curr_index;
    iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) :
        checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) {
    }
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) {
        if (cf->for_checkpoint) {
            assert(info->curr_index < info->checkpoint_num_files);
            info->checkpoint_cfs[info->curr_index] = cf;
            info->curr_index++;
        }
        return 0;
    }
};

void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
    struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files);

    m_cf_list->read_lock();
    m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate);
    assert(iterate.curr_index == m_checkpoint_num_files);
    m_cf_list->read_unlock();
}

void checkpointer::checkpoint_pending_pairs() {
    PAIR p;
    m_list->read_list_lock();
    while ((p = m_list->m_pending_head) != 0) {
        // <CER> TODO: Investigate why we move pending head outside of the pending_pairs_remove() call.
        m_list->m_pending_head = m_list->m_pending_head->pending_next;
        m_list->pending_pairs_remove(p);
        // if still pending, clear the pending bit and write out the node
        pair_lock(p);
        m_list->read_list_unlock();
        write_pair_for_checkpoint_thread(m_ev, p);
        pair_unlock(p);
        m_list->read_list_lock();
    }
    assert(!m_list->m_pending_head);
    m_list->read_list_unlock();
    bjm_wait_for_jobs_to_finish(m_checkpoint_clones_bjm);
}

void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
    // we have just written the data blocks, so next write the translation and header for each open dictionary
    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
        CACHEFILE cf = checkpoint_cfs[i];
        assert(cf->for_checkpoint);
        assert(cf->checkpoint_userdata);
        toku_cachetable_set_checkpointing_user_data_status(1);
        cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
        toku_cachetable_set_checkpointing_user_data_status(0);
    }
}

void checkpointer::log_end_checkpoint() {
    if (m_logger) {
        toku_log_end_checkpoint(m_logger, NULL,
                                1, // want the end_checkpoint to be fsync'd
                                m_lsn_of_checkpoint_in_progress,
                                0,
                                m_checkpoint_num_files,
                                m_checkpoint_num_txns);
        toku_logger_note_checkpoint(m_logger, m_lsn_of_checkpoint_in_progress);
    }
}

void checkpointer::end_checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
    // everything has been written to file and fsynced
    // ... call the checkpoint-end function in the block translator
    //     to free obsolete blocks on disk used by the previous checkpoint
    // cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
        CACHEFILE cf = checkpoint_cfs[i];
        assert(cf->for_checkpoint);
        assert(cf->end_checkpoint_userdata);
        cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
    }
}

//
// Clears the for_checkpoint bit on, and unpins, every cachefile in this
// checkpointer's cachefile list.
//
void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
    // note_unpin_by_checkpoint may destroy the cachefile
    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
        CACHEFILE cf = checkpoint_cfs[i];
        assert(cf->for_checkpoint);
        cf->for_checkpoint = false;
        assert(cf->note_unpin_by_checkpoint);
        // Clear the bit saying this file is in the checkpoint.
        cf->note_unpin_by_checkpoint(cf, cf->userdata);
    }
}


////////////////////////////////////////////////////////
//
// cachefiles list
//
static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");

void cachefile_list::init() {
    m_next_filenum_to_use.fileid = 0;
    m_next_hash_id_to_use = 0;
    toku_pthread_rwlock_init(*cachetable_m_lock_key, &m_lock, nullptr);
    m_active_filenum.create();
    m_active_fileid.create();
    m_stale_fileid.create();
}

void cachefile_list::destroy() {
    m_active_filenum.destroy();
    m_active_fileid.destroy();
    m_stale_fileid.destroy();
    toku_pthread_rwlock_destroy(&m_lock);
}

void cachefile_list::read_lock() {
    toku_pthread_rwlock_rdlock(&m_lock);
}

void cachefile_list::read_unlock() {
    toku_pthread_rwlock_rdunlock(&m_lock);
}

void cachefile_list::write_lock() {
    toku_pthread_rwlock_wrlock(&m_lock);
}

void cachefile_list::write_unlock() {
    toku_pthread_rwlock_wrunlock(&m_lock);
}

struct iterate_find_iname {
    const char *iname_in_env;
    CACHEFILE found_cf;
    iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { }
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) {
        if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) {
            info->found_cf = cf;
            return -1;
        }
        return 0;
    }
};

int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) {
    struct iterate_find_iname iterate(iname_in_env);

    read_lock();
    int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate);
    if (iterate.found_cf != nullptr) {
        assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0);
        *cf = iterate.found_cf;
        r = 0;
    } else {
        r = ENOENT;
    }
    read_unlock();
    return r;
}

static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) {
    const FILENUM a = a_cf->filenum;
    if (a.fileid < b.fileid) {
        return -1;
    } else if (a.fileid == b.fileid) {
        return 0;
    } else {
        return 1;
    }
}

int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
    read_lock();
    int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr);
    if (r == DB_NOTFOUND) {
        r = ENOENT;
    } else {
        invariant_zero(r);
    }
    read_unlock();
    return r;
}

static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) {
    return toku_fileid_cmp(a_cf->fileid, b);
}

void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
    int r;
    r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr);
    assert_zero(r);
    r = m_active_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
    assert_zero(r);
}

void cachefile_list::add_stale_cf(CACHEFILE cf) {
    write_lock();
    int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
    assert_zero(r);
    write_unlock();
}

void cachefile_list::remove_cf(CACHEFILE cf) {
    write_lock();

    uint32_t idx;
    int r;
    r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(cf->filenum, nullptr, &idx);
    assert_zero(r);
    r = m_active_filenum.delete_at(idx);
    assert_zero(r);

    r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
    assert_zero(r);
    r = m_active_fileid.delete_at(idx);
    assert_zero(r);

    write_unlock();
}

void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) {
    uint32_t idx;
    int r;
    r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
    assert_zero(r);
    r = m_stale_fileid.delete_at(idx);
    assert_zero(r);
}

FILENUM cachefile_list::reserve_filenum() {
    // taking a write lock because we are modifying next_filenum_to_use
    FILENUM filenum = FILENUM_NONE;
    write_lock();
    while (1) {
        int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(m_next_filenum_to_use, nullptr, nullptr);
        if (r == 0) {
            m_next_filenum_to_use.fileid++;
            continue;
        }
        assert(r == DB_NOTFOUND);

        // skip the reserved value UINT32_MAX and wrap around to zero
        if (m_next_filenum_to_use.fileid == FILENUM_NONE.fileid) {
            m_next_filenum_to_use.fileid = 0;
            continue;
        }

        filenum = m_next_filenum_to_use;
        m_next_filenum_to_use.fileid++;
        break;
    }
    write_unlock();
    return filenum;
}
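
// Example of the wraparound behavior above: if m_next_filenum_to_use.fileid
// reaches the reserved FILENUM_NONE value (UINT32_MAX), it wraps to 0 and the
// scan continues from there, skipping any fileids still in use.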

uint32_t cachefile_list::get_new_hash_id_unlocked() {
    uint32_t retval = m_next_hash_id_to_use;
    m_next_hash_id_to_use++;
    return retval;
}

CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) {
    CACHEFILE cf = nullptr;
    int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
    if (r == 0) {
        assert(!cf->unlink_on_close);
    }
    return cf;
}

CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) {
    CACHEFILE cf = nullptr;
    int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
    if (r == 0) {
        assert(!cf->unlink_on_close);
    }
    return cf;
}

void cachefile_list::verify_unused_filenum(FILENUM filenum) {
    int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr);
    assert(r == DB_NOTFOUND);
}

// returns true if some eviction ran, false otherwise
bool cachefile_list::evict_some_stale_pair(evictor* ev) {
    write_lock();
    if (m_stale_fileid.size() == 0) {
        write_unlock();
        return false;
    }

    CACHEFILE stale_cf = nullptr;
    int r = m_stale_fileid.fetch(0, &stale_cf);
    assert_zero(r);

    // we should not have a cf in the stale list
    // that does not have any pairs
    PAIR p = stale_cf->cf_head;
    paranoid_invariant(p != NULL);
    evict_pair_from_cachefile(p);

    // now that we have evicted something,
    // let's check if the cachefile is needed anymore
    //
    // it is not needed if the latest eviction caused
    // the cf_head for that cf to become null
    bool destroy_cf = stale_cf->cf_head == nullptr;
    if (destroy_cf) {
        remove_stale_cf_unlocked(stale_cf);
    }

    write_unlock();

    ev->remove_pair_attr(p->attr);
    cachetable_free_pair(p);
    if (destroy_cf) {
        cachefile_destroy(stale_cf);
    }
    return true;
}

void cachefile_list::free_stale_data(evictor* ev) {
    write_lock();
    while (m_stale_fileid.size() != 0) {
        CACHEFILE stale_cf = nullptr;
        int r = m_stale_fileid.fetch(0, &stale_cf);
        assert_zero(r);

        // we should not have a cf in the stale list
        // that does not have any pairs
        PAIR p = stale_cf->cf_head;
        paranoid_invariant(p != NULL);

        evict_pair_from_cachefile(p);
        ev->remove_pair_attr(p->attr);
        cachetable_free_pair(p);

        // now that we have evicted something,
        // let's check if the cachefile is needed anymore
        if (stale_cf->cf_head == NULL) {
            remove_stale_cf_unlocked(stale_cf);
            cachefile_destroy(stale_cf);
        }
    }
    write_unlock();
}

void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void);
void
toku_cachetable_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status);
}