/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <my_global.h>
#include <string.h>
#include <time.h>
#include <stdarg.h>

#include <portability/memory.h>
#include <portability/toku_race_tools.h>
#include <portability/toku_atomic.h>
#include <portability/toku_pthread.h>
#include <portability/toku_portability.h>
#include <portability/toku_stdlib.h>
#include <portability/toku_time.h>

#include "ft/cachetable/cachetable.h"
#include "ft/cachetable/cachetable-internal.h"
#include "ft/cachetable/checkpoint.h"
#include "ft/logger/log-internal.h"
#include "util/rwlock.h"
#include "util/scoped_malloc.h"
#include "util/status.h"
#include "util/context.h"

toku_instr_key *cachetable_m_mutex_key;
toku_instr_key *cachetable_ev_thread_lock_mutex_key;

toku_instr_key *cachetable_m_list_lock_key;
toku_instr_key *cachetable_m_pending_lock_expensive_key;
toku_instr_key *cachetable_m_pending_lock_cheap_key;
toku_instr_key *cachetable_m_lock_key;

toku_instr_key *cachetable_value_key;
toku_instr_key *cachetable_disk_nb_rwlock_key;

toku_instr_key *cachetable_p_refcount_wait_key;
toku_instr_key *cachetable_m_flow_control_cond_key;
toku_instr_key *cachetable_m_ev_thread_cond_key;

toku_instr_key *cachetable_disk_nb_mutex_key;
toku_instr_key *log_internal_lock_mutex_key;
toku_instr_key *eviction_thread_key;

///////////////////////////////////////////////////////////////////////////////////
// Engine status
//
// Status is intended for display to humans to help understand system behavior.
// It does not need to be perfectly thread-safe.

// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
// They were left here after the engine status cleanup (#2949) rather than moved into the status struct,
// so they are still easily available to the debugger and to save lots of typing.
static uint64_t cachetable_miss;
static uint64_t cachetable_misstime;   // time spent waiting for disk read
static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static uint64_t cachetable_evictions;
static uint64_t cleaner_executions;    // number of times the cleaner thread's loop has executed


// Note, toku_cachetable_get_status() is below, after declaration of cachetable.

static void * const zero_value = nullptr;
static PAIR_ATTR const zero_attr = {
    .size = 0,
    .nonleaf_size = 0,
    .leaf_size = 0,
    .rollback_size = 0,
    .cache_pressure_size = 0,
    .is_valid = true
};


static inline void ctpair_destroy(PAIR p) {
    p->value_rwlock.deinit();
    paranoid_invariant(p->refcount == 0);
    nb_mutex_destroy(&p->disk_nb_mutex);
    toku_cond_destroy(&p->refcount_wait);
    toku_free(p);
}

static inline void pair_lock(PAIR p) {
    toku_mutex_lock(p->mutex);
}

static inline void pair_unlock(PAIR p) {
    toku_mutex_unlock(p->mutex);
}

// adds a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_add_ref_unlocked(PAIR p) {
    p->refcount++;
}

// releases a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_release_ref_unlocked(PAIR p) {
    paranoid_invariant(p->refcount > 0);
    p->refcount--;
    if (p->refcount == 0 && p->num_waiting_on_refs > 0) {
        toku_cond_broadcast(&p->refcount_wait);
    }
}

static void pair_wait_for_ref_release_unlocked(PAIR p) {
    p->num_waiting_on_refs++;
    while (p->refcount > 0) {
        toku_cond_wait(&p->refcount_wait, p->mutex);
    }
    p->num_waiting_on_refs--;
}
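
// Taken together, the three helpers above implement a simple blocking
// refcount: readers bump and drop p->refcount under p->mutex, and a thread
// that wants exclusive ownership blocks until the count drains. A minimal
// usage sketch (assuming the caller has already located the PAIR):
//
//     pair_lock(p);
//     pair_add_ref_unlocked(p);              // pin against destruction
//     ...
//     pair_release_ref_unlocked(p);          // wakes waiters when it hits zero
//     pair_wait_for_ref_release_unlocked(p); // returns once refcount == 0
//     pair_unlock(p);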

bool toku_ctpair_is_write_locked(PAIR pair) {
    return pair->value_rwlock.writers() == 1;
}

void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
    ct_status.init();
    CT_STATUS_VAL(CT_MISS) = cachetable_miss;
    CT_STATUS_VAL(CT_MISSTIME) = cachetable_misstime;
    CT_STATUS_VAL(CT_PREFETCHES) = cachetable_prefetches;
    CT_STATUS_VAL(CT_EVICTIONS) = cachetable_evictions;
    CT_STATUS_VAL(CT_CLEANER_EXECUTIONS) = cleaner_executions;
    CT_STATUS_VAL(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct);
    CT_STATUS_VAL(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct);
    toku_kibbutz_get_status(ct->client_kibbutz,
                            &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_NUM_THREADS_ACTIVE),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_MAX_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_ITEMS_PROCESSED),
                            &CT_STATUS_VAL(CT_POOL_CLIENT_TOTAL_EXECUTION_TIME));
    toku_kibbutz_get_status(ct->ct_kibbutz,
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_NUM_THREADS_ACTIVE),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_MAX_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_ITEMS_PROCESSED),
                            &CT_STATUS_VAL(CT_POOL_CACHETABLE_TOTAL_EXECUTION_TIME));
    toku_kibbutz_get_status(ct->checkpointing_kibbutz,
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_NUM_THREADS_ACTIVE),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_MAX_QUEUE_SIZE),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_ITEMS_PROCESSED),
                            &CT_STATUS_VAL(CT_POOL_CHECKPOINT_TOTAL_EXECUTION_TIME));
    ct->ev.fill_engine_status();
    *statp = ct_status;
}

// FIXME global with no toku prefix
void remove_background_job_from_cf(CACHEFILE cf)
{
    bjm_remove_background_job(cf->bjm);
}

// FIXME global with no toku prefix
void cachefile_kibbutz_enq (CACHEFILE cf, void (*f)(void*), void *extra)
// The function f must call remove_background_job_from_cf when it completes
{
    int r = bjm_add_background_job(cf->bjm);
    // if client is adding a background job, then it must be done
    // at a time when the manager is accepting background jobs, otherwise
    // the client is screwing up
    assert_zero(r);
    toku_kibbutz_enq(cf->cachetable->client_kibbutz, f, extra);
}
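
// A hedged usage sketch of the contract stated above: the enqueued function
// owns one background-job slot and must give it back itself. The payload and
// helper names here are hypothetical, not from this file:
//
//     static void flush_some_node(void *extra) {
//         CACHEFILE cf = (CACHEFILE) extra;  // hypothetical payload
//         // ... do the background work ...
//         remove_background_job_from_cf(cf); // required on completion
//     }
//     cachefile_kibbutz_enq(cf, flush_some_node, cf);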

static int
checkpoint_thread (void *checkpointer_v)
// Effect: If checkpoint_period > 0, then periodically run a checkpoint.
//  If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later.
//  If someone sets the checkpoint_shutdown boolean, then this thread exits.
// This thread notices those changes by waiting on a condition variable.
{
    CHECKPOINTER CAST_FROM_VOIDP(cp, checkpointer_v);
    int r = toku_checkpoint(cp, cp->get_logger(), NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT);
    invariant_zero(r);
    return r;
}

void toku_set_checkpoint_period (CACHETABLE ct, uint32_t new_period) {
    ct->cp.set_checkpoint_period(new_period);
}

uint32_t toku_get_checkpoint_period_unlocked (CACHETABLE ct) {
    return ct->cp.get_checkpoint_period();
}

void toku_set_cleaner_period (CACHETABLE ct, uint32_t new_period) {
    if (force_recovery) {
        return;
    }
    ct->cl.set_period(new_period);
}

uint32_t toku_get_cleaner_period_unlocked (CACHETABLE ct) {
    return ct->cl.get_period_unlocked();
}

void toku_set_cleaner_iterations (CACHETABLE ct, uint32_t new_iterations) {
    ct->cl.set_iterations(new_iterations);
}

uint32_t toku_get_cleaner_iterations (CACHETABLE ct) {
    return ct->cl.get_iterations();
}

uint32_t toku_get_cleaner_iterations_unlocked (CACHETABLE ct) {
    return ct->cl.get_iterations();
}

void toku_set_enable_partial_eviction (CACHETABLE ct, bool enabled) {
    ct->ev.set_enable_partial_eviction(enabled);
}

bool toku_get_enable_partial_eviction (CACHETABLE ct) {
    return ct->ev.get_enable_partial_eviction();
}

// reserve 25% as "unreservable". The loader cannot have it.
#define unreservable_memory(size) ((size)/4)
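// Worked example of the macro's arithmetic (illustration, not from the
// original source): with a 1GB size_limit, unreservable_memory(1GB) is 256MB,
// so roughly 768MB remains eligible for reserve_memory() callers such as the
// loader.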

int toku_cachetable_create_ex(CACHETABLE *ct_result, long size_limit,
                              unsigned long client_pool_threads,
                              unsigned long cachetable_pool_threads,
                              unsigned long checkpoint_pool_threads,
                              LSN UU(initial_lsn), TOKULOGGER logger) {
    int result = 0;
    int r;

    if (size_limit == 0) {
        size_limit = 128*1024*1024;
    }

    CACHETABLE XCALLOC(ct);
    ct->list.init();
    ct->cf_list.init();

    int num_processors = toku_os_get_number_active_processors();
    int checkpointing_nworkers = (num_processors/4) ? num_processors/4 : 1;
    r = toku_kibbutz_create(client_pool_threads ? client_pool_threads : num_processors,
                            &ct->client_kibbutz);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = toku_kibbutz_create(cachetable_pool_threads ? cachetable_pool_threads : 2*num_processors,
                            &ct->ct_kibbutz);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = toku_kibbutz_create(checkpoint_pool_threads ? checkpoint_pool_threads : checkpointing_nworkers,
                            &ct->checkpointing_kibbutz);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    // must be done after creating ct_kibbutz
    r = ct->ev.init(size_limit, &ct->list, &ct->cf_list, ct->ct_kibbutz, EVICTION_PERIOD);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = ct->cp.init(&ct->list, logger, &ct->ev, &ct->cf_list);
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    r = ct->cl.init(1, &ct->list, ct); // by default, start with one iteration
    if (r != 0) {
        result = r;
        goto cleanup;
    }
    ct->env_dir = toku_xstrdup(".");
cleanup:
    if (result == 0) {
        *ct_result = ct;
    } else {
        toku_cachetable_close(&ct);
    }
    return result;
}
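
// A hedged call sketch for the constructor above: zero means "use the
// default" for the size limit (128MB) and for each pool size (#cpus client
// threads, 2*#cpus cachetable threads, #cpus/4 checkpoint threads). ZERO_LSN
// is assumed to be the usual "no LSN" constant from the logger headers, and a
// NULL logger is assumed to be acceptable as in the tests:
//
//     CACHETABLE ct;
//     int r = toku_cachetable_create_ex(&ct, 0 /*size_limit*/,
//                                       0, 0, 0 /*pool sizes*/,
//                                       ZERO_LSN, NULL /*logger*/);
//     assert_zero(r);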

// Returns a pointer to the checkpoint contained within
// the given cachetable.
CHECKPOINTER toku_cachetable_get_checkpointer(CACHETABLE ct) {
    return &ct->cp;
}

uint64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction, uint64_t upper_bound) {
    uint64_t reserved_memory = ct->ev.reserve_memory(fraction, upper_bound);
    return reserved_memory;
}

void toku_cachetable_release_reserved_memory(CACHETABLE ct, uint64_t reserved_memory) {
    ct->ev.release_reserved_memory(reserved_memory);
}

void
toku_cachetable_set_env_dir(CACHETABLE ct, const char *env_dir) {
    toku_free(ct->env_dir);
    ct->env_dir = toku_xstrdup(env_dir);
}

// What cachefile goes with particular iname (iname relative to env)?
// The transaction that is adding the reference might not have a reference
// to the ft, therefore the cachefile might be closing.
// If closing, we want to return that it is not there, but must wait till after
// the close has finished.
// Once the close has finished, there must not be a cachefile with that name
// in the cachetable.
int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CACHEFILE *cf) {
    return ct->cf_list.cachefile_of_iname_in_env(iname_in_env, cf);
}

// What cachefile goes with particular fd?
// This function can only be called if the ft is still open, so file must
// still be open
int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
    return ct->cf_list.cachefile_of_filenum(filenum, cf);
}

// TEST-ONLY function
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname_in_env) {
    FILENUM filenum = toku_cachetable_reserve_filenum(ct);
    bool was_open;
    return toku_cachetable_openfd_with_filenum(cfptr, ct, fd, fname_in_env, filenum, &was_open);
}

// Get a unique filenum from the cachetable
FILENUM
toku_cachetable_reserve_filenum(CACHETABLE ct) {
    return ct->cf_list.reserve_filenum();
}

static void create_new_cachefile(
    CACHETABLE ct,
    FILENUM filenum,
    uint32_t hash_id,
    int fd,
    const char *fname_in_env,
    struct fileid fileid,
    CACHEFILE *cfptr
    ) {
    // File is not open. Make a new cachefile.
    CACHEFILE newcf = NULL;
    XCALLOC(newcf);
    newcf->cachetable = ct;
    newcf->hash_id = hash_id;
    newcf->fileid = fileid;

    newcf->filenum = filenum;
    newcf->fd = fd;
    newcf->fname_in_env = toku_xstrdup(fname_in_env);
    bjm_init(&newcf->bjm);
    *cfptr = newcf;
}

int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd,
                                         const char *fname_in_env,
                                         FILENUM filenum, bool* was_open) {
    int r;
    CACHEFILE newcf;
    struct fileid fileid;

    assert(filenum.fileid != FILENUM_NONE.fileid);
    r = toku_os_get_unique_file_id(fd, &fileid);
    if (r != 0) {
        r = get_error_errno();
        close(fd);
        return r;
    }
    ct->cf_list.write_lock();
    CACHEFILE existing_cf = ct->cf_list.find_cachefile_unlocked(&fileid);
    if (existing_cf) {
        *was_open = true;
        // Reuse an existing cachefile and close the caller's fd, whose
        // responsibility has been passed to us.
        r = close(fd);
        assert(r == 0);
        *cfptr = existing_cf;
        r = 0;
        goto exit;
    }
    *was_open = false;
    ct->cf_list.verify_unused_filenum(filenum);
    // now let's try to find it in the stale cachefiles
    existing_cf = ct->cf_list.find_stale_cachefile_unlocked(&fileid);
    // if we found a stale cachefile, reuse it
    if (existing_cf) {
        // fix up the fields in the cachefile
        existing_cf->filenum = filenum;
        existing_cf->fd = fd;
        existing_cf->fname_in_env = toku_xstrdup(fname_in_env);
        bjm_init(&existing_cf->bjm);

        // now we need to move all the PAIRs in it back into the cachetable
        ct->list.write_list_lock();
        for (PAIR curr_pair = existing_cf->cf_head; curr_pair; curr_pair = curr_pair->cf_next) {
            pair_lock(curr_pair);
            ct->list.add_to_cachetable_only(curr_pair);
            pair_unlock(curr_pair);
        }
        ct->list.write_list_unlock();
        // move the cachefile back to the list of active cachefiles
        ct->cf_list.remove_stale_cf_unlocked(existing_cf);
        ct->cf_list.add_cf_unlocked(existing_cf);
        *cfptr = existing_cf;
        r = 0;
        goto exit;
    }

    create_new_cachefile(
        ct,
        filenum,
        ct->cf_list.get_new_hash_id_unlocked(),
        fd,
        fname_in_env,
        fileid,
        &newcf
        );

    ct->cf_list.add_cf_unlocked(newcf);

    *cfptr = newcf;
    r = 0;
exit:
    ct->cf_list.write_unlock();
    return r;
}

static void cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, bool evict_completely);

//TEST_ONLY_FUNCTION
int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname_in_env, int flags, mode_t mode) {
    char *fname_in_cwd = toku_construct_full_name(2, ct->env_dir, fname_in_env);
    int fd = open(fname_in_cwd, flags+O_BINARY, mode);
    int r;
    if (fd < 0) {
        r = get_error_errno();
    } else {
        r = toku_cachetable_openfd (cfptr, ct, fd, fname_in_env);
    }
    toku_free(fname_in_cwd);
    return r;
}

char *
toku_cachefile_fname_in_env (CACHEFILE cf) {
    if (cf) {
        return cf->fname_in_env;
    }
    return nullptr;
}

void toku_cachefile_set_fname_in_env(CACHEFILE cf, char *new_fname_in_env) {
    cf->fname_in_env = new_fname_in_env;
}

int
toku_cachefile_get_fd (CACHEFILE cf) {
    return cf->fd;
}

static void cachefile_destroy(CACHEFILE cf) {
    if (cf->free_userdata) {
        cf->free_userdata(cf, cf->userdata);
    }
    toku_free(cf);
}

void toku_cachefile_close(CACHEFILE *cfp, bool oplsn_valid, LSN oplsn) {
    CACHEFILE cf = *cfp;
    CACHETABLE ct = cf->cachetable;

    bjm_wait_for_jobs_to_finish(cf->bjm);

    // Clients should never attempt to close a cachefile that is being
    // checkpointed. We notify clients this is happening in the
    // note_pin_by_checkpoint callback.
    assert(!cf->for_checkpoint);

    // Flush the cachefile and remove all of its pairs from the cachetable,
    // but keep the PAIRs linked in the cachefile. We will store the cachefile
    // away in case it gets opened again immediately.
    //
    // if we are unlinking on close, then we want to evict completely,
    // otherwise, we will keep the PAIRs and cachefile around in case
    // a subsequent open comes soon
    cachetable_flush_cachefile(ct, cf, cf->unlink_on_close);

    // Call the close userdata callback to notify the client this cachefile
    // and its underlying file are going to be closed
    if (cf->close_userdata) {
        cf->close_userdata(cf, cf->fd, cf->userdata, oplsn_valid, oplsn);
    }
    // fsync and close the fd.
    toku_file_fsync_without_accounting(cf->fd);
    int r = close(cf->fd);
    assert(r == 0);
    cf->fd = -1;

    // destroy the parts of the cachefile
    // that do not persist across opens/closes
    bjm_destroy(cf->bjm);
    cf->bjm = NULL;

    // remove the cf from the list of active cachefiles
    ct->cf_list.remove_cf(cf);
    cf->filenum = FILENUM_NONE;

    // Unlink the file if the bit was set
    if (cf->unlink_on_close) {
        char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(cf->cachetable, cf->fname_in_env);
        r = unlink(fname_in_cwd);
        assert_zero(r);
        toku_free(fname_in_cwd);
    }
    toku_free(cf->fname_in_env);
    cf->fname_in_env = NULL;

    // we destroy the cf if the unlink bit was set or if no PAIRs exist
    // if no PAIRs exist, there is no sense in keeping the cachefile around
    bool destroy_cf = cf->unlink_on_close || (cf->cf_head == NULL);
    if (destroy_cf) {
        cachefile_destroy(cf);
    }
    else {
        ct->cf_list.add_stale_cf(cf);
    }
}

// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
// This hash function does yield improved performance on ./db-benchmark-test-tokudb and ./scanscan
static inline uint32_t rot(uint32_t x, uint32_t k) {
    return (x<<k) | (x>>(32-k));
}
static inline uint32_t final (uint32_t a, uint32_t b, uint32_t c) {
    c ^= b; c -= rot(b,14);
    a ^= c; a -= rot(c,11);
    b ^= a; b -= rot(a,25);
    c ^= b; c -= rot(b,16);
    a ^= c; a -= rot(c,4);
    b ^= a; b -= rot(a,14);
    c ^= b; c -= rot(b,24);
    return c;
}

uint32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
{
    return final(cachefile->hash_id, (uint32_t)(key.b>>32), (uint32_t)key.b);
}
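
// Because the bits are mixed thoroughly, a power-of-two hash table can index
// with a mask instead of a modulo by a prime. A small sketch of the intended
// use (table_size is an assumed power-of-two bucket count, not a name from
// this file):
//
//     uint32_t fullhash = toku_cachetable_hash(cf, blocknum);
//     uint32_t bucket   = fullhash & (table_size - 1);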

#define CLOCK_SATURATION 15
#define CLOCK_INITIAL_COUNT 3

// Requires pair's mutex to be held
static void pair_touch (PAIR p) {
    p->count = (p->count < CLOCK_SATURATION) ? p->count+1 : CLOCK_SATURATION;
}
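
// Sketch of the clock scheme these constants parameterize (a reading of the
// surrounding code, not text from the original): a new pair is assumed to
// start at CLOCK_INITIAL_COUNT (3); every touch nudges the count up toward
// CLOCK_SATURATION (15); the evictor's sweep decays counts, so a pair
// survives roughly as many sweeps as its count before becoming an eviction
// candidate:
//
//     touch: 3 -> 4 -> ... -> 15 (saturated)
//     sweep: 15 -> 14 -> ... -> 0 (eligible for eviction)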

// Remove a pair from the cachetable, requires write list lock to be held and p->mutex to be held
// Effects: the pair is removed from the LRU list and from the cachetable's hash table.
// The size of the objects in the cachetable is adjusted by the size of the pair being
// removed.
static void cachetable_remove_pair (pair_list* list, evictor* ev, PAIR p) {
    list->evict_completely(p);
    ev->remove_pair_attr(p->attr);
}

static void cachetable_free_pair(PAIR p) {
    CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
    CACHEKEY key = p->key;
    void *value = p->value_data;
    void *disk_data = p->disk_data;
    void *write_extraargs = p->write_extraargs;
    PAIR_ATTR old_attr = p->attr;

    cachetable_evictions++;
    PAIR_ATTR new_attr = p->attr;
    // Note that flush_callback is called with write_me false, so the only purpose of this
    // call is to tell the ft layer to evict the node (keep_me is false).
    // Also, because we have already removed the PAIR from the cachetable in
    // cachetable_remove_pair, we cannot pass in p->cachefile and p->cachefile->fd
    // for the first two parameters, as these may be invalid (#5171), so, we
    // pass in NULL and -1, dummy values
    flush_callback(NULL, -1, key, value, &disk_data, write_extraargs, old_attr, &new_attr, false, false, true, false);

    ctpair_destroy(p);
}

// assumes value_rwlock and disk_nb_mutex held on entry
// responsibility of this function is to only write a locked PAIR to disk
// and NOTHING else. We do not manipulate the state of the PAIR
// of the cachetable here (with the exception of ct->size_current for clones)
//
// No pair_list lock should be held, and the PAIR mutex should not be held
//
static void cachetable_only_write_locked_data(
    evictor* ev,
    PAIR p,
    bool for_checkpoint,
    PAIR_ATTR* new_attr,
    bool is_clone
    )
{
    CACHETABLE_FLUSH_CALLBACK flush_callback = p->flush_callback;
    CACHEFILE cachefile = p->cachefile;
    CACHEKEY key = p->key;
    void *value = is_clone ? p->cloned_value_data : p->value_data;
    void *disk_data = p->disk_data;
    void *write_extraargs = p->write_extraargs;
    PAIR_ATTR old_attr;
    // we do this for drd. If we are a cloned pair and only
    // have the disk_nb_mutex, it is a race to access p->attr.
    // Luckily, old_attr here is only used for some test applications,
    // so inaccurate non-size fields are ok.
    if (is_clone) {
        old_attr = make_pair_attr(p->cloned_value_size);
    }
    else {
        old_attr = p->attr;
    }
    bool dowrite = true;

    // write callback
    flush_callback(
        cachefile,
        cachefile->fd,
        key,
        value,
        &disk_data,
        write_extraargs,
        old_attr,
        new_attr,
        dowrite,
        is_clone ? false : true, // keep_me (only keep if this is not cloned pointer)
        for_checkpoint,
        is_clone //is_clone
        );
    p->disk_data = disk_data;
    if (is_clone) {
        p->cloned_value_data = NULL;
        ev->remove_cloned_data_size(p->cloned_value_size);
        p->cloned_value_size = 0;
    }
}


//
// This function writes a PAIR's value out to disk. Currently, it is called
// by get_and_pin functions that write a PAIR out for checkpoint, by
// evictor threads that evict dirty PAIRS, and by the checkpoint thread
// that needs to write out a dirty node for checkpoint.
//
// Requires on entry for p->mutex to NOT be held, otherwise
// calling cachetable_only_write_locked_data will be very expensive
//
static void cachetable_write_locked_pair(
    evictor* ev,
    PAIR p,
    bool for_checkpoint
    )
{
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr = p->attr;
    // grabbing the disk_nb_mutex here ensures that
    // after this point, no one is writing out a cloned value
    // if we grab the disk_nb_mutex inside the if clause,
    // then we may try to evict a PAIR that is in the process
    // of having its clone be written out
    pair_lock(p);
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    // make sure that assumption about cloned_value_data is true
    // if we have grabbed the disk_nb_mutex, then that means that
    // there should be no cloned value data
    assert(p->cloned_value_data == NULL);
    if (p->dirty) {
        cachetable_only_write_locked_data(ev, p, for_checkpoint, &new_attr, false);
        //
        // now let's update variables
        //
        if (new_attr.is_valid) {
            p->attr = new_attr;
            ev->change_pair_attr(old_attr, new_attr);
        }
    }
    // the pair is no longer dirty once written
    p->dirty = CACHETABLE_CLEAN;
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    pair_unlock(p);
}

// Worker thread function that writes and evicts a pair from memory to its cachefile
static void cachetable_evicter(void* extra) {
    PAIR p = (PAIR)extra;
    pair_list* pl = p->list;
    CACHEFILE cf = p->cachefile;
    pl->read_pending_exp_lock();
    bool for_checkpoint = p->checkpoint_pending;
    p->checkpoint_pending = false;
    // per the contract of evictor::evict_pair,
    // the pair's mutex, p->mutex, must be held on entry
    pair_lock(p);
    p->ev->evict_pair(p, for_checkpoint);
    pl->read_pending_exp_unlock();
    bjm_remove_background_job(cf->bjm);
}

static void cachetable_partial_eviction(void* extra) {
    PAIR p = (PAIR)extra;
    CACHEFILE cf = p->cachefile;
    p->ev->do_partial_eviction(p);
    bjm_remove_background_job(cf->bjm);
}

void toku_cachetable_swap_pair_values(PAIR old_pair, PAIR new_pair) {
    void* old_value = old_pair->value_data;
    void* new_value = new_pair->value_data;
    old_pair->value_data = new_value;
    new_pair->value_data = old_value;
}

void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
    // TODO: <CER> Maybe move this...
    ct->ev.signal_eviction_thread();
}

// Initializes a pair's members.
//
void pair_init(PAIR p,
               CACHEFILE cachefile,
               CACHEKEY key,
               void *value,
               PAIR_ATTR attr,
               enum cachetable_dirty dirty,
               uint32_t fullhash,
               CACHETABLE_WRITE_CALLBACK write_callback,
               evictor *ev,
               pair_list *list)
{
    p->cachefile = cachefile;
    p->key = key;
    p->value_data = value;
    p->cloned_value_data = NULL;
    p->cloned_value_size = 0;
    p->disk_data = NULL;
    p->attr = attr;
    p->dirty = dirty;
    p->fullhash = fullhash;

    p->flush_callback = write_callback.flush_callback;
    p->pe_callback = write_callback.pe_callback;
    p->pe_est_callback = write_callback.pe_est_callback;
    p->cleaner_callback = write_callback.cleaner_callback;
    p->clone_callback = write_callback.clone_callback;
    p->checkpoint_complete_callback = write_callback.checkpoint_complete_callback;
    p->write_extraargs = write_callback.write_extraargs;

    p->count = 0; // <CER> Is zero the correct init value?
    p->refcount = 0;
    p->num_waiting_on_refs = 0;
    toku_cond_init(*cachetable_p_refcount_wait_key, &p->refcount_wait, nullptr);
    p->checkpoint_pending = false;

    p->mutex = list->get_mutex_for_pair(fullhash);
    assert(p->mutex);
    p->value_rwlock.init(p->mutex
#ifdef TOKU_MYSQL_WITH_PFS
                         ,
                         *cachetable_value_key
#endif
                         );
    nb_mutex_init(*cachetable_disk_nb_mutex_key,
                  *cachetable_disk_nb_rwlock_key,
                  &p->disk_nb_mutex);

    p->size_evicting_estimate = 0; // <CER> Is zero the correct init value?

    p->ev = ev;
    p->list = list;

    p->clock_next = p->clock_prev = NULL;
    p->pending_next = p->pending_prev = NULL;
    p->cf_next = p->cf_prev = NULL;
    p->hash_chain = NULL;
}

// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
//
// Requires pair list's write lock to be held on entry.
// the pair's mutex must be held as well
//
//
static PAIR cachetable_insert_at(CACHETABLE ct,
                                 CACHEFILE cachefile, CACHEKEY key, void *value,
                                 uint32_t fullhash,
                                 PAIR_ATTR attr,
                                 CACHETABLE_WRITE_CALLBACK write_callback,
                                 enum cachetable_dirty dirty) {
    PAIR MALLOC(p);
    assert(p);
    memset(p, 0, sizeof *p);
    pair_init(p,
              cachefile,
              key,
              value,
              attr,
              dirty,
              fullhash,
              write_callback,
              &ct->ev,
              &ct->list
              );

    ct->list.put(p);
    ct->ev.add_pair_attr(attr);
    return p;
}

// on input, the write list lock must be held AND
// the pair's mutex must be held as well
static void cachetable_insert_pair_at(CACHETABLE ct, PAIR p, PAIR_ATTR attr) {
    ct->list.put(p);
    ct->ev.add_pair_attr(attr);
}


// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
// Its callers (toku_cachetable_put_with_dep_pairs) depend on this behavior.
//
// Requires pair list's write lock to be held on entry
//
static void cachetable_put_internal(
    CACHEFILE cachefile,
    PAIR p,
    void *value,
    PAIR_ATTR attr,
    CACHETABLE_PUT_CALLBACK put_callback
    )
{
    CACHETABLE ct = cachefile->cachetable;
    //
    //
    // TODO: (Zardosht), make code run in debug only
    //
    //
    //PAIR dummy_p = ct->list.find_pair(cachefile, key, fullhash);
    //invariant_null(dummy_p);
    cachetable_insert_pair_at(ct, p, attr);
    invariant_notnull(put_callback);
    put_callback(p->key, value, p);
}

// Pair mutex (p->mutex) may or may not be held on entry;
// holding the pair mutex on entry is not important
// for performance or correctness.
// Pair is pinned on entry
static void
clone_pair(evictor* ev, PAIR p) {
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr;
    long clone_size = 0;

    // act of cloning should be fast,
    // not sure if we have to release
    // and regrab the cachetable lock,
    // but doing it for now
    p->clone_callback(
        p->value_data,
        &p->cloned_value_data,
        &clone_size,
        &new_attr,
        true,
        p->write_extraargs
        );

    // now we need to do the same actions we would do
    // if the PAIR had been written to disk
    //
    // because we hold the value_rwlock,
    // it doesn't matter whether we clear
    // the pending bit before the clone
    // or after the clone
    p->dirty = CACHETABLE_CLEAN;
    if (new_attr.is_valid) {
        p->attr = new_attr;
        ev->change_pair_attr(old_attr, new_attr);
    }
    p->cloned_value_size = clone_size;
    ev->add_cloned_data_size(p->cloned_value_size);
}

static void checkpoint_cloned_pair(void* extra) {
    PAIR p = (PAIR)extra;
    CACHETABLE ct = p->cachefile->cachetable;
    PAIR_ATTR new_attr;
    // note that pending lock is not needed here because
    // we KNOW we are in the middle of a checkpoint
    // and that a begin_checkpoint cannot happen
    cachetable_only_write_locked_data(
        p->ev,
        p,
        true, //for_checkpoint
        &new_attr,
        true //is_clone
        );
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    pair_unlock(p);
    ct->cp.remove_background_job();
}

static void
checkpoint_cloned_pair_on_writer_thread(CACHETABLE ct, PAIR p) {
    toku_kibbutz_enq(ct->checkpointing_kibbutz, checkpoint_cloned_pair, p);
}


//
// Given a PAIR p with the value_rwlock already held, do the following:
//  - If the PAIR needs to be written out to disk for checkpoint:
//    - If the PAIR is cloneable, clone the PAIR and place the work
//      of writing the PAIR on a background thread.
//    - If the PAIR is not cloneable, write the PAIR to disk for checkpoint
//      on the current thread
//
// On entry, pair's mutex is NOT held
//
static void
write_locked_pair_for_checkpoint(CACHETABLE ct, PAIR p, bool checkpoint_pending)
{
    if (checkpoint_pending && p->checkpoint_complete_callback) {
        p->checkpoint_complete_callback(p->value_data);
    }
    if (p->dirty && checkpoint_pending) {
        if (p->clone_callback) {
            pair_lock(p);
            nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
            pair_unlock(p);
            assert(!p->cloned_value_data);
            clone_pair(&ct->ev, p);
            assert(p->cloned_value_data);
            // place it on the background thread and continue
            // responsibility of writer thread to release disk_nb_mutex
            ct->cp.add_background_job();
            checkpoint_cloned_pair_on_writer_thread(ct, p);
        }
        else {
            // The pair is not cloneable, just write the pair to disk
            // we already have p->value_rwlock and we just do the write in our own thread.
            cachetable_write_locked_pair(&ct->ev, p, true); // keeps the PAIR's write lock
        }
    }
}

// On entry and exit: hold the pair's mutex (p->mutex)
// Method: take write lock
//         maybe write out the node
//         Else release write lock
//
static void
write_pair_for_checkpoint_thread (evictor* ev, PAIR p)
{
    // Grab an exclusive lock on the pair.
    // If we grab an expensive lock, then other threads will return
    // TRY_AGAIN rather than waiting. In production, the only time
    // another thread will check if grabbing a lock is expensive is when
    // we have a clone_callback (FTNODEs), so the act of checkpointing
    // will be cheap. Also, much of the time we'll just be clearing
    // pending bits and that's definitely cheap. (see #5427)
    p->value_rwlock.write_lock(false);
    if (p->checkpoint_pending && p->checkpoint_complete_callback) {
        p->checkpoint_complete_callback(p->value_data);
    }
    if (p->dirty && p->checkpoint_pending) {
        if (p->clone_callback) {
            nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
            assert(!p->cloned_value_data);
            clone_pair(ev, p);
            assert(p->cloned_value_data);
        }
        else {
            // The pair is not cloneable, just write the pair to disk
            // we already have p->value_rwlock and we just do the write in our own thread.
            // this will grab and release disk_nb_mutex
            pair_unlock(p);
            cachetable_write_locked_pair(ev, p, true); // keeps the PAIR's write lock
            pair_lock(p);
        }
        p->checkpoint_pending = false;

        // now release value_rwlock, before we write the PAIR out
        // so that the PAIR is available to client threads
        p->value_rwlock.write_unlock(); // didn't call cachetable_evict_pair so we have to unlock it ourselves.
        if (p->clone_callback) {
            // note that pending lock is not needed here because
            // we KNOW we are in the middle of a checkpoint
            // and that a begin_checkpoint cannot happen
            PAIR_ATTR attr;
            pair_unlock(p);
            cachetable_only_write_locked_data(
                ev,
                p,
                true, //for_checkpoint
                &attr,
                true //is_clone
                );
            pair_lock(p);
            nb_mutex_unlock(&p->disk_nb_mutex);
        }
    }
    else {
        //
        // we may clear the pending bit here because we have
        // both the cachetable lock and the PAIR lock.
        // The rule, as mentioned in toku_cachetable_begin_checkpoint,
        // is that to clear the bit, we must have both the PAIR lock
        // and the pending lock
        //
        p->checkpoint_pending = false;
        p->value_rwlock.write_unlock();
    }
}

//
// For each PAIR associated with these CACHEFILEs and CACHEKEYs
// if the checkpoint_pending bit is set and the PAIR is dirty, write the PAIR
// to disk.
// We assume the PAIRs passed in have been locked by the client that made calls
// into the cachetable that eventually make it here.
//
static void checkpoint_dependent_pairs(
    CACHETABLE ct,
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    bool* checkpoint_pending,
    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
    )
{
    for (uint32_t i = 0; i < num_dependent_pairs; i++) {
        PAIR curr_dep_pair = dependent_pairs[i];
        // we need to update the dirtiness of the dependent pair,
        // because the client may have dirtied it while holding its lock,
        // and if the pair is pending a checkpoint, it needs to be written out
        if (dependent_dirty[i]) curr_dep_pair->dirty = CACHETABLE_DIRTY;
        if (checkpoint_pending[i]) {
            write_locked_pair_for_checkpoint(ct, curr_dep_pair, checkpoint_pending[i]);
        }
    }
}

void toku_cachetable_put_with_dep_pairs(
    CACHEFILE cachefile,
    CACHETABLE_GET_KEY_AND_FULLHASH get_key_and_fullhash,
    void *value,
    PAIR_ATTR attr,
    CACHETABLE_WRITE_CALLBACK write_callback,
    void *get_key_and_fullhash_extra,
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    enum cachetable_dirty* dependent_dirty, // array stating dirty/cleanness of dependent pairs
    CACHEKEY* key,
    uint32_t* fullhash,
    CACHETABLE_PUT_CALLBACK put_callback
    )
{
    //
    // need to get the key and fullhash
    //
    CACHETABLE ct = cachefile->cachetable;
    if (ct->ev.should_client_thread_sleep()) {
        ct->ev.wait_for_cache_pressure_to_subside();
    }
    if (ct->ev.should_client_wake_eviction_thread()) {
        ct->ev.signal_eviction_thread();
    }

    PAIR p = NULL;
    XMALLOC(p);
    memset(p, 0, sizeof *p);

    ct->list.write_list_lock();
    get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
    pair_init(
        p,
        cachefile,
        *key,
        value,
        attr,
        CACHETABLE_DIRTY,
        *fullhash,
        write_callback,
        &ct->ev,
        &ct->list
        );
    pair_lock(p);
    p->value_rwlock.write_lock(true);
    cachetable_put_internal(
        cachefile,
        p,
        value,
        attr,
        put_callback
        );
    pair_unlock(p);
    bool checkpoint_pending[num_dependent_pairs];
    ct->list.write_pending_cheap_lock();
    for (uint32_t i = 0; i < num_dependent_pairs; i++) {
        checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
        dependent_pairs[i]->checkpoint_pending = false;
    }
    ct->list.write_pending_cheap_unlock();
    ct->list.write_list_unlock();

    //
    // now that we have inserted the row, let's checkpoint the
    // dependent nodes, if they need checkpointing
    //
    checkpoint_dependent_pairs(
        ct,
        num_dependent_pairs,
        dependent_pairs,
        checkpoint_pending,
        dependent_dirty
        );
}

void toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, void *value, PAIR_ATTR attr,
                         CACHETABLE_WRITE_CALLBACK write_callback,
                         CACHETABLE_PUT_CALLBACK put_callback
                         ) {
    CACHETABLE ct = cachefile->cachetable;
    if (ct->ev.should_client_thread_sleep()) {
        ct->ev.wait_for_cache_pressure_to_subside();
    }
    if (ct->ev.should_client_wake_eviction_thread()) {
        ct->ev.signal_eviction_thread();
    }

    PAIR p = NULL;
    XMALLOC(p);
    memset(p, 0, sizeof *p);

    ct->list.write_list_lock();
    pair_init(
        p,
        cachefile,
        key,
        value,
        attr,
        CACHETABLE_DIRTY,
        fullhash,
        write_callback,
        &ct->ev,
        &ct->list
        );
    pair_lock(p);
    p->value_rwlock.write_lock(true);
    cachetable_put_internal(
        cachefile,
        p,
        value,
        attr,
        put_callback
        );
    pair_unlock(p);
    ct->list.write_list_unlock();
}

static uint64_t get_tnow(void) {
    struct timeval tv;
    int r = gettimeofday(&tv, NULL); assert(r == 0);
    return tv.tv_sec * 1000000ULL + tv.tv_usec;
}

//
// No locks are held on entry (besides the rwlock write lock of the PAIR).
// On exit, the pair's mutex is not held; the value_rwlock write lock is
// released unless keep_pair_locked is true.
//
static void
do_partial_fetch(
    CACHETABLE ct,
    CACHEFILE cachefile,
    PAIR p,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    void *read_extraargs,
    bool keep_pair_locked
    )
{
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr = zero_attr;
    // As of Dr. No, only clean PAIRs may have pieces missing,
    // so we do a sanity check here.
    assert(!p->dirty);

    pair_lock(p);
    invariant(p->value_rwlock.writers());
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    int r = pf_callback(p->value_data, p->disk_data, read_extraargs, cachefile->fd, &new_attr);
    lazy_assert_zero(r);
    p->attr = new_attr;
    ct->ev.change_pair_attr(old_attr, new_attr);
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    if (!keep_pair_locked) {
        p->value_rwlock.write_unlock();
    }
    pair_unlock(p);
}

void toku_cachetable_pf_pinned_pair(
    void* value,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    void* read_extraargs,
    CACHEFILE cf,
    CACHEKEY key,
    uint32_t fullhash
    )
{
    PAIR_ATTR attr;
    PAIR p = NULL;
    CACHETABLE ct = cf->cachetable;
    ct->list.pair_lock_by_fullhash(fullhash);
    p = ct->list.find_pair(cf, key, fullhash);
    assert(p != NULL);
    assert(p->value_data == value);
    assert(p->value_rwlock.writers());
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);

    int fd = cf->fd;
    pf_callback(value, p->disk_data, read_extraargs, fd, &attr);

    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    pair_unlock(p);
}

int toku_cachetable_get_and_pin (
    CACHEFILE cachefile,
    CACHEKEY key,
    uint32_t fullhash,
    void **value,
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    bool may_modify_value,
    void* read_extraargs // parameter for fetch_callback, pf_req_callback, and pf_callback
    )
{
    pair_lock_type lock_type = may_modify_value ? PL_WRITE_EXPENSIVE : PL_READ;
    // We have separate parameters of read_extraargs and write_extraargs because
    // the lifetime of the two parameters are different. write_extraargs may be used
    // long after this function call (e.g. after a flush to disk), whereas read_extraargs
    // will not be used after this function returns. As a result, the caller may allocate
    // read_extraargs on the stack, whereas write_extraargs must be allocated
    // on the heap.
    return toku_cachetable_get_and_pin_with_dep_pairs (
        cachefile,
        key,
        fullhash,
        value,
        write_callback,
        fetch_callback,
        pf_req_callback,
        pf_callback,
        lock_type,
        read_extraargs,
        0, // number of dependent pairs that we may need to checkpoint
        NULL, // array of dependent pairs
        NULL // array stating dirty/cleanness of dependent pairs
        );
}
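
// Illustration of the lifetime rule in the comment above (the names here are
// hypothetical, not from this file): read_extraargs may live on the caller's
// stack because it is consumed before the call returns, while anything the
// write path needs travels inside write_callback and must outlive the pin:
//
//     struct my_fetch_extra fe = { ... };  // stack allocation is fine
//     void *node;
//     int r = toku_cachetable_get_and_pin(cf, key, fullhash, &node,
//                                         wc,  // carries heap-lived extraargs
//                                         fetch_cb, pf_req_cb, pf_cb,
//                                         true /*may_modify_value*/, &fe);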

// Read a pair from a cachefile into memory using the pair's fetch callback
// on entry, pair mutex (p->mutex) is NOT held, but pair is pinned
static void cachetable_fetch_pair(
    CACHETABLE ct,
    CACHEFILE cf,
    PAIR p,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    void* read_extraargs,
    bool keep_pair_locked
    )
{
    // helgrind
    CACHEKEY key = p->key;
    uint32_t fullhash = p->fullhash;

    void *toku_value = NULL;
    void *disk_data = NULL;
    PAIR_ATTR attr;

    // FIXME this should be enum cachetable_dirty, right?
    int dirty = 0;

    pair_lock(p);
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);

    int r;
    r = fetch_callback(cf, p, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs);
    if (dirty) {
        p->dirty = CACHETABLE_DIRTY;
    }
    assert(r == 0);

    p->value_data = toku_value;
    p->disk_data = disk_data;
    p->attr = attr;
    ct->ev.add_pair_attr(attr);
    pair_lock(p);
    nb_mutex_unlock(&p->disk_nb_mutex);
    if (!keep_pair_locked) {
        p->value_rwlock.write_unlock();
    }
    pair_unlock(p);
}

static bool get_checkpoint_pending(PAIR p, pair_list* pl) {
    bool checkpoint_pending = false;
    pl->read_pending_cheap_lock();
    checkpoint_pending = p->checkpoint_pending;
    p->checkpoint_pending = false;
    pl->read_pending_cheap_unlock();
    return checkpoint_pending;
}

static void checkpoint_pair_and_dependent_pairs(
    CACHETABLE ct,
    PAIR p,
    bool p_is_pending_checkpoint,
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    bool* dependent_pairs_pending_checkpoint,
    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
    )
{

    //
    // A checkpoint must not begin while we are checking dependent pairs or pending bits.
    // Here is why.
    //
    // Now that we have all of the locks on the pairs we
    // care about, we can take care of the necessary checkpointing.
    // For each pair, we simply need to write the pair if it is
    // pending a checkpoint. If no pair is pending a checkpoint,
    // then all of this work will be done with the cachetable lock held,
    // so we don't need to worry about a checkpoint beginning
    // in the middle of any operation below. If some pair
    // is pending a checkpoint, then the checkpoint thread
    // will not complete its current checkpoint until it can
    // successfully grab a lock on the pending pair and
    // remove it from its list of pairs pending a checkpoint.
    // This cannot be done until we release the lock
    // that we have, which is not done in this function.
    // So, the point is, it is impossible for a checkpoint
    // to begin while we write any of these locked pairs
    // for checkpoint, even though writing a pair releases
    // the cachetable lock.
    //
    write_locked_pair_for_checkpoint(ct, p, p_is_pending_checkpoint);

    checkpoint_dependent_pairs(
        ct,
        num_dependent_pairs,
        dependent_pairs,
        dependent_pairs_pending_checkpoint,
        dependent_dirty
        );
}

static void unpin_pair(PAIR p, bool read_lock_grabbed) {
    if (read_lock_grabbed) {
        p->value_rwlock.read_unlock();
    }
    else {
        p->value_rwlock.write_unlock();
    }
}


// on input, the pair's mutex is held,
// on output, the pair's mutex is not held.
// if true, we must try again, and pair is not pinned
// if false, we succeeded, the pair is pinned
static bool try_pin_pair(
    PAIR p,
    CACHETABLE ct,
    CACHEFILE cachefile,
    pair_lock_type lock_type,
    uint32_t num_dependent_pairs,
    PAIR* dependent_pairs,
    enum cachetable_dirty* dependent_dirty,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    void* read_extraargs,
    bool already_slept
    )
{
    bool dep_checkpoint_pending[num_dependent_pairs];
    bool try_again = true;
    bool expensive = (lock_type == PL_WRITE_EXPENSIVE);
    if (lock_type != PL_READ) {
        p->value_rwlock.write_lock(expensive);
    }
    else {
        p->value_rwlock.read_lock();
    }
    pair_touch(p);
    pair_unlock(p);

    bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);

    if (partial_fetch_required) {
        toku::context pf_ctx(CTX_PARTIAL_FETCH);

        if (ct->ev.should_client_thread_sleep() && !already_slept) {
            pair_lock(p);
            unpin_pair(p, (lock_type == PL_READ));
            pair_unlock(p);
            try_again = true;
            goto exit;
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }
        //
        // Just because the PAIR exists does not necessarily mean that all the data the caller requires
1484 // is in memory. A partial fetch may be required, which is evaluated above
1485 // if the variable is true, a partial fetch is required so we must grab the PAIR's write lock
1486 // and then call a callback to retrieve what we need
1487 //
1488 assert(partial_fetch_required);
1489 // As of Dr. No, only clean PAIRs may have pieces missing,
1490 // so we do a sanity check here.
1491 assert(!p->dirty);
1492
1493 if (lock_type == PL_READ) {
1494 pair_lock(p);
1495 p->value_rwlock.read_unlock();
1496 p->value_rwlock.write_lock(true);
1497 pair_unlock(p);
1498 }
1499 else if (lock_type == PL_WRITE_CHEAP) {
1500 pair_lock(p);
1501 p->value_rwlock.write_unlock();
1502 p->value_rwlock.write_lock(true);
1503 pair_unlock(p);
1504 }
1505
1506 partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
1507 if (partial_fetch_required) {
1508 do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, true);
1509 }
1510 if (lock_type == PL_READ) {
1511 //
1512 // TODO: Zardosht, somehow ensure that a partial eviction cannot happen
1513 // between these two calls
1514 //
1515 pair_lock(p);
1516 p->value_rwlock.write_unlock();
1517 p->value_rwlock.read_lock();
1518 pair_unlock(p);
1519 }
1520 else if (lock_type == PL_WRITE_CHEAP) {
1521 pair_lock(p);
1522 p->value_rwlock.write_unlock();
1523 p->value_rwlock.write_lock(false);
1524 pair_unlock(p);
1525 }
1526 // small hack here for #5439,
1527 // for queries, pf_req_callback does some work for the caller,
1528 // that information may be out of date after a write_unlock
1529 // followed by a relock, so we do it again.
1530 bool pf_required = pf_req_callback(p->value_data,read_extraargs);
1531 assert(!pf_required);
1532 }
1533
1534 if (lock_type != PL_READ) {
1535 ct->list.read_pending_cheap_lock();
1536 bool p_checkpoint_pending = p->checkpoint_pending;
1537 p->checkpoint_pending = false;
1538 for (uint32_t i = 0; i < num_dependent_pairs; i++) {
1539 dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
1540 dependent_pairs[i]->checkpoint_pending = false;
1541 }
1542 ct->list.read_pending_cheap_unlock();
1543 checkpoint_pair_and_dependent_pairs(
1544 ct,
1545 p,
1546 p_checkpoint_pending,
1547 num_dependent_pairs,
1548 dependent_pairs,
1549 dep_checkpoint_pending,
1550 dependent_dirty
1551 );
1552 }
1553
1554 try_again = false;
1555 exit:
1556 return try_again;
1557 }

int toku_cachetable_get_and_pin_with_dep_pairs (
    CACHEFILE cachefile,
    CACHEKEY key,
    uint32_t fullhash,
    void **value,
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    pair_lock_type lock_type,
    void* read_extraargs, // parameter for fetch_callback, pf_req_callback, and pf_callback
    uint32_t num_dependent_pairs, // number of dependent pairs that we may need to checkpoint
    PAIR* dependent_pairs,
    enum cachetable_dirty* dependent_dirty // array stating dirty/cleanness of dependent pairs
    )
// See cachetable/cachetable.h
{
    CACHETABLE ct = cachefile->cachetable;
    bool wait = false;
    bool already_slept = false;
    bool dep_checkpoint_pending[num_dependent_pairs];

    //
    // If in the process of pinning the node we add data to the cachetable via a partial fetch
    // or a full fetch, we may need to first sleep because there is too much data in the
    // cachetable. In those cases, we set the bool wait to true and goto try_again, so that
    // we can do our sleep and then restart the function.
    //
beginning:
    if (wait) {
        // We shouldn't be holding the read list lock while
        // waiting for the evictor to remove pairs.
        already_slept = true;
        ct->ev.wait_for_cache_pressure_to_subside();
    }

    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        // on entry, holds p->mutex (which is locked via pair_lock_by_fullhash)
        // on exit, does not hold p->mutex
        bool try_again = try_pin_pair(
            p,
            ct,
            cachefile,
            lock_type,
            num_dependent_pairs,
            dependent_pairs,
            dependent_dirty,
            pf_req_callback,
            pf_callback,
            read_extraargs,
            already_slept
        );
        if (try_again) {
            wait = true;
            goto beginning;
        }
        else {
            goto got_value;
        }
    }
    else {
        toku::context fetch_ctx(CTX_FULL_FETCH);

        ct->list.pair_unlock_by_fullhash(fullhash);
        // We only want to sleep once per call to get_and_pin. If we have already
        // slept and there is still cache pressure, then we might as
        // well just complete the call, because the sleep did not help.
        // By sleeping only once per get_and_pin, we prevent starvation and ensure
        // that we make progress (however slowly) on each thread, which allows
        // assumptions of the form 'x will eventually happen'.
        // This matters in extreme scenarios.
        if (ct->ev.should_client_thread_sleep() && !already_slept) {
            wait = true;
            goto beginning;
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }
        // Since the pair was not found, we need the write list
        // lock to add it. So, we have to release the read list lock
        // first.
        ct->list.write_list_lock();
        ct->list.pair_lock_by_fullhash(fullhash);
        p = ct->list.find_pair(cachefile, key, fullhash);
        if (p != NULL) {
            ct->list.write_list_unlock();
            // on entry, holds p->mutex,
            // on exit, does not hold p->mutex
            bool try_again = try_pin_pair(
                p,
                ct,
                cachefile,
                lock_type,
                num_dependent_pairs,
                dependent_pairs,
                dependent_dirty,
                pf_req_callback,
                pf_callback,
                read_extraargs,
                already_slept
            );
            if (try_again) {
                wait = true;
                goto beginning;
            }
            else {
                goto got_value;
            }
        }
        assert(p == NULL);

        // Insert a PAIR into the cachetable
        // NOTE: At this point we still have the write list lock held.
        p = cachetable_insert_at(
            ct,
            cachefile,
            key,
            zero_value,
            fullhash,
            zero_attr,
            write_callback,
            CACHETABLE_CLEAN
        );
        invariant_notnull(p);

        // Pin the pair.
        p->value_rwlock.write_lock(true);
        pair_unlock(p);

        if (lock_type != PL_READ) {
            ct->list.read_pending_cheap_lock();
            invariant(!p->checkpoint_pending);
            for (uint32_t i = 0; i < num_dependent_pairs; i++) {
                dep_checkpoint_pending[i] = dependent_pairs[i]->checkpoint_pending;
                dependent_pairs[i]->checkpoint_pending = false;
            }
            ct->list.read_pending_cheap_unlock();
        }
        // We should release the lock before we perform
        // these expensive operations.
        ct->list.write_list_unlock();

        if (lock_type != PL_READ) {
            checkpoint_dependent_pairs(
                ct,
                num_dependent_pairs,
                dependent_pairs,
                dep_checkpoint_pending,
                dependent_dirty
            );
        }
        uint64_t t0 = get_tnow();

        // Retrieve the value of the PAIR from disk.
        // The pair being fetched will be marked as pending if a checkpoint happens during the
        // fetch, because begin_checkpoint will mark as pending any pair that is locked even if it is clean.
        cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true);
        cachetable_miss++;
        cachetable_misstime += get_tnow() - t0;

        // If the lock_type requested was a PL_READ, we downgrade to PL_READ,
        // but if the request was for a PL_WRITE_CHEAP, we don't bother
        // downgrading, because we would have to possibly resolve the
        // checkpointing again, and that would just make this function even
        // messier.
        //
        // TODO(yoni): in case of PL_WRITE_CHEAP, write and use
        // p->value_rwlock.write_change_status_to_not_expensive(); (Also name it better)
        // to downgrade from an expensive write lock to a cheap one
        if (lock_type == PL_READ) {
            pair_lock(p);
            p->value_rwlock.write_unlock();
            p->value_rwlock.read_lock();
            pair_unlock(p);
            // small hack here for #5439:
            // for queries, pf_req_callback does some work for the caller, and
            // that information may be out of date after a write_unlock
            // followed by a read_lock, so we do it again.
            bool pf_required = pf_req_callback(p->value_data, read_extraargs);
            assert(!pf_required);
        }
        goto got_value;
    }
got_value:
    *value = p->value_data;
    return 0;
}
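
// Hypothetical usage sketch (not an API defined here; the callbacks, key,
// and attr names below are placeholders from the ft layer). A writer pin
// with one dependent parent pair might look like:
//
//    void *node;
//    int r = toku_cachetable_get_and_pin_with_dep_pairs(
//        cf, key, fullhash, &node,
//        write_callback, fetch_callback,
//        pf_req_callback, pf_callback,
//        PL_WRITE_EXPENSIVE, read_extraargs,
//        1, &parent_pair, &parent_dirty);
//    assert_zero(r);  // this function always returns 0
//    // ... mutate node ...
//    toku_cachetable_unpin(cf, node_pair, CACHETABLE_DIRTY, new_attr);
//
// where node_pair is the PAIR handle the caller keeps for the node (the ft
// layer stashes it in the node during fetch).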

// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return success.
//
// However, if the page is clean or has a checkpoint pending, don't return success.
// This will minimize the number of dirty nodes.
// Rationale: maybe_get_and_pin is used when the system has an alternative to modifying a node.
// In the context of checkpointing, we don't want to gratuitously dirty a page, because doing so causes an I/O.
// For example, if we can modify a bit in a dirty parent or modify a bit in a clean child, then we should modify
// the dirty parent (which will have to do I/O eventually anyway) rather than incur a full block write to modify one bit.
// Similarly, if the checkpoint is actually pending, we don't want to block on it.
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void **value) {
    CACHETABLE ct = cachefile->cachetable;
    int r = -1;
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
        bool got_lock = false;
        switch (lock_type) {
        case PL_READ:
            if (p->value_rwlock.try_read_lock()) {
                got_lock = p->dirty;

                if (!got_lock) {
                    p->value_rwlock.read_unlock();
                }
            }
            break;
        case PL_WRITE_CHEAP:
        case PL_WRITE_EXPENSIVE:
            if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
                // we got the lock fast, so continue
                ct->list.read_pending_cheap_lock();

                // if a checkpoint is pending, then we don't want to return
                // the value to the user, because we are responsible for
                // handling the checkpointing, which we do not want to do,
                // because it is expensive
                got_lock = p->dirty && !p->checkpoint_pending;

                ct->list.read_pending_cheap_unlock();
                if (!got_lock) {
                    p->value_rwlock.write_unlock();
                }
            }
            break;
        }
        if (got_lock) {
            pair_touch(p);
            *value = p->value_data;
            r = 0;
        }
    }
    ct->list.pair_unlock_by_fullhash(fullhash);
    return r;
}
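
// Caller-side pattern (illustration only, not code from this file): because
// this function fails fast instead of blocking, a typical caller tries the
// cheap pin and falls back to the blocking path when it returns nonzero:
//
//    void *node;
//    if (toku_cachetable_maybe_get_and_pin(cf, key, fullhash,
//                                          PL_WRITE_CHEAP, &node) != 0) {
//        // pair absent, busy, clean, or checkpoint-pending:
//        // skip the optimization or take the blocking pin instead
//    }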

// Used by flusher threads to possibly pin a child on a client thread if pinning is cheap.
// Same as toku_cachetable_maybe_get_and_pin except that we don't care if the node is clean or dirty (return the node regardless).
// All other conditions remain the same.
int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, pair_lock_type lock_type, void **value) {
    CACHETABLE ct = cachefile->cachetable;
    int r = -1;
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        const bool lock_is_expensive = (lock_type == PL_WRITE_EXPENSIVE);
        bool got_lock = false;
        switch (lock_type) {
        case PL_READ:
            if (p->value_rwlock.try_read_lock()) {
                got_lock = true;
            } else if (!p->value_rwlock.read_lock_is_expensive()) {
                p->value_rwlock.write_lock(lock_is_expensive);
                got_lock = true;
            }
            if (got_lock) {
                pair_touch(p);
            }
            pair_unlock(p);
            break;
        case PL_WRITE_CHEAP:
        case PL_WRITE_EXPENSIVE:
            if (p->value_rwlock.try_write_lock(lock_is_expensive)) {
                got_lock = true;
            } else if (!p->value_rwlock.write_lock_is_expensive()) {
                p->value_rwlock.write_lock(lock_is_expensive);
                got_lock = true;
            }
            if (got_lock) {
                pair_touch(p);
            }
            pair_unlock(p);
            if (got_lock) {
                bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
                write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
            }
            break;
        }
        if (got_lock) {
            *value = p->value_data;
            r = 0;
        }
    } else {
        ct->list.pair_unlock_by_fullhash(fullhash);
    }
    return r;
}

int toku_cachetable_get_attr (CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, PAIR_ATTR *attr) {
    CACHETABLE ct = cachefile->cachetable;
    int r;
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    if (p) {
        // Assumes the pair lock and the fullhash lock are the same mutex
        *attr = p->attr;
        r = 0;
    } else {
        r = -1;
    }
    ct->list.pair_unlock_by_fullhash(fullhash);
    return r;
}

//
// internal function to unpin a PAIR.
// As of Clayface, this may be called in two ways:
//  - with flush false
//  - with flush true
// The first is for when this is run during run_unlockers in
// toku_cachetable_get_and_pin_nonblocking, the second is during
// normal operations. Only during normal operations do we want to possibly
// induce evictions or sleep.
//
static int
cachetable_unpin_internal(
    CACHEFILE cachefile,
    PAIR p,
    enum cachetable_dirty dirty,
    PAIR_ATTR attr,
    bool flush
    )
{
    invariant_notnull(p);

    CACHETABLE ct = cachefile->cachetable;
    bool added_data_to_cachetable = false;

    // hack for #3969, only exists in case where we run unlockers
    pair_lock(p);
    PAIR_ATTR old_attr = p->attr;
    PAIR_ATTR new_attr = attr;
    if (dirty) {
        p->dirty = CACHETABLE_DIRTY;
    }
    if (attr.is_valid) {
        p->attr = attr;
    }
    bool read_lock_grabbed = p->value_rwlock.readers() != 0;
    unpin_pair(p, read_lock_grabbed);
    pair_unlock(p);

    if (attr.is_valid) {
        if (new_attr.size > old_attr.size) {
            added_data_to_cachetable = true;
        }
        ct->ev.change_pair_attr(old_attr, new_attr);
    }

    // see comments above this function to understand this code
    if (flush && added_data_to_cachetable) {
        if (ct->ev.should_client_thread_sleep()) {
            ct->ev.wait_for_cache_pressure_to_subside();
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }
    }
    return 0;
}

int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    return cachetable_unpin_internal(cachefile, p, dirty, attr, true);
}
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    return cachetable_unpin_internal(cachefile, p, dirty, attr, false);
}

static void
run_unlockers (UNLOCKERS unlockers) {
    while (unlockers) {
        assert(unlockers->locked);
        unlockers->locked = false;
        unlockers->f(unlockers->extra);
        unlockers = unlockers->next;
    }
}
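
// Illustrative sketch of building an UNLOCKERS chain (hypothetical caller;
// the fields are the ones run_unlockers() walks above: a locked flag, a
// callback, its extra argument, and a next pointer, in the layout declared
// in cachetable.h):
//
//    static void release_my_lock(void *extra) { /* unlock caller state */ }
//    ...
//    struct unlockers ul = { true, release_my_lock, my_extra, NULL };
//    r = toku_cachetable_get_and_pin_nonblocking(..., &ul);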

//
// This function tries to pin the pair without running the unlockers.
// If it can pin the pair cheaply, it does so, and returns 0.
// If the pin will be expensive, it runs unlockers,
// pins the pair, then releases the pin,
// and then returns TOKUDB_TRY_AGAIN
//
// on entry, pair mutex is held,
// on exit, pair mutex is NOT held
static int
maybe_pin_pair(
    PAIR p,
    pair_lock_type lock_type,
    UNLOCKERS unlockers
    )
{
    int retval = 0;
    bool expensive = (lock_type == PL_WRITE_EXPENSIVE);

    // We try to pin the PAIR. In each case, we check to see
    // if acquiring the pin is expensive. If so, we run the unlockers, set the
    // retval to TOKUDB_TRY_AGAIN, and pin AND release the PAIR.
    // If not, then we pin the PAIR, keep retval at 0, and do not
    // run the unlockers, as we intend to return the value to the user.
    if (lock_type == PL_READ) {
        if (p->value_rwlock.read_lock_is_expensive()) {
            pair_add_ref_unlocked(p);
            pair_unlock(p);
            run_unlockers(unlockers);
            retval = TOKUDB_TRY_AGAIN;
            pair_lock(p);
            pair_release_ref_unlocked(p);
        }
        p->value_rwlock.read_lock();
    }
    else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP) {
        if (p->value_rwlock.write_lock_is_expensive()) {
            pair_add_ref_unlocked(p);
            pair_unlock(p);
            run_unlockers(unlockers);
            // change expensive to false because
            // we will unpin the pair immediately
            // after pinning it
            expensive = false;
            retval = TOKUDB_TRY_AGAIN;
            pair_lock(p);
            pair_release_ref_unlocked(p);
        }
        p->value_rwlock.write_lock(expensive);
    }
    else {
        abort();
    }

    if (retval == TOKUDB_TRY_AGAIN) {
        unpin_pair(p, (lock_type == PL_READ));
    }
    pair_touch(p);
    pair_unlock(p);
    return retval;
}

int toku_cachetable_get_and_pin_nonblocking(
    CACHEFILE cf,
    CACHEKEY key,
    uint32_t fullhash,
    void **value,
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback,
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
    pair_lock_type lock_type,
    void *read_extraargs,
    UNLOCKERS unlockers
    )
// See cachetable/cachetable.h.
{
    CACHETABLE ct = cf->cachetable;
    assert(lock_type == PL_READ ||
        lock_type == PL_WRITE_CHEAP ||
        lock_type == PL_WRITE_EXPENSIVE
        );
try_again:
    ct->list.pair_lock_by_fullhash(fullhash);
    PAIR p = ct->list.find_pair(cf, key, fullhash);
    if (p == NULL) {
        toku::context fetch_ctx(CTX_FULL_FETCH);

        // Not found
        ct->list.pair_unlock_by_fullhash(fullhash);
        ct->list.write_list_lock();
        ct->list.pair_lock_by_fullhash(fullhash);
        p = ct->list.find_pair(cf, key, fullhash);
        if (p != NULL) {
            // We just did another search with the write list lock and
            // found the pair. This means that in between our
            // releasing the read list lock and grabbing the write list lock,
            // another thread snuck in and inserted the PAIR into
            // the cachetable. For simplicity, we just return
            // to the top and restart the function.
            ct->list.write_list_unlock();
            ct->list.pair_unlock_by_fullhash(fullhash);
            goto try_again;
        }

        p = cachetable_insert_at(
            ct,
            cf,
            key,
            zero_value,
            fullhash,
            zero_attr,
            write_callback,
            CACHETABLE_CLEAN
        );
        assert(p);
        // Grab an expensive write lock, because we are about to do a fetch
        // off disk.
        // No one can access this pair because
        // we hold the write list lock and we just injected
        // the pair into the cachetable. Therefore, this lock acquisition
        // will not block.
        p->value_rwlock.write_lock(true);
        pair_unlock(p);
        run_unlockers(unlockers); // we hold the write list_lock.
        ct->list.write_list_unlock();

        // at this point, only the pair is pinned,
        // no pair mutex is held, and
        // no list lock is held
        uint64_t t0 = get_tnow();
        cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false);
        cachetable_miss++;
        cachetable_misstime += get_tnow() - t0;

        if (ct->ev.should_client_thread_sleep()) {
            ct->ev.wait_for_cache_pressure_to_subside();
        }
        if (ct->ev.should_client_wake_eviction_thread()) {
            ct->ev.signal_eviction_thread();
        }

        return TOKUDB_TRY_AGAIN;
    }
    else {
        int r = maybe_pin_pair(p, lock_type, unlockers);
        if (r == TOKUDB_TRY_AGAIN) {
            return TOKUDB_TRY_AGAIN;
        }
        assert_zero(r);

        if (lock_type != PL_READ) {
            bool checkpoint_pending = get_checkpoint_pending(p, &ct->list);
            write_locked_pair_for_checkpoint(ct, p, checkpoint_pending);
        }

        // At this point, we have pinned the PAIR
        // and resolved its checkpointing. The pair's
        // mutex is not held. The read list lock IS held. Before
        // returning the PAIR to the user, we must
        // still check for partial fetch.
        bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
        if (partial_fetch_required) {
            toku::context fetch_ctx(CTX_PARTIAL_FETCH);

            run_unlockers(unlockers);

            // We are now getting an expensive write lock, because we
            // are doing a partial fetch. So, if we previously had
            // either a read lock or a cheap write lock, we need to
            // release and reacquire the correct lock type.
            if (lock_type == PL_READ) {
                pair_lock(p);
                p->value_rwlock.read_unlock();
                p->value_rwlock.write_lock(true);
                pair_unlock(p);
            }
            else if (lock_type == PL_WRITE_CHEAP) {
                pair_lock(p);
                p->value_rwlock.write_unlock();
                p->value_rwlock.write_lock(true);
                pair_unlock(p);
            }

            // Now wait for the I/O to occur.
            partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);
            if (partial_fetch_required) {
                do_partial_fetch(ct, cf, p, pf_callback, read_extraargs, false);
            }
            else {
                pair_lock(p);
                p->value_rwlock.write_unlock();
                pair_unlock(p);
            }

            if (ct->ev.should_client_thread_sleep()) {
                ct->ev.wait_for_cache_pressure_to_subside();
            }
            if (ct->ev.should_client_wake_eviction_thread()) {
                ct->ev.signal_eviction_thread();
            }

            return TOKUDB_TRY_AGAIN;
        }
        else {
            *value = p->value_data;
            return 0;
        }
    }
    // We should not get here. The code above should hit a return in all cases.
    abort();
}
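
// Hypothetical retry loop around the nonblocking pin (sketch only; real
// callers in the ft layer must reacquire whatever locks run_unlockers
// released, and re-search for the node, before retrying):
//
//    while (true) {
//        int r = toku_cachetable_get_and_pin_nonblocking(
//            cf, key, fullhash, &value, write_callback, fetch_callback,
//            pf_req_callback, pf_callback, PL_READ, read_extraargs, unlockers);
//        if (r == 0) break;          // pinned; *value is usable
//        assert(r == TOKUDB_TRY_AGAIN);
//        // ... reacquire released locks, then retry ...
//    }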

struct cachefile_prefetch_args {
    PAIR p;
    CACHETABLE_FETCH_CALLBACK fetch_callback;
    void* read_extraargs;
};

struct cachefile_partial_prefetch_args {
    PAIR p;
    CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback;
    void *read_extraargs;
};

// Worker thread function to read a pair from a cachefile to memory
static void cachetable_reader(void* extra) {
    struct cachefile_prefetch_args* cpargs = (struct cachefile_prefetch_args*)extra;
    CACHEFILE cf = cpargs->p->cachefile;
    CACHETABLE ct = cf->cachetable;
    cachetable_fetch_pair(
        ct,
        cpargs->p->cachefile,
        cpargs->p,
        cpargs->fetch_callback,
        cpargs->read_extraargs,
        false
    );
    bjm_remove_background_job(cf->bjm);
    toku_free(cpargs);
}

static void cachetable_partial_reader(void* extra) {
    struct cachefile_partial_prefetch_args *cpargs = (struct cachefile_partial_prefetch_args*)extra;
    CACHEFILE cf = cpargs->p->cachefile;
    CACHETABLE ct = cf->cachetable;
    do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs, false);
    bjm_remove_background_job(cf->bjm);
    toku_free(cpargs);
}

int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
                            CACHETABLE_WRITE_CALLBACK write_callback,
                            CACHETABLE_FETCH_CALLBACK fetch_callback,
                            CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
                            CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
                            void *read_extraargs,
                            bool *doing_prefetch)
// Effect: See the documentation for this function in cachetable/cachetable.h
{
    int r = 0;
    PAIR p = NULL;
    if (doing_prefetch) {
        *doing_prefetch = false;
    }
    CACHETABLE ct = cf->cachetable;
    // if the cachetable has too much data, don't bother prefetching
    if (ct->ev.should_client_thread_sleep()) {
        goto exit;
    }
    ct->list.pair_lock_by_fullhash(fullhash);
    // lookup
    p = ct->list.find_pair(cf, key, fullhash);
    // if not found, then create a pair and fetch it
    if (p == NULL) {
        cachetable_prefetches++;
        ct->list.pair_unlock_by_fullhash(fullhash);
        ct->list.write_list_lock();
        ct->list.pair_lock_by_fullhash(fullhash);
        p = ct->list.find_pair(cf, key, fullhash);
        if (p != NULL) {
            ct->list.write_list_unlock();
            goto found_pair;
        }

        r = bjm_add_background_job(cf->bjm);
        assert_zero(r);
        p = cachetable_insert_at(
            ct,
            cf,
            key,
            zero_value,
            fullhash,
            zero_attr,
            write_callback,
            CACHETABLE_CLEAN
        );
        assert(p);
        p->value_rwlock.write_lock(true);
        pair_unlock(p);
        ct->list.write_list_unlock();

        struct cachefile_prefetch_args *MALLOC(cpargs);
        cpargs->p = p;
        cpargs->fetch_callback = fetch_callback;
        cpargs->read_extraargs = read_extraargs;
        toku_kibbutz_enq(ct->ct_kibbutz, cachetable_reader, cpargs);
        if (doing_prefetch) {
            *doing_prefetch = true;
        }
        goto exit;
    }

found_pair:
    // At this point, p is found, the pair's mutex is grabbed, and
    // no list lock is held.
    // TODO(leif): should this also just go ahead and wait if all there
    // are to wait for are readers?
    if (p->value_rwlock.try_write_lock(true)) {
        // nobody else is using the node, so we should go ahead and prefetch
        pair_touch(p);
        pair_unlock(p);
        bool partial_fetch_required = pf_req_callback(p->value_data, read_extraargs);

        if (partial_fetch_required) {
            r = bjm_add_background_job(cf->bjm);
            assert_zero(r);
            struct cachefile_partial_prefetch_args *MALLOC(cpargs);
            cpargs->p = p;
            cpargs->pf_callback = pf_callback;
            cpargs->read_extraargs = read_extraargs;
            toku_kibbutz_enq(ct->ct_kibbutz, cachetable_partial_reader, cpargs);
            if (doing_prefetch) {
                *doing_prefetch = true;
            }
        }
        else {
            pair_lock(p);
            p->value_rwlock.write_unlock();
            pair_unlock(p);
        }
    }
    else {
        // Couldn't get the write lock cheaply
        pair_unlock(p);
    }
exit:
    return 0;
}
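
// Hypothetical usage (sketch): fire-and-forget readahead of a child node.
// The caller does not wait for the I/O; a later get_and_pin finds the pair
// already fetched or in flight.
//
//    bool doing_prefetch;
//    int r = toku_cachefile_prefetch(cf, child_key, child_fullhash,
//                                    write_callback, fetch_callback,
//                                    pf_req_callback, pf_callback,
//                                    read_extraargs, &doing_prefetch);
//    assert_zero(r);  // always returns 0; doing_prefetch reports whether
//                     // a background job was actually queued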

void toku_cachefile_verify (CACHEFILE cf) {
    toku_cachetable_verify(cf->cachetable);
}

void toku_cachetable_verify (CACHETABLE ct) {
    ct->list.verify();
}

struct pair_flush_for_close {
    PAIR p;
    BACKGROUND_JOB_MANAGER bjm;
};

static void cachetable_flush_pair_for_close(void* extra) {
    struct pair_flush_for_close *CAST_FROM_VOIDP(args, extra);
    PAIR p = args->p;
    CACHEFILE cf = p->cachefile;
    CACHETABLE ct = cf->cachetable;
    PAIR_ATTR attr;
    cachetable_only_write_locked_data(
        &ct->ev,
        p,
        false, // not for a checkpoint, as we assert above
        &attr,
        false // not a clone
    );
    p->dirty = CACHETABLE_CLEAN;
    bjm_remove_background_job(args->bjm);
    toku_free(args);
}

static void flush_pair_for_close_on_background_thread(
    PAIR p,
    BACKGROUND_JOB_MANAGER bjm,
    CACHETABLE ct
    )
{
    pair_lock(p);
    assert(p->value_rwlock.users() == 0);
    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
    assert(!p->cloned_value_data);
    if (p->dirty == CACHETABLE_DIRTY) {
        int r = bjm_add_background_job(bjm);
        assert_zero(r);
        struct pair_flush_for_close *XMALLOC(args);
        args->p = p;
        args->bjm = bjm;
        toku_kibbutz_enq(ct->ct_kibbutz, cachetable_flush_pair_for_close, args);
    }
    pair_unlock(p);
}

static void remove_pair_for_close(PAIR p, CACHETABLE ct, bool completely) {
    pair_lock(p);
    assert(p->value_rwlock.users() == 0);
    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
    assert(!p->cloned_value_data);
    assert(p->dirty == CACHETABLE_CLEAN);
    assert(p->refcount == 0);
    if (completely) {
        cachetable_remove_pair(&ct->list, &ct->ev, p);
        pair_unlock(p);
        // TODO: Eventually, we should not hold the write list lock during free
        cachetable_free_pair(p);
    }
    else {
        // If we are not evicting completely,
        // we only want to remove the PAIR from the cachetable,
        // that is, remove it from the hashtable and the various linked
        // lists, but we will keep the PAIRs and the linked list
        // in the cachefile intact, as they will be cached away
        // in case an open comes soon.
        ct->list.evict_from_cachetable(p);
        pair_unlock(p);
    }
}

// helper function for cachetable_flush_cachefile, which happens on a close:
// writes out the dirty pairs on background threads and returns when
// the writing is done
static void write_dirty_pairs_for_close(CACHETABLE ct, CACHEFILE cf) {
    BACKGROUND_JOB_MANAGER bjm = NULL;
    bjm_init(&bjm);
    ct->list.write_list_lock(); // TODO: (Zardosht), verify that this lock is unnecessary to take here
    PAIR p = NULL;
    // write out dirty PAIRs
    uint32_t i;
    if (cf) {
        for (i = 0, p = cf->cf_head;
             i < cf->num_pairs;
             i++, p = p->cf_next)
        {
            flush_pair_for_close_on_background_thread(p, bjm, ct);
        }
    }
    else {
        for (i = 0, p = ct->list.m_checkpoint_head;
             i < ct->list.m_n_in_table;
             i++, p = p->clock_next)
        {
            flush_pair_for_close_on_background_thread(p, bjm, ct);
        }
    }
    ct->list.write_list_unlock();
    bjm_wait_for_jobs_to_finish(bjm);
    bjm_destroy(bjm);
}

static void remove_all_pairs_for_close(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
    ct->list.write_list_lock();
    if (cf) {
        if (evict_completely) {
            // If we are evicting completely, then the PAIRs will
            // be removed from the linked list managed by the
            // cachefile, so this while loop works.
            while (cf->num_pairs > 0) {
                PAIR p = cf->cf_head;
                remove_pair_for_close(p, ct, evict_completely);
            }
        }
        else {
            // On the other hand, if we are not evicting completely,
            // then the cachefile's linked list stays intact, and we must
            // iterate like this.
            for (PAIR p = cf->cf_head; p; p = p->cf_next) {
                remove_pair_for_close(p, ct, evict_completely);
            }
        }
    }
    else {
        while (ct->list.m_n_in_table > 0) {
            PAIR p = ct->list.m_checkpoint_head;
            // If there is no cachefile, then we had better
            // be evicting completely, because we have no
            // cachefile to save the PAIRs to. At least,
            // we have no guarantees that the cachefile
            // will remain good.
            invariant(evict_completely);
            remove_pair_for_close(p, ct, true);
        }
    }
    ct->list.write_list_unlock();
}

static void verify_cachefile_flushed(CACHETABLE ct UU(), CACHEFILE cf UU()) {
#ifdef TOKU_DEBUG_PARANOID
    // Assert here that the cachefile is flushed, by walking the
    // pair_list and finding no pairs belonging to this cachefile.
    if (cf) {
        ct->list.write_list_lock();
        uint32_t i;
        PAIR p = NULL;
        for (i = 0, p = ct->list.m_checkpoint_head;
             i < ct->list.m_n_in_table;
             i++, p = p->clock_next)
        {
            assert(p->cachefile != cf);
        }
        ct->list.write_list_unlock();
    }
#endif
}

// Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL).
// Must be holding the cachetable lock on entry.
//
// This function assumes that no client thread is accessing or
// trying to access the cachefile while this function is executing.
// This implies no client thread will be trying to lock any nodes
// belonging to the cachefile.
//
// This function also assumes that the cachefile is not in the process
// of being used by a checkpoint. If a checkpoint is currently happening,
// it does NOT include this cachefile.
//
static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf, bool evict_completely) {
    //
    // Because work on a kibbutz is always done by the client thread,
    // and this function assumes that no client thread is doing any work
    // on the cachefile, we assume that no client thread will be adding jobs
    // to this cachefile's kibbutz.
    //
    // The caller of this function must ensure that there are
    // no jobs added to the kibbutz. This implies that the only work other
    // threads may be doing is work by the writer threads.
    //
    // first write out dirty PAIRs
    write_dirty_pairs_for_close(ct, cf);

    // now that everything is clean, get rid of everything
    remove_all_pairs_for_close(ct, cf, evict_completely);

    verify_cachefile_flushed(ct, cf);
}

/* Requires that no locks be held that are used by the checkpoint logic */
void
toku_cachetable_minicron_shutdown(CACHETABLE ct) {
    int r = ct->cp.shutdown();
    assert(r == 0);
    ct->cl.destroy();
}

void toku_cachetable_prepare_close(CACHETABLE ct UU()) {
    extern bool toku_serialize_in_parallel;
    toku_unsafe_set(&toku_serialize_in_parallel, true);
}

/* Requires that it all be flushed. */
void toku_cachetable_close (CACHETABLE *ctp) {
    CACHETABLE ct = *ctp;
    ct->cp.destroy();
    ct->cl.destroy();
    ct->cf_list.free_stale_data(&ct->ev);
    cachetable_flush_cachefile(ct, NULL, true);
    ct->ev.destroy();
    ct->list.destroy();
    ct->cf_list.destroy();

    if (ct->client_kibbutz)
        toku_kibbutz_destroy(ct->client_kibbutz);
    if (ct->ct_kibbutz)
        toku_kibbutz_destroy(ct->ct_kibbutz);
    if (ct->checkpointing_kibbutz)
        toku_kibbutz_destroy(ct->checkpointing_kibbutz);
    toku_free(ct->env_dir);
    toku_free(ct);
    *ctp = 0;
}
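
// Hypothetical teardown order (sketch; the env layer above drives this, and
// the exact sequence there may differ): stop the minicrons first so that no
// checkpoint or cleaner fires mid-teardown, then close.
//
//    toku_cachetable_minicron_shutdown(ct);
//    // ... close all cachefiles ...
//    toku_cachetable_prepare_close(ct);
//    toku_cachetable_close(&ct);   // ct is NULL on return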

static PAIR test_get_pair(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, bool have_ct_lock) {
    CACHETABLE ct = cachefile->cachetable;

    if (!have_ct_lock) {
        ct->list.read_list_lock();
    }

    PAIR p = ct->list.find_pair(cachefile, key, fullhash);
    assert(p != NULL);
    if (!have_ct_lock) {
        ct->list.read_list_unlock();
    }
    return p;
}

// test-only wrapper
int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    // By default we don't have the lock
    PAIR p = test_get_pair(cachefile, key, fullhash, false);
    return toku_cachetable_unpin(cachefile, p, dirty, attr); // assume the read lock is not grabbed, and that it is a write lock
}

// test-only wrapper
int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
    // We hold the cachetable mutex.
    PAIR p = test_get_pair(cachefile, key, fullhash, true);
    return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr);
}

// test-only wrapper
int toku_test_cachetable_unpin_and_remove (
    CACHEFILE cachefile,
    CACHEKEY key,
    CACHETABLE_REMOVE_KEY remove_key,
    void* remove_key_extra)
{
    uint32_t fullhash = toku_cachetable_hash(cachefile, key);
    PAIR p = test_get_pair(cachefile, key, fullhash, false);
    return toku_cachetable_unpin_and_remove(cachefile, p, remove_key, remove_key_extra);
}

int toku_cachetable_unpin_and_remove (
    CACHEFILE cachefile,
    PAIR p,
    CACHETABLE_REMOVE_KEY remove_key,
    void* remove_key_extra
    )
{
    invariant_notnull(p);
    int r = ENOENT;
    CACHETABLE ct = cachefile->cachetable;

    p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
    // grab disk_nb_mutex to ensure that any background thread writing
    // out a cloned value completes
    pair_lock(p);
    assert(p->value_rwlock.writers());
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    assert(p->cloned_value_data == NULL);

    //
    // take care of key removal
    //
    ct->list.write_list_lock();
    ct->list.read_pending_cheap_lock();
    bool for_checkpoint = p->checkpoint_pending;
    // now let's wipe out the pending bit, because we are
    // removing the PAIR
    p->checkpoint_pending = false;

    // For the PAIR not to be picked by the
    // cleaner thread, we mark the cachepressure_size to be 0.
    // (This is redundant since we have the write_list_lock.)
    // This should not be an issue because we call
    // cachetable_remove_pair before
    // releasing the cachetable lock.
    //
    CACHEKEY key_to_remove = p->key;
    p->attr.cache_pressure_size = 0;
    //
    // callback for removing the key;
    // for FTNODEs, this leads to calling
    // toku_free_blocknum
    //
    if (remove_key) {
        remove_key(
            &key_to_remove,
            for_checkpoint,
            remove_key_extra
        );
    }
    ct->list.read_pending_cheap_unlock();

    pair_lock(p);
    p->value_rwlock.write_unlock();
    nb_mutex_unlock(&p->disk_nb_mutex);
    //
    // As of Clayface (6.5), only these threads may be
    // blocked waiting to lock this PAIR:
    //  - the checkpoint thread (because a checkpoint is in progress
    //    and the PAIR was in the list of pending pairs)
    //  - a client thread running get_and_pin_nonblocking, who
    //    ran unlockers, then waited on the PAIR lock.
    //    While waiting on a PAIR lock, another thread comes in,
    //    locks the PAIR, and ends up calling unpin_and_remove,
    //    all while get_and_pin_nonblocking is waiting on the PAIR lock.
    //    We did not realize this at first, which caused bug #4357.
    // The following threads CANNOT be blocked waiting on
    // the PAIR lock:
    //  - a thread trying to run eviction via run_eviction.
    //    That cannot happen because run_eviction only
    //    attempts to lock PAIRs that are not locked, and this PAIR
    //    is locked.
    //  - cleaner thread, for the same reason as a thread running
    //    eviction
    //  - client thread doing a normal get_and_pin. The client is smart
    //    enough to not try to lock a PAIR that another client thread
    //    is trying to unpin and remove. Note that this includes work
    //    done on kibbutzes.
    //  - writer thread. Writer threads do not grab PAIR locks. They
    //    get PAIR locks transferred to them by client threads.
    //

    // The first thing we do is remove the PAIR from the various
    // cachetable data structures, so no other thread can possibly
    // access it. We do not want to risk some other thread
    // trying to lock this PAIR if we release the write list lock
    // below. If some thread is already waiting on the lock,
    // then we let that thread grab the lock and finish, but
    // we don't want any NEW threads to try to grab the PAIR
    // lock.
    //
    // Because we call cachetable_remove_pair and wait,
    // the threads that may be waiting
    // on this PAIR lock must be careful to do NOTHING with the PAIR.
    // As per our analysis above, we only need
    // to make sure the checkpoint thread and get_and_pin_nonblocking do
    // nothing, and looking at those functions, it is clear they do nothing.
    //
    cachetable_remove_pair(&ct->list, &ct->ev, p);
    ct->list.write_list_unlock();
    if (p->refcount > 0) {
        pair_wait_for_ref_release_unlocked(p);
    }
    if (p->value_rwlock.users() > 0) {
        // Need to wait for everyone else to leave.
        // This write lock will be granted only after all waiting
        // threads are done.
        p->value_rwlock.write_lock(true);
        assert(p->refcount == 0);
        assert(p->value_rwlock.users() == 1); // us
        assert(!p->checkpoint_pending);
        assert(p->attr.cache_pressure_size == 0);
        p->value_rwlock.write_unlock();
    }
    // just a sanity check
    assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
    assert(p->cloned_value_data == NULL);
    // Remove the pair.
    pair_unlock(p);
    cachetable_free_pair(p);
    r = 0;
    return r;
}

int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array);
int set_filenum_in_array(const FT &ft, const uint32_t index, FILENUM *const array) {
    array[index] = toku_cachefile_filenum(ft->cf);
    return 0;
}

static int log_open_txn (TOKUTXN txn, void* extra) {
    int r;
    checkpointer* cp = (checkpointer *)extra;
    TOKULOGGER logger = txn->logger;
    FILENUMS open_filenums;
    uint32_t num_filenums = txn->open_fts.size();
    FILENUM array[num_filenums];
    if (toku_txn_is_read_only(txn)) {
        goto cleanup;
    }
    else {
        cp->increment_num_txns();
    }

    open_filenums.num = num_filenums;
    open_filenums.filenums = array;
    // fill in open_filenums
    r = txn->open_fts.iterate<FILENUM, set_filenum_in_array>(array);
    invariant(r == 0);
    switch (toku_txn_get_state(txn)) {
    case TOKUTXN_LIVE: {
        toku_log_xstillopen(logger, NULL, 0, txn,
                            toku_txn_get_txnid(txn),
                            toku_txn_get_txnid(toku_logger_txn_parent(txn)),
                            txn->roll_info.rollentry_raw_count,
                            open_filenums,
                            txn->force_fsync_on_commit,
                            txn->roll_info.num_rollback_nodes,
                            txn->roll_info.num_rollentries,
                            txn->roll_info.spilled_rollback_head,
                            txn->roll_info.spilled_rollback_tail,
                            txn->roll_info.current_rollback);
        goto cleanup;
    }
    case TOKUTXN_PREPARING: {
        TOKU_XA_XID xa_xid;
        toku_txn_get_prepared_xa_xid(txn, &xa_xid);
        toku_log_xstillopenprepared(logger, NULL, 0, txn,
                                    toku_txn_get_txnid(txn),
                                    &xa_xid,
                                    txn->roll_info.rollentry_raw_count,
                                    open_filenums,
                                    txn->force_fsync_on_commit,
                                    txn->roll_info.num_rollback_nodes,
                                    txn->roll_info.num_rollentries,
                                    txn->roll_info.spilled_rollback_head,
                                    txn->roll_info.spilled_rollback_tail,
                                    txn->roll_info.current_rollback);
        goto cleanup;
    }
    case TOKUTXN_RETIRED:
    case TOKUTXN_COMMITTING:
    case TOKUTXN_ABORTING: {
        assert(0);
    }
    }
    // default is an error
    assert(0);
cleanup:
    return 0;
}

// Requires: All three checkpoint-relevant locks must be held (see checkpoint.c).
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
//            Use the begin_checkpoint callback to take necessary snapshots (header, btt).
//            Mark every dirty node as "pending." ("Pending" means that the node must be
//            written to disk before it can be modified.)
void toku_cachetable_begin_checkpoint (CHECKPOINTER cp, TOKULOGGER UU(logger)) {
    cp->begin_checkpoint();
}


// This is used by the cachetable_race test.
static volatile int toku_checkpointing_user_data_status = 0;
static void toku_cachetable_set_checkpointing_user_data_status (int v) {
    toku_checkpointing_user_data_status = v;
}
int toku_cachetable_get_checkpointing_user_data_status (void) {
    return toku_checkpointing_user_data_status;
}

// Requires: The big checkpoint lock must be held (see checkpoint.c).
// Algorithm: Write all pending nodes to disk.
//            Use the checkpoint callback to write snapshot information to disk (header, btt).
//            Use the end_checkpoint callback to fsync the dictionary and log, and to free unused blocks.
// Note: If testcallback is non-null (used for testing purposes only), it is called after writing
//       the dictionary but before writing the log.
void toku_cachetable_end_checkpoint(CHECKPOINTER cp, TOKULOGGER UU(logger),
                                    void (*testcallback_f)(void*), void* testextra) {
    cp->end_checkpoint(testcallback_f, testextra);
}
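
// Hypothetical checkpoint sequence (sketch only; checkpoint.c is the real
// driver and takes the checkpoint-relevant locks around these calls):
//
//    toku_cachetable_begin_checkpoint(cp, logger);  // mark dirty pairs pending
//    // ... log/header work done by the checkpoint driver ...
//    toku_cachetable_end_checkpoint(cp, logger, NULL, NULL);  // write them out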

TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
    return cf->cachetable->cp.get_logger();
}

FILENUM toku_cachefile_filenum (CACHEFILE cf) {
    return cf->filenum;
}

// debug functions

int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
    uint32_t i;
    int some_pinned = 0;
    ct->list.read_list_lock();
    for (i = 0; i < ct->list.m_table_size; i++) {
        PAIR p;
        for (p = ct->list.m_table[i]; p; p = p->hash_chain) {
            pair_lock(p);
            if (p->value_rwlock.users()) {
                //printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data);
                some_pinned = 1;
            }
            pair_unlock(p);
        }
    }
    ct->list.read_list_unlock();
    return some_pinned;
}

int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
    assert(cf != NULL);
    int n_pinned = 0;
    CACHETABLE ct = cf->cachetable;
    ct->list.read_list_lock();

    // Iterate over all the pairs to find pairs specific to the
    // given cachefile.
    for (uint32_t i = 0; i < ct->list.m_table_size; i++) {
        for (PAIR p = ct->list.m_table[i]; p; p = p->hash_chain) {
            if (p->cachefile == cf) {
                pair_lock(p);
                if (p->value_rwlock.users()) {
                    if (print_them) {
                        printf("%s:%d pinned: %" PRId64 " (%p)\n",
                               __FILE__,
                               __LINE__,
                               p->key.b,
                               p->value_data);
                    }
                    n_pinned++;
                }
                pair_unlock(p);
            }
        }
    }

    ct->list.read_list_unlock();
    return n_pinned;
}

void toku_cachetable_print_state (CACHETABLE ct) {
    uint32_t i;
    ct->list.read_list_lock();
    for (i = 0; i < ct->list.m_table_size; i++) {
        PAIR p = ct->list.m_table[i];
        if (p != 0) {
            pair_lock(p);
            printf("t[%u]=", i);
            // Iterate with a separate variable so that p still points at the
            // first pair (and its mutex) when we unlock below; reusing p here
            // would leave it NULL at the pair_unlock call.
            for (PAIR q = p; q; q = q->hash_chain) {
                printf(" {%" PRId64 ", %p, dirty=%d, pin=%d, size=%ld}", q->key.b, q->cachefile, (int) q->dirty, q->value_rwlock.users(), q->attr.size);
            }
            printf("\n");
            pair_unlock(p);
        }
    }
    ct->list.read_list_unlock();
}

void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
    ct->list.get_state(num_entries_ptr, hash_size_ptr);
    ct->ev.get_state(size_current_ptr, size_limit_ptr);
}

int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
                                   int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
    int r = -1;
    uint32_t fullhash = toku_cachetable_hash(cf, key);
    ct->list.read_list_lock();
    PAIR p = ct->list.find_pair(cf, key, fullhash);
    if (p) {
        pair_lock(p);
        if (value_ptr)
            *value_ptr = p->value_data;
        if (dirty_ptr)
            *dirty_ptr = p->dirty;
        if (pin_ptr)
            *pin_ptr = p->value_rwlock.users();
        if (size_ptr)
            *size_ptr = p->attr.size;
        r = 0;
        pair_unlock(p);
    }
    ct->list.read_list_unlock();
    return r;
}

void
toku_cachefile_set_userdata (CACHEFILE cf,
                             void *userdata,
                             void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
                             void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
                             void (*free_userdata)(CACHEFILE, void*),
                             void (*checkpoint_userdata)(CACHEFILE, int, void*),
                             void (*begin_checkpoint_userdata)(LSN, void*),
                             void (*end_checkpoint_userdata)(CACHEFILE, int, void*),
                             void (*note_pin_by_checkpoint)(CACHEFILE, void*),
                             void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
    cf->userdata = userdata;
    cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
    cf->close_userdata = close_userdata;
    cf->free_userdata = free_userdata;
    cf->checkpoint_userdata = checkpoint_userdata;
    cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
    cf->end_checkpoint_userdata = end_checkpoint_userdata;
    cf->note_pin_by_checkpoint = note_pin_by_checkpoint;
    cf->note_unpin_by_checkpoint = note_unpin_by_checkpoint;
}

void *toku_cachefile_get_userdata(CACHEFILE cf) {
    return cf->userdata;
}

CACHETABLE
toku_cachefile_get_cachetable(CACHEFILE cf) {
    return cf->cachetable;
}

CACHEFILE toku_pair_get_cachefile(PAIR pair) {
    return pair->cachefile;
}

// Only called by ft_end_checkpoint
// Must have access to cf->fd (must be protected)
void toku_cachefile_fsync(CACHEFILE cf) {
    toku_file_fsync(cf->fd);
}

// Make it so when the cachefile closes, the underlying file is unlinked
void toku_cachefile_unlink_on_close(CACHEFILE cf) {
    assert(!cf->unlink_on_close);
    cf->unlink_on_close = true;
}

// is this cachefile marked as unlink on close?
bool toku_cachefile_is_unlink_on_close(CACHEFILE cf) {
    return cf->unlink_on_close;
}

void toku_cachefile_skip_log_recover_on_close(CACHEFILE cf) {
    cf->skip_log_recover_on_close = true;
}

void toku_cachefile_do_log_recover_on_close(CACHEFILE cf) {
    cf->skip_log_recover_on_close = false;
}

bool toku_cachefile_is_skip_log_recover_on_close(CACHEFILE cf) {
    return cf->skip_log_recover_on_close;
}

uint64_t toku_cachefile_size(CACHEFILE cf) {
    int64_t file_size;
    int fd = toku_cachefile_get_fd(cf);
    int r = toku_os_get_file_size(fd, &file_size);
    assert_zero(r);
    return file_size;
}

char *
toku_construct_full_name(int count, ...) {
    va_list ap;
    char *name = NULL;
    size_t n = 0;
    int i;
    va_start(ap, count);
    for (i = 0; i < count; i++) {
        char *arg = va_arg(ap, char *);
        if (arg) {
            n += 1 + strlen(arg) + 1;
            char *XMALLOC_N(n, newname);
            if (name && !toku_os_is_absolute_name(arg))
                snprintf(newname, n, "%s/%s", name, arg);
            else
                snprintf(newname, n, "%s", arg);
            toku_free(name);
            name = newname;
        }
    }
    va_end(ap);

    return name;
}
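
// Worked example: toku_construct_full_name(2, "/env", "db/file.ft") yields
// "/env/db/file.ft", while an absolute second argument such as "/abs/file.ft"
// is returned as-is, because absolute names discard the accumulated prefix.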

char *
toku_cachetable_get_fname_in_cwd(CACHETABLE ct, const char * fname_in_env) {
    return toku_construct_full_name(2, ct->env_dir, fname_in_env);
}

static long
cleaner_thread_rate_pair(PAIR p)
{
    return p->attr.cache_pressure_size;
}

static int const CLEANER_N_TO_CHECK = 8;

int toku_cleaner_thread_for_test (CACHETABLE ct) {
    return ct->cl.run_cleaner();
}

int toku_cleaner_thread (void *cleaner_v) {
    cleaner* cl = (cleaner *) cleaner_v;
    assert(cl);
    return cl->run_cleaner();
}

/////////////////////////////////////////////////////////////////////////
//
// cleaner methods
//
ENSURE_POD(cleaner);

extern uint force_recovery;

int cleaner::init(uint32_t _cleaner_iterations, pair_list* _pl, CACHETABLE _ct) {
    // default is no cleaner, for now
    m_cleaner_cron_init = false;
    if (force_recovery) return 0;
    int r = toku_minicron_setup(&m_cleaner_cron, 0, toku_cleaner_thread, this);
    if (r == 0) {
        m_cleaner_cron_init = true;
    }
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_cleaner_iterations, sizeof m_cleaner_iterations);
    m_cleaner_iterations = _cleaner_iterations;
    m_pl = _pl;
    m_ct = _ct;
    m_cleaner_init = true;
    return r;
}

// this function is allowed to be called multiple times
void cleaner::destroy(void) {
    if (!m_cleaner_init) {
        return;
    }
    if (m_cleaner_cron_init && !toku_minicron_has_been_shutdown(&m_cleaner_cron)) {
        // for test code only; production code uses toku_cachetable_minicron_shutdown()
        int r = toku_minicron_shutdown(&m_cleaner_cron);
        assert(r == 0);
    }
}

uint32_t cleaner::get_iterations(void) {
    return m_cleaner_iterations;
}

void cleaner::set_iterations(uint32_t new_iterations) {
    m_cleaner_iterations = new_iterations;
}

uint32_t cleaner::get_period_unlocked(void) {
    return toku_minicron_get_period_in_seconds_unlocked(&m_cleaner_cron);
}

//
// Sets how often the cleaner thread will run, in seconds
// (the minicron takes its period in milliseconds, hence the conversion).
//
void cleaner::set_period(uint32_t new_period) {
    toku_minicron_change_period(&m_cleaner_cron, new_period*1000);
}

// Effect: runs a cleaner.
//
// We look through some number of nodes, the first N that we see which are
// unlocked and are not involved in a cachefile flush, pick one, and call
// the cleaner callback. While we're picking a node, we have the
// cachetable lock the whole time, so we don't need any extra
// synchronization. Once we have one we want, we lock it and notify the
// cachefile that we're doing some background work (so a flush won't
// start). At this point, we can safely unlock the cachetable, do the
// work (callback), and unlock/release our claim to the cachefile.
int cleaner::run_cleaner(void) {
    toku::context cleaner_ctx(CTX_CLEANER);

    int r;
    uint32_t num_iterations = this->get_iterations();
    for (uint32_t i = 0; i < num_iterations; ++i) {
        cleaner_executions++;
        m_pl->read_list_lock();
        PAIR best_pair = NULL;
        int n_seen = 0;
        long best_score = 0;
        const PAIR first_pair = m_pl->m_cleaner_head;
        if (first_pair == NULL) {
            // nothing in the cachetable, just get out now
            m_pl->read_list_unlock();
            break;
        }
        // Here we select a PAIR for cleaning:
        // look at some number of PAIRs, and
        // pick what we think is the best one for cleaning.
        //***** IMPORTANT ******
        // We MUST NOT pick a PAIR whose rating is 0. We have
        // numerous assumptions in other parts of the code that
        // this is the case:
        //  - this is how rollback nodes and leaf nodes are not selected for cleaning
        //  - this is how a thread that is calling unpin_and_remove will prevent
        //    the cleaner thread from picking its PAIR (see comments in that function)
        do {
            //
            // We are already holding onto best_pair; if we run across a pair that
            // has the same mutex due to a collision in the hashtable, we need
            // to be careful.
            //
            if (best_pair && m_pl->m_cleaner_head->mutex == best_pair->mutex) {
                // Advance the cleaner head.
                long score = 0;
                // only bother with this pair if it has no current users
                if (m_pl->m_cleaner_head->value_rwlock.users() == 0) {
                    score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
                    if (score > best_score) {
                        best_score = score;
                        best_pair = m_pl->m_cleaner_head;
                    }
                }
                m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
                continue;
            }
            pair_lock(m_pl->m_cleaner_head);
            if (m_pl->m_cleaner_head->value_rwlock.users() > 0) {
                pair_unlock(m_pl->m_cleaner_head);
            }
            else {
                n_seen++;
                long score = 0;
                score = cleaner_thread_rate_pair(m_pl->m_cleaner_head);
                if (score > best_score) {
                    best_score = score;
                    // Since we found a new best pair, we need to
                    // release (unlock) the old best pair.
                    if (best_pair) {
                        pair_unlock(best_pair);
                    }
                    best_pair = m_pl->m_cleaner_head;
                }
                else {
                    pair_unlock(m_pl->m_cleaner_head);
                }
            }
            // Advance the cleaner head.
            m_pl->m_cleaner_head = m_pl->m_cleaner_head->clock_next;
        } while (m_pl->m_cleaner_head != first_pair && n_seen < CLEANER_N_TO_CHECK);
        m_pl->read_list_unlock();

        //
        // At this point, if we have found a PAIR for cleaning
        // (that is, best_pair != NULL), we do the clean.
        //
        // If best_pair != NULL, then best_pair->mutex is held, and
        // no list lock is held.
        //
        if (best_pair) {
            CACHEFILE cf = best_pair->cachefile;
            // Try to add a background job to the manager.
            // If we can't, that means the cachefile is flushing, so
            // we simply continue the for loop and this iteration
            // becomes a no-op.
            r = bjm_add_background_job(cf->bjm);
            if (r) {
                pair_unlock(best_pair);
                continue;
            }
            best_pair->value_rwlock.write_lock(true);
            pair_unlock(best_pair);
            // verify a key assumption
            assert(cleaner_thread_rate_pair(best_pair) > 0);
            // check the checkpoint_pending bit
            m_pl->read_pending_cheap_lock();
            bool checkpoint_pending = best_pair->checkpoint_pending;
            best_pair->checkpoint_pending = false;
            m_pl->read_pending_cheap_unlock();
            if (checkpoint_pending) {
                write_locked_pair_for_checkpoint(m_ct, best_pair, true);
            }

            bool cleaner_callback_called = false;

            // It's theoretically possible that after writing a PAIR for checkpoint, the
            // PAIR's heuristic tells us nothing needs to be done. It is not possible
            // in Dr. Noga, but unit tests verify that this behavior works properly.
            if (cleaner_thread_rate_pair(best_pair) > 0) {
                r = best_pair->cleaner_callback(best_pair->value_data,
                                                best_pair->key,
                                                best_pair->fullhash,
3204 best_pair->write_extraargs);
3205 assert_zero(r);
3206 cleaner_callback_called = true;
3207 }
3208
3209 // The cleaner callback must have unlocked the pair, so we
3210 // don't need to unlock it if the cleaner callback is called.
3211 if (!cleaner_callback_called) {
3212 pair_lock(best_pair);
3213 best_pair->value_rwlock.write_unlock();
3214 pair_unlock(best_pair);
3215 }
3216 // We need to make sure the cachefile sticks around so a close
3217 // can't come destroy it. That's the purpose of this
3218 // "add/remove_background_job" business, which means the
3219 // cachefile is still valid here, even though the cleaner
3220 // callback unlocks the pair.
3221 bjm_remove_background_job(cf->bjm);
3222 }
3223 else {
3224 // If we didn't find anything this time around the cachetable,
3225 // we probably won't find anything if we run around again, so
3226 // just break out from the for-loop now and
3227 // we'll try again when the cleaner thread runs again.
3228 break;
3229 }
3230 }
3231 return 0;
3232 }

static_assert(std::is_pod<pair_list>::value, "pair_list isn't POD");

const uint32_t INITIAL_PAIR_LIST_SIZE = 1<<20;
uint32_t PAIR_LOCK_SIZE = 1<<20;

void toku_pair_list_set_lock_size(uint32_t num_locks) {
    PAIR_LOCK_SIZE = num_locks;
}
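
// Note (an assumption worth keeping in mind): both the hash table size and
// the number of pair locks are used as bitmasks below, e.g.
//     bucket = fullhash & (m_table_size - 1);
//     mutex  = &m_mutexes[fullhash & (m_num_locks - 1)];
// so INITIAL_PAIR_LIST_SIZE and any value passed to
// toku_pair_list_set_lock_size should be powers of two (the defaults,
// 1<<20, are).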

static void evict_pair_from_cachefile(PAIR p) {
    CACHEFILE cf = p->cachefile;
    if (p->cf_next) {
        p->cf_next->cf_prev = p->cf_prev;
    }
    if (p->cf_prev) {
        p->cf_prev->cf_next = p->cf_next;
    }
    else if (p->cachefile->cf_head == p) {
        cf->cf_head = p->cf_next;
    }
    p->cf_prev = p->cf_next = NULL;
    cf->num_pairs--;
}

// Allocates the hash table of pairs inside this pair list.
//
void pair_list::init() {
    m_table_size = INITIAL_PAIR_LIST_SIZE;
    m_num_locks = PAIR_LOCK_SIZE;
    m_n_in_table = 0;
    m_clock_head = NULL;
    m_cleaner_head = NULL;
    m_checkpoint_head = NULL;
    m_pending_head = NULL;
    m_table = NULL;

    pthread_rwlockattr_t attr;
    pthread_rwlockattr_init(&attr);
#if defined(HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP)
    pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
#else
    // TODO: need to figure out how to make writer-preferential rwlocks
    // happen on osx
#endif
    toku_pthread_rwlock_init(*cachetable_m_list_lock_key, &m_list_lock, &attr);
    toku_pthread_rwlock_init(*cachetable_m_pending_lock_expensive_key,
                             &m_pending_lock_expensive,
                             &attr);
    toku_pthread_rwlock_init(
        *cachetable_m_pending_lock_cheap_key, &m_pending_lock_cheap, &attr);
    XCALLOC_N(m_table_size, m_table);
    XCALLOC_N(m_num_locks, m_mutexes);
    for (uint64_t i = 0; i < m_num_locks; i++) {
        toku_mutex_init(
#ifdef TOKU_PFS_MUTEX_EXTENDED_CACHETABLEMMUTEX
            *cachetable_m_mutex_key,
#else
            toku_uninstrumented,
#endif
            &m_mutexes[i].aligned_mutex,
            nullptr);
    }
}

// Frees the pair_list hash table.  It is expected to be empty by
// the time this is called; we assert (via invariant_null) that no
// pairs remain in any of the hash table slots.
void pair_list::destroy() {
    // Check if any entries exist in the hash table.
    for (uint32_t i = 0; i < m_table_size; ++i) {
        invariant_null(m_table[i]);
    }
    for (uint64_t i = 0; i < m_num_locks; i++) {
        toku_mutex_destroy(&m_mutexes[i].aligned_mutex);
    }
    toku_pthread_rwlock_destroy(&m_list_lock);
    toku_pthread_rwlock_destroy(&m_pending_lock_expensive);
    toku_pthread_rwlock_destroy(&m_pending_lock_cheap);
    toku_free(m_table);
    toku_free(m_mutexes);
}

// adds a PAIR to the cachetable's structures,
// but does NOT add it to the list maintained by
// the cachefile
void pair_list::add_to_cachetable_only(PAIR p) {
    // sanity check to make sure that the PAIR does not already exist
    PAIR pp = this->find_pair(p->cachefile, p->key, p->fullhash);
    assert(pp == NULL);

    this->add_to_clock(p);
    this->add_to_hash_chain(p);
    m_n_in_table++;
}

// This places the given pair inside of the pair list.
//
// requires caller to have grabbed write lock on list.
// requires caller to have p->mutex held as well
//
void pair_list::put(PAIR p) {
    this->add_to_cachetable_only(p);
    this->add_to_cf_list(p);
}

// This completely removes the given pair from the pair list.
//
// requires caller to have grabbed write lock on list, and p->mutex held
//
void pair_list::evict_completely(PAIR p) {
    this->evict_from_cachetable(p);
    this->evict_from_cachefile(p);
}

// Removes the PAIR from the cachetable's lists,
// but does NOT impact the list maintained by the cachefile
void pair_list::evict_from_cachetable(PAIR p) {
    this->pair_remove(p);
    this->pending_pairs_remove(p);
    this->remove_from_hash_chain(p);

    assert(m_n_in_table > 0);
    m_n_in_table--;
}

// Removes the PAIR from the cachefile's list of PAIRs
void pair_list::evict_from_cachefile(PAIR p) {
    evict_pair_from_cachefile(p);
}

//
// Remove pair from linked list for cleaner/clock
//
// requires caller to have grabbed write lock on list.
//
void pair_list::pair_remove (PAIR p) {
    if (p->clock_prev == p) {
        invariant(m_clock_head == p);
        invariant(p->clock_next == p);
        invariant(m_cleaner_head == p);
        invariant(m_checkpoint_head == p);
        m_clock_head = NULL;
        m_cleaner_head = NULL;
        m_checkpoint_head = NULL;
    }
    else {
        if (p == m_clock_head) {
            m_clock_head = m_clock_head->clock_next;
        }
        if (p == m_cleaner_head) {
            m_cleaner_head = m_cleaner_head->clock_next;
        }
        if (p == m_checkpoint_head) {
            m_checkpoint_head = m_checkpoint_head->clock_next;
        }
        p->clock_prev->clock_next = p->clock_next;
        p->clock_next->clock_prev = p->clock_prev;
    }
    p->clock_prev = p->clock_next = NULL;
}

// Remove a pair from the list of pairs that were marked with the
// pending bit for the in-progress checkpoint.
//
// requires that if the caller is the checkpoint thread, then a read lock
// is grabbed on the list.  Otherwise, must have write lock on list.
//
void pair_list::pending_pairs_remove (PAIR p) {
    if (p->pending_next) {
        p->pending_next->pending_prev = p->pending_prev;
    }
    if (p->pending_prev) {
        p->pending_prev->pending_next = p->pending_next;
    }
    else if (m_pending_head==p) {
        m_pending_head = p->pending_next;
    }
    p->pending_prev = p->pending_next = NULL;
}

void pair_list::remove_from_hash_chain(PAIR p) {
    // Remove it from the hash chain.
    unsigned int h = p->fullhash&(m_table_size - 1);
    paranoid_invariant(m_table[h] != NULL);
    if (m_table[h] == p) {
        m_table[h] = p->hash_chain;
    }
    else {
        PAIR curr = m_table[h];
        while (curr->hash_chain != p) {
            curr = curr->hash_chain;
        }
        // remove p from the singly-linked hash chain
        curr->hash_chain = p->hash_chain;
    }
    p->hash_chain = NULL;
}

// Returns the pair matching the given cachefile, key, and hash from
// the pair list.  If no such pair is found, null is returned.
//
// requires caller to have grabbed either a read lock on the list or
// the bucket's mutex.
//
PAIR pair_list::find_pair(CACHEFILE file, CACHEKEY key, uint32_t fullhash) {
    PAIR found_pair = nullptr;
    for (PAIR p = m_table[fullhash&(m_table_size - 1)]; p; p = p->hash_chain) {
        if (p->key.b == key.b && p->cachefile == file) {
            found_pair = p;
            break;
        }
    }
    return found_pair;
}

// Add PAIR to the linked list shared by the cleaner thread and the clock
//
// requires caller to have grabbed write lock on list.
//
void pair_list::add_to_clock (PAIR p) {
    // requires that p is not currently in the table.
    // inserts p into the clock list at the tail.

    p->count = CLOCK_INITIAL_COUNT;
    // assert either both head and tail are set or they are both NULL
    // tail and head exist
    if (m_clock_head) {
        assert(m_cleaner_head);
        assert(m_checkpoint_head);
        // insert right before the head
        p->clock_next = m_clock_head;
        p->clock_prev = m_clock_head->clock_prev;

        p->clock_prev->clock_next = p;
        p->clock_next->clock_prev = p;
    }
    // this is the first element in the list
    else {
        m_clock_head = p;
        p->clock_next = p->clock_prev = m_clock_head;
        m_cleaner_head = p;
        m_checkpoint_head = p;
    }
}

// add the pair to the linked list of PAIRs belonging
// to the same cachefile. This linked list is used
// in cachetable_flush_cachefile.
void pair_list::add_to_cf_list(PAIR p) {
    CACHEFILE cf = p->cachefile;
    if (cf->cf_head) {
        cf->cf_head->cf_prev = p;
    }
    p->cf_next = cf->cf_head;
    p->cf_prev = NULL;
    cf->cf_head = p;
    cf->num_pairs++;
}

// Add PAIR to the hashtable
//
// requires caller to have grabbed write lock on list
// and to have grabbed the p->mutex.
void pair_list::add_to_hash_chain(PAIR p) {
    uint32_t h = p->fullhash & (m_table_size - 1);
    p->hash_chain = m_table[h];
    m_table[h] = p;
}

// test function
//
// grabs and releases write list lock
//
void pair_list::verify() {
    this->write_list_lock();
    uint32_t num_found = 0;

    // First count all the pairs reachable through the hash chains
    {
        uint32_t i;
        for (i = 0; i < m_table_size; i++) {
            PAIR p;
            for (p = m_table[i]; p; p = p->hash_chain) {
                num_found++;
            }
        }
    }
    assert(num_found == m_n_in_table);
    num_found = 0;
    // Now go through the clock chain, make sure everything in the LRU chain is hashed.
    {
        PAIR p;
        bool is_first = true;
        for (p = m_clock_head; m_clock_head != NULL && (p != m_clock_head || is_first); p=p->clock_next) {
            is_first=false;
            PAIR p2;
            uint32_t fullhash = p->fullhash;
            //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
            for (p2 = m_table[fullhash&(m_table_size-1)]; p2; p2=p2->hash_chain) {
                if (p2==p) {
                    /* found it */
                    num_found++;
                    goto next;
                }
            }
            fprintf(stderr, "Something in the clock chain is not hashed\n");
            assert(0);
        next:;
        }
        assert (num_found == m_n_in_table);
    }
    this->write_list_unlock();
}

// If given pointers are not null, assign the hash table size of
// this pair list and the number of pairs in this pair list.
//
// grabs and releases read list lock
//
void pair_list::get_state(int *num_entries, int *hash_size) {
    this->read_list_lock();
    if (num_entries) {
        *num_entries = m_n_in_table;
    }
    if (hash_size) {
        *hash_size = m_table_size;
    }
    this->read_list_unlock();
}

void pair_list::read_list_lock() {
    toku_pthread_rwlock_rdlock(&m_list_lock);
}

void pair_list::read_list_unlock() {
    toku_pthread_rwlock_rdunlock(&m_list_lock);
}

void pair_list::write_list_lock() {
    toku_pthread_rwlock_wrlock(&m_list_lock);
}

void pair_list::write_list_unlock() {
    toku_pthread_rwlock_wrunlock(&m_list_lock);
}

void pair_list::read_pending_exp_lock() {
    toku_pthread_rwlock_rdlock(&m_pending_lock_expensive);
}

void pair_list::read_pending_exp_unlock() {
    toku_pthread_rwlock_rdunlock(&m_pending_lock_expensive);
}

void pair_list::write_pending_exp_lock() {
    toku_pthread_rwlock_wrlock(&m_pending_lock_expensive);
}

void pair_list::write_pending_exp_unlock() {
    toku_pthread_rwlock_wrunlock(&m_pending_lock_expensive);
}

void pair_list::read_pending_cheap_lock() {
    toku_pthread_rwlock_rdlock(&m_pending_lock_cheap);
}

void pair_list::read_pending_cheap_unlock() {
    toku_pthread_rwlock_rdunlock(&m_pending_lock_cheap);
}

void pair_list::write_pending_cheap_lock() {
    toku_pthread_rwlock_wrlock(&m_pending_lock_cheap);
}

void pair_list::write_pending_cheap_unlock() {
    toku_pthread_rwlock_wrunlock(&m_pending_lock_cheap);
}

toku_mutex_t* pair_list::get_mutex_for_pair(uint32_t fullhash) {
    return &m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex;
}

void pair_list::pair_lock_by_fullhash(uint32_t fullhash) {
    toku_mutex_lock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
}

void pair_list::pair_unlock_by_fullhash(uint32_t fullhash) {
    toku_mutex_unlock(&m_mutexes[fullhash&(m_num_locks - 1)].aligned_mutex);
}


ENSURE_POD(evictor);

//
// This is the function that runs eviction on its own thread.
//
static void *eviction_thread(void *evictor_v) {
    evictor *CAST_FROM_VOIDP(evictor, evictor_v);
    evictor->run_eviction_thread();
    return toku_pthread_done(evictor_v);
}

//
// Starts the eviction thread, assigns external object references,
// and initializes all counters and condition variables.
//
int evictor::init(long _size_limit, pair_list* _pl, cachefile_list* _cf_list, KIBBUTZ _kibbutz, uint32_t eviction_period) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_ev_thread_is_running, sizeof m_ev_thread_is_running);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&m_size_evicting, sizeof m_size_evicting);

    // cap the gap between successive watermarks at 1<<29 bytes (512MiB)
    int64_t max_diff = (1 << 29);

    m_low_size_watermark = _size_limit;
    // these values are selected kind of arbitrarily right now as
    // being a percentage more than low_size_watermark, which is provided
    // by the caller.
    m_low_size_hysteresis = (11 * _size_limit)/10; // 10% more
    if ((m_low_size_hysteresis - m_low_size_watermark) > max_diff) {
        m_low_size_hysteresis = m_low_size_watermark + max_diff;
    }
    m_high_size_hysteresis = (5 * _size_limit)/4; // 25% more
    if ((m_high_size_hysteresis - m_low_size_hysteresis) > max_diff) {
        m_high_size_hysteresis = m_low_size_hysteresis + max_diff;
    }
    m_high_size_watermark = (3 * _size_limit)/2; // 50% more
    if ((m_high_size_watermark - m_high_size_hysteresis) > max_diff) {
        m_high_size_watermark = m_high_size_hysteresis + max_diff;
    }
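
    // Illustrative example (numbers assumed, not part of the algorithm):
    // with _size_limit = 1GB the ladder works out to
    //     low_size_watermark   = 1.00GB
    //     low_size_hysteresis  = 1.10GB
    //     high_size_hysteresis = 1.25GB
    //     high_size_watermark  = 1.50GB
    // whereas with a large limit such as 16GB the 512MiB cap kicks in and
    // each successive threshold sits exactly 512MiB above the previous one.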

    m_enable_partial_eviction = true;

    m_size_reserved = unreservable_memory(_size_limit);
    m_size_current = 0;
    m_size_cloned_data = 0;
    m_size_evicting = 0;

    m_size_nonleaf = create_partitioned_counter();
    m_size_leaf = create_partitioned_counter();
    m_size_rollback = create_partitioned_counter();
    m_size_cachepressure = create_partitioned_counter();
    m_wait_pressure_count = create_partitioned_counter();
    m_wait_pressure_time = create_partitioned_counter();
    m_long_wait_pressure_count = create_partitioned_counter();
    m_long_wait_pressure_time = create_partitioned_counter();

    m_pl = _pl;
    m_cf_list = _cf_list;
    m_kibbutz = _kibbutz;
    toku_mutex_init(
        *cachetable_ev_thread_lock_mutex_key, &m_ev_thread_lock, nullptr);
    toku_cond_init(
        *cachetable_m_flow_control_cond_key, &m_flow_control_cond, nullptr);
    toku_cond_init(
        *cachetable_m_ev_thread_cond_key, &m_ev_thread_cond, nullptr);
    m_num_sleepers = 0;
    m_ev_thread_is_running = false;
    m_period_in_seconds = eviction_period;

    unsigned int seed = (unsigned int) time(NULL);
    int r = myinitstate_r(seed, m_random_statebuf, sizeof m_random_statebuf, &m_random_data);
    assert_zero(r);

    // start the background thread
    m_run_thread = true;
    m_num_eviction_thread_runs = 0;
    m_ev_thread_init = false;
    r = toku_pthread_create(
        *eviction_thread_key, &m_ev_thread, nullptr, eviction_thread, this);
    if (r == 0) {
        m_ev_thread_init = true;
    }
    m_evictor_init = true;
    return r;
}

//
// This stops the eviction thread and clears the condition variable.
//
// NOTE: This should only be called if there are no evictions in progress.
//
void evictor::destroy() {
    if (!m_evictor_init) {
        return;
    }
    assert(m_size_evicting == 0);
    //
    // commented out as of Ming, because we could not finish
    // #5672. Once #5672 is solved, we should restore this
    //
    //assert(m_size_current == 0);

    // Stop the eviction thread.
    if (m_ev_thread_init) {
        toku_mutex_lock(&m_ev_thread_lock);
        m_run_thread = false;
        this->signal_eviction_thread_locked();
        toku_mutex_unlock(&m_ev_thread_lock);
        void *ret;
        int r = toku_pthread_join(m_ev_thread, &ret);
        assert_zero(r);
        assert(!m_ev_thread_is_running);
    }
    destroy_partitioned_counter(m_size_nonleaf);
    m_size_nonleaf = NULL;
    destroy_partitioned_counter(m_size_leaf);
    m_size_leaf = NULL;
    destroy_partitioned_counter(m_size_rollback);
    m_size_rollback = NULL;
    destroy_partitioned_counter(m_size_cachepressure);
    m_size_cachepressure = NULL;

    destroy_partitioned_counter(m_wait_pressure_count); m_wait_pressure_count = NULL;
    destroy_partitioned_counter(m_wait_pressure_time); m_wait_pressure_time = NULL;
    destroy_partitioned_counter(m_long_wait_pressure_count); m_long_wait_pressure_count = NULL;
    destroy_partitioned_counter(m_long_wait_pressure_time); m_long_wait_pressure_time = NULL;

    toku_cond_destroy(&m_flow_control_cond);
    toku_cond_destroy(&m_ev_thread_cond);
    toku_mutex_destroy(&m_ev_thread_lock);
}

//
// Increases status variables and the current size variable
// of the evictor based on the given pair attribute.
//
void evictor::add_pair_attr(PAIR_ATTR attr) {
    assert(attr.is_valid);
    add_to_size_current(attr.size);
    increment_partitioned_counter(m_size_nonleaf, attr.nonleaf_size);
    increment_partitioned_counter(m_size_leaf, attr.leaf_size);
    increment_partitioned_counter(m_size_rollback, attr.rollback_size);
    increment_partitioned_counter(m_size_cachepressure, attr.cache_pressure_size);
}

//
// Decreases status variables and the current size variable
// of the evictor based on the given pair attribute.
//
void evictor::remove_pair_attr(PAIR_ATTR attr) {
    assert(attr.is_valid);
    remove_from_size_current(attr.size);
    increment_partitioned_counter(m_size_nonleaf, 0 - attr.nonleaf_size);
    increment_partitioned_counter(m_size_leaf, 0 - attr.leaf_size);
    increment_partitioned_counter(m_size_rollback, 0 - attr.rollback_size);
    increment_partitioned_counter(m_size_cachepressure, 0 - attr.cache_pressure_size);
}

//
// Updates this evictor's stats to match the "new" pair attribute given
// while also removing the given "old" pair attribute.
//
void evictor::change_pair_attr(PAIR_ATTR old_attr, PAIR_ATTR new_attr) {
    this->add_pair_attr(new_attr);
    this->remove_pair_attr(old_attr);
}

//
// Adds the given size to the evictor's estimation of
// the size of the cachetable.
//
void evictor::add_to_size_current(long size) {
    (void) toku_sync_fetch_and_add(&m_size_current, size);
}

//
// Subtracts the given size from the evictor's current
// approximation of the cachetable size.
//
void evictor::remove_from_size_current(long size) {
    (void) toku_sync_fetch_and_sub(&m_size_current, size);
}

//
// Adds the size of cloned data to the relevant variables in the evictor
//
void evictor::add_cloned_data_size(long size) {
    (void) toku_sync_fetch_and_add(&m_size_cloned_data, size);
    add_to_size_current(size);
}

//
// Removes the size of cloned data from the relevant variables in the evictor
//
void evictor::remove_cloned_data_size(long size) {
    (void) toku_sync_fetch_and_sub(&m_size_cloned_data, size);
    remove_from_size_current(size);
}

//
// Reserves part of the cachetable's memory budget: grants
// fraction * (low_size_watermark - size_reserved) bytes, capped at
// upper_bound when upper_bound is nonzero.  The reservation is counted
// as part of size_current, so the eviction thread is signaled, and if
// the cachetable is now over the high size watermark, this thread
// waits for the cache pressure to subside before returning.
//
uint64_t evictor::reserve_memory(double fraction, uint64_t upper_bound) {
    toku_mutex_lock(&m_ev_thread_lock);
    uint64_t reserved_memory = fraction * (m_low_size_watermark - m_size_reserved);
    if (0) { // debug
        fprintf(stderr, "%s %" PRIu64 " %" PRIu64 "\n", __PRETTY_FUNCTION__, reserved_memory, upper_bound);
    }
    if (upper_bound > 0 && reserved_memory > upper_bound) {
        reserved_memory = upper_bound;
    }
    m_size_reserved += reserved_memory;
    (void) toku_sync_fetch_and_add(&m_size_current, reserved_memory);
    this->signal_eviction_thread_locked();
    toku_mutex_unlock(&m_ev_thread_lock);

    if (this->should_client_thread_sleep()) {
        this->wait_for_cache_pressure_to_subside();
    }
    return reserved_memory;
}

//
// Returns memory previously taken via reserve_memory: subtracts the
// reservation from size_current and size_reserved, and signals the
// eviction thread so that sleeping clients may be woken up.
//
void evictor::release_reserved_memory(uint64_t reserved_memory) {
    (void) toku_sync_fetch_and_sub(&m_size_current, reserved_memory);
    toku_mutex_lock(&m_ev_thread_lock);
    m_size_reserved -= reserved_memory;
    // signal the eviction thread in order to possibly wake up sleeping clients
    if (m_num_sleepers > 0) {
        this->signal_eviction_thread_locked();
    }
    toku_mutex_unlock(&m_ev_thread_lock);
}

//
// This function is the eviction thread.  It runs for the lifetime of
// the evictor.  Goes to sleep for period_in_seconds
// by waiting on m_ev_thread_cond.
//
void evictor::run_eviction_thread() {
    toku_mutex_lock(&m_ev_thread_lock);
    while (m_run_thread) {
        m_num_eviction_thread_runs++; // for test purposes only
        m_ev_thread_is_running = true;
        // responsibility of run_eviction to release and
        // regrab ev_thread_lock as it sees fit
        this->run_eviction();
        m_ev_thread_is_running = false;

        if (m_run_thread) {
            //
            // sleep until either we are signaled
            // via signal_eviction_thread or
            // m_period_in_seconds amount of time has passed
            //
            if (m_period_in_seconds) {
                toku_timespec_t wakeup_time;
                struct timeval tv;
                gettimeofday(&tv, 0);
                wakeup_time.tv_sec = tv.tv_sec;
                wakeup_time.tv_nsec = tv.tv_usec * 1000LL; // microseconds -> nanoseconds
                wakeup_time.tv_sec += m_period_in_seconds;
                toku_cond_timedwait(
                    &m_ev_thread_cond,
                    &m_ev_thread_lock,
                    &wakeup_time
                    );
            }
            // for test purposes, we have the option of
            // not waiting on a period, but rather sleeping indefinitely
            else {
                toku_cond_wait(&m_ev_thread_cond, &m_ev_thread_lock);
            }
        }
    }
    toku_mutex_unlock(&m_ev_thread_lock);
}

//
// runs eviction.
// on entry, ev_thread_lock is grabbed, and on exit, ev_thread_lock must still be grabbed;
// it is the responsibility of this function to release and reacquire ev_thread_lock as it sees fit.
//
void evictor::run_eviction() {
    //
    // These variables will help us detect if everything in the clock is currently being accessed.
    // We must detect this case otherwise we will end up in an infinite loop below.
    //
    bool exited_early = false;
    uint32_t num_pairs_examined_without_evicting = 0;

    while (this->eviction_needed()) {
        if (m_num_sleepers > 0 && this->should_sleeping_clients_wakeup()) {
            toku_cond_broadcast(&m_flow_control_cond);
        }
        // release ev_thread_lock so that eviction may run without holding mutex
        toku_mutex_unlock(&m_ev_thread_lock);

        // first try to do an eviction from stale cachefiles
        bool some_eviction_ran = m_cf_list->evict_some_stale_pair(this);
        if (!some_eviction_ran) {
            m_pl->read_list_lock();
            PAIR curr_in_clock = m_pl->m_clock_head;
            // if nothing to evict, we need to exit
            if (!curr_in_clock) {
                m_pl->read_list_unlock();
                toku_mutex_lock(&m_ev_thread_lock);
                exited_early = true;
                goto exit;
            }
            if (num_pairs_examined_without_evicting > m_pl->m_n_in_table) {
                // we have a cycle where everything in the clock is in use
                // do not return an error
                // just let memory be overfull
                m_pl->read_list_unlock();
                toku_mutex_lock(&m_ev_thread_lock);
                exited_early = true;
                goto exit;
            }
            bool eviction_run = run_eviction_on_pair(curr_in_clock);
            if (eviction_run) {
                // reset the count
                num_pairs_examined_without_evicting = 0;
            }
            else {
                num_pairs_examined_without_evicting++;
            }
            // At this point, either curr_in_clock is still in the list because it
            // has not been fully evicted, and we need to advance m_clock_head; or
            // curr_in_clock has been fully evicted, and we do NOT need to advance
            // m_clock_head, because removing curr_in_clock already moved it.
            if (m_pl->m_clock_head && (m_pl->m_clock_head == curr_in_clock)) {
                m_pl->m_clock_head = m_pl->m_clock_head->clock_next;
            }
            m_pl->read_list_unlock();
        }
        toku_mutex_lock(&m_ev_thread_lock);
    }

exit:
    if (m_num_sleepers > 0 && (exited_early || this->should_sleeping_clients_wakeup())) {
        toku_cond_broadcast(&m_flow_control_cond);
    }
    return;
}

//
// NOTE: Cachetable lock held on entry.
// Runs eviction on the given PAIR.  This may be a
// partial eviction or full eviction.
//
// on entry, pair mutex is NOT held, but pair list's read list lock
// IS held
// on exit, the same conditions must apply
//
bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
    uint32_t n_in_table;
    int64_t size_current;
    bool ret_val = false;
    // function meant to be called on PAIR that is not being accessed right now
    CACHEFILE cf = curr_in_clock->cachefile;
    int r = bjm_add_background_job(cf->bjm);
    if (r) {
        goto exit;
    }
    pair_lock(curr_in_clock);
    // these are the circumstances under which we don't run eviction on a pair:
    //  - if other users are waiting on the lock
    //  - if the PAIR is referenced by users
    //  - if the PAIR's disk_nb_mutex is in use, implying that it is
    //    undergoing a checkpoint
    if (curr_in_clock->value_rwlock.users() ||
        curr_in_clock->refcount > 0 ||
        nb_mutex_users(&curr_in_clock->disk_nb_mutex))
    {
        pair_unlock(curr_in_clock);
        bjm_remove_background_job(cf->bjm);
        goto exit;
    }

    // extract and use these values so that we don't risk them changing
    // out from underneath us in calculations below.
    n_in_table = m_pl->m_n_in_table;
    size_current = m_size_current;

    // now that we have the pair mutex we care about, we can
    // release the read list lock and reacquire it at the end of the function
    m_pl->read_list_unlock();
    ret_val = true;
    if (curr_in_clock->count > 0) {
        toku::context pe_ctx(CTX_PARTIAL_EVICTION);

        uint32_t curr_size = curr_in_clock->attr.size;
        // if the size of this PAIR is greater than the average size of PAIRs
        // in the cachetable, then decrement it, otherwise, decrement
        // probabilistically
        if (curr_size*n_in_table >= size_current) {
            curr_in_clock->count--;
        } else {
            // generate a random number between 0 and 2^16 - 1
            assert(size_current <= (INT64_MAX / ((1<<16)-1))); // to protect against possible overflows
            int32_t rnd = myrandom_r(&m_random_data) % (1<<16);
            // The if-statement below will be true with probability of
            // curr_size/(average size of a PAIR in the cachetable).
            // Here is how the math is done:
            //     average_size = size_current/n_in_table
            //     curr_size/average_size = curr_size*n_in_table/size_current
            // we evaluate whether a random number from 0 to 2^16 - 1 is less
            // than curr_size/average_size * 2^16.  So, our if-clause should be
            //     if (2^16*curr_size/average_size > rnd)
            // this evaluates to:
            //     if (2^16*curr_size*n_in_table/size_current > rnd)
            // by multiplying each side of the inequality by size_current, we get
            //     if (2^16*curr_size*n_in_table > rnd*size_current)
            // and dividing each side by 2^16,
            // we get the if-clause below
            //
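            // Worked example (illustrative numbers only): with
            // size_current = 4GB and n_in_table = 4096, the average PAIR
            // is 1MB.  A 256KB PAIR then gets its count decremented with
            // probability 256KB/1MB = 1/4, while any PAIR at or above the
            // 1MB average is decremented unconditionally by the branch above.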
            if ((((int64_t)curr_size) * n_in_table) >= (((int64_t)rnd) * size_current)>>16) {
                curr_in_clock->count--;
            }
        }

        if (m_enable_partial_eviction) {
            // call the partial eviction callback
            curr_in_clock->value_rwlock.write_lock(true);

            void *value = curr_in_clock->value_data;
            void* disk_data = curr_in_clock->disk_data;
            void *write_extraargs = curr_in_clock->write_extraargs;
            enum partial_eviction_cost cost;
            long bytes_freed_estimate = 0;
            curr_in_clock->pe_est_callback(value, disk_data,
                                           &bytes_freed_estimate, &cost,
                                           write_extraargs);
            if (cost == PE_CHEAP) {
                pair_unlock(curr_in_clock);
                curr_in_clock->size_evicting_estimate = 0;
                this->do_partial_eviction(curr_in_clock);
                bjm_remove_background_job(cf->bjm);
            } else if (cost == PE_EXPENSIVE) {
                // only bother running an expensive partial eviction
                // if it is expected to free space
                if (bytes_freed_estimate > 0) {
                    pair_unlock(curr_in_clock);
                    curr_in_clock->size_evicting_estimate = bytes_freed_estimate;
                    toku_mutex_lock(&m_ev_thread_lock);
                    m_size_evicting += bytes_freed_estimate;
                    toku_mutex_unlock(&m_ev_thread_lock);
                    toku_kibbutz_enq(m_kibbutz, cachetable_partial_eviction,
                                     curr_in_clock);
                } else {
                    curr_in_clock->value_rwlock.write_unlock();
                    pair_unlock(curr_in_clock);
                    bjm_remove_background_job(cf->bjm);
                }
            } else {
                assert(false);
            }
        } else {
            pair_unlock(curr_in_clock);
            bjm_remove_background_job(cf->bjm);
        }
    } else {
        toku::context pe_ctx(CTX_FULL_EVICTION);

        // responsibility of try_evict_pair to eventually remove background job
        // pair's mutex is still grabbed here
        this->try_evict_pair(curr_in_clock);
    }
    // regrab the read list lock, because the caller assumes
    // that it is held.  The contract requires this.
    m_pl->read_list_lock();
exit:
    return ret_val;
}

struct pair_unpin_with_new_attr_extra {
    pair_unpin_with_new_attr_extra(evictor *e, PAIR p) :
        ev(e), pair(p) {
    }
    evictor *ev;
    PAIR pair;
};

static void pair_unpin_with_new_attr(PAIR_ATTR new_attr, void *extra) {
    struct pair_unpin_with_new_attr_extra *info =
        reinterpret_cast<struct pair_unpin_with_new_attr_extra *>(extra);
    PAIR p = info->pair;
    evictor *ev = info->ev;

    // change the attr in the evictor, then update the value in the pair
    ev->change_pair_attr(p->attr, new_attr);
    p->attr = new_attr;

    // unpin
    pair_lock(p);
    p->value_rwlock.write_unlock();
    pair_unlock(p);
}

//
// on entry and exit, pair's mutex is not held
// on exit, PAIR is unpinned
//
void evictor::do_partial_eviction(PAIR p) {
    // Copy the old attr
    PAIR_ATTR old_attr = p->attr;
    long long size_evicting_estimate = p->size_evicting_estimate;

    struct pair_unpin_with_new_attr_extra extra(this, p);
    p->pe_callback(p->value_data, old_attr, p->write_extraargs,
                   // passed as the finalize continuation, which allows the
                   // pe_callback to unpin the node before doing expensive cleanup
                   pair_unpin_with_new_attr, &extra);

    // now that the pe_callback (and its pair_unpin_with_new_attr continuation)
    // have finished, we can safely decrease size_evicting
    this->decrease_size_evicting(size_evicting_estimate);
}

//
// CT lock held on entry
// background job has been added for p->cachefile on entry
// responsibility of this function to make sure that background job is removed
//
// on entry, pair's mutex is held, on exit, the pair's mutex is NOT held
//
void evictor::try_evict_pair(PAIR p) {
    CACHEFILE cf = p->cachefile;
    // evictions that require no write (i.e., unpinned pairs that are clean)
    // can be run on the current thread

    // the only caller, run_eviction_on_pair, should call this function
    // only if no one else is trying to use it
    assert(!p->value_rwlock.users());
    p->value_rwlock.write_lock(true);
    // if the PAIR is dirty, the running eviction requires writing the
    // PAIR out.  if the disk_nb_mutex is grabbed, then running
    // eviction requires waiting for the disk_nb_mutex to become available,
    // which may be expensive.  Hence, if either is true, we
    // do the eviction on a writer thread
    if (!p->dirty && (nb_mutex_writers(&p->disk_nb_mutex) == 0)) {
        p->size_evicting_estimate = 0;
        //
        // This method will unpin PAIR and release PAIR mutex
        //
        // because the PAIR is not dirty, we can safely pass
        // false for the for_checkpoint parameter
        this->evict_pair(p, false);
        bjm_remove_background_job(cf->bjm);
    }
    else {
        pair_unlock(p);
        toku_mutex_lock(&m_ev_thread_lock);
        assert(m_size_evicting >= 0);
        p->size_evicting_estimate = p->attr.size;
        m_size_evicting += p->size_evicting_estimate;
        assert(m_size_evicting >= 0);
        toku_mutex_unlock(&m_ev_thread_lock);
        toku_kibbutz_enq(m_kibbutz, cachetable_evicter, p);
    }
}

//
// Requires: This thread must hold the write lock (nb_mutex) for the pair.
// The pair's mutex (p->mutex) is also held.
// on exit, neither is held
//
void evictor::evict_pair(PAIR p, bool for_checkpoint) {
    if (p->dirty) {
        pair_unlock(p);
        cachetable_write_locked_pair(this, p, for_checkpoint);
        pair_lock(p);
    }
    // one thing we can do here is extract the size_evicting estimate,
    // have decrease_size_evicting take the estimate and not the pair,
    // and do this work after we have called
    // cachetable_maybe_remove_and_free_pair
    this->decrease_size_evicting(p->size_evicting_estimate);
    // if we are to remove this pair, we need the write list lock,
    // to get it in a way that avoids deadlocks, we must first release
    // the pair's mutex, then grab the write list lock, then regrab the
    // pair's mutex.  The pair cannot go anywhere because
    // the pair is still pinned
    nb_mutex_lock(&p->disk_nb_mutex, p->mutex);
    pair_unlock(p);
    m_pl->write_list_lock();
    pair_lock(p);
    p->value_rwlock.write_unlock();
    nb_mutex_unlock(&p->disk_nb_mutex);
    // at this point, we have the pair list's write list lock
    // and we have the pair's mutex (p->mutex) held

    // this ensures that a clone running in the background first completes
    bool removed = false;
    if (p->value_rwlock.users() == 0 && p->refcount == 0) {
        // assumption is that if we are about to remove the pair
        // that no one has grabbed the disk_nb_mutex,
        // and that there is no cloned_value_data, because
        // no one is writing a cloned value out.
        assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
        assert(p->cloned_value_data == NULL);
        cachetable_remove_pair(m_pl, this, p);
        removed = true;
    }
    pair_unlock(p);
    m_pl->write_list_unlock();
    // do not want to hold the write list lock while freeing a pair
    if (removed) {
        cachetable_free_pair(p);
    }
}

//
// this function handles the responsibilities for writer threads when they
// decrease size_evicting.  The responsibilities are:
//  - decrease m_size_evicting in a thread safe manner
//  - in some circumstances, signal the eviction thread
//
void evictor::decrease_size_evicting(long size_evicting_estimate) {
    if (size_evicting_estimate > 0) {
        toku_mutex_lock(&m_ev_thread_lock);
        int64_t buffer = m_high_size_hysteresis - m_low_size_watermark;
        // if size_evicting is transitioning from greater than buffer to below buffer, and
        // some client threads are sleeping, we need to wake up the eviction thread.
        // Here is why.  In this scenario, we are in one of two cases:
        //  - size_current - size_evicting < low_size_watermark
        //    If this is true, then size_current < high_size_hysteresis, which
        //    means we need to wake up sleeping clients
        //  - size_current - size_evicting > low_size_watermark,
        //    which means more evictions must be run.
        // The consequences of both cases are the responsibility
        // of the eviction thread.
        //
        bool need_to_signal_ev_thread =
            (m_num_sleepers > 0) &&
            !m_ev_thread_is_running &&
            (m_size_evicting > buffer) &&
            ((m_size_evicting - size_evicting_estimate) <= buffer);
        m_size_evicting -= size_evicting_estimate;
        assert(m_size_evicting >= 0);
        if (need_to_signal_ev_thread) {
            this->signal_eviction_thread_locked();
        }
        toku_mutex_unlock(&m_ev_thread_lock);
    }
}
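
// Illustrative note (numbers assumed): with a 1GB size limit and the default
// ladder from evictor::init, buffer = high_size_hysteresis - low_size_watermark
// = 1.25GB - 1.00GB = 250MB, so the eviction thread is signaled when
// m_size_evicting drops from above 250MB to at or below it while client
// threads are sleeping.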

//
// Wait for cachetable space to become available.
// size_current is the number of bytes currently occupied by data (referred to by pairs)
// size_evicting is the number of bytes queued up to be evicted
//
void evictor::wait_for_cache_pressure_to_subside() {
    uint64_t t0 = toku_current_time_microsec();
    toku_mutex_lock(&m_ev_thread_lock);
    m_num_sleepers++;
    this->signal_eviction_thread_locked();
    toku_cond_wait(&m_flow_control_cond, &m_ev_thread_lock);
    m_num_sleepers--;
    toku_mutex_unlock(&m_ev_thread_lock);
    uint64_t t1 = toku_current_time_microsec();
    increment_partitioned_counter(m_wait_pressure_count, 1);
    uint64_t tdelta = t1 - t0;
    increment_partitioned_counter(m_wait_pressure_time, tdelta);
    if (tdelta > 1000000) {
        increment_partitioned_counter(m_long_wait_pressure_count, 1);
        increment_partitioned_counter(m_long_wait_pressure_time, tdelta);
    }
}

//
// Get the status of the current estimated size of the cachetable,
// and the evictor's set limit.
//
void evictor::get_state(long *size_current_ptr, long *size_limit_ptr) {
    if (size_current_ptr) {
        *size_current_ptr = m_size_current;
    }
    if (size_limit_ptr) {
        *size_limit_ptr = m_low_size_watermark;
    }
}

//
// Force the eviction thread to do some work.
//
// This function does not require any mutex to be held.
// As a result, scheduling is not guaranteed, but that is tolerable.
//
void evictor::signal_eviction_thread() {
    toku_mutex_lock(&m_ev_thread_lock);
    toku_cond_signal(&m_ev_thread_cond);
    toku_mutex_unlock(&m_ev_thread_lock);
}

void evictor::signal_eviction_thread_locked() {
    toku_cond_signal(&m_ev_thread_cond);
}
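
// Summary of the threshold ladder used by the predicates below (all values
// are set in evictor::init, low to high):
//     low_size_watermark < low_size_hysteresis
//                        < high_size_hysteresis < high_size_watermark
//  - eviction_needed():                    size_current - size_evicting > low_size_watermark
//  - should_client_wake_eviction_thread(): size_current - size_evicting > low_size_hysteresis
//  - should_sleeping_clients_wakeup():     size_current <= high_size_hysteresis
//  - should_client_thread_sleep():         size_current > high_size_watermark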

//
// Returns true if the cachetable is so oversubscribed that a client thread should sleep
//
// This function may be called in a thread-unsafe manner.  Locks are not
// required to read size_current.  The result is that
// the values may be a little off, but we think that is tolerable.
//
bool evictor::should_client_thread_sleep() {
    return unsafe_read_size_current() > m_high_size_watermark;
}

//
// Returns true if a sleeping client should be woken up because
// the cachetable is no longer overly subscribed
//
// This function may be called in a thread-unsafe manner.  Locks are not
// required to read size_current.  The result is that
// the values may be a little off, but we think that is tolerable.
//
bool evictor::should_sleeping_clients_wakeup() {
    return unsafe_read_size_current() <= m_high_size_hysteresis;
}

//
// Returns true if a client thread should try to wake up the eviction
// thread because the client thread has noticed too much data taken
// up in the cachetable.
//
// This function may be called in a thread-unsafe manner.  Locks are not
// required to read size_current or size_evicting.  The result is that
// the values may be a little off, but we think that is tolerable.
// If the caller wants to ensure that ev_thread_is_running and size_evicting
// are accurate, then the caller must hold ev_thread_lock before
// calling this function.
//
bool evictor::should_client_wake_eviction_thread() {
    return
        !m_ev_thread_is_running &&
        ((unsafe_read_size_current() - m_size_evicting) > m_low_size_hysteresis);
}

//
// Determines if eviction is needed.  If the current size of
// the cachetable exceeds the sum of our fixed size limit and
// the amount of data currently being evicted, then eviction is needed
//
bool evictor::eviction_needed() {
    return (m_size_current - m_size_evicting) > m_low_size_watermark;
}

inline int64_t evictor::unsafe_read_size_current(void) const {
    return m_size_current;
}

void evictor::fill_engine_status() {
    CT_STATUS_VAL(CT_SIZE_CURRENT) = m_size_current;
    CT_STATUS_VAL(CT_SIZE_LIMIT) = m_low_size_hysteresis;
    CT_STATUS_VAL(CT_SIZE_WRITING) = m_size_evicting;
    CT_STATUS_VAL(CT_SIZE_NONLEAF) = read_partitioned_counter(m_size_nonleaf);
    CT_STATUS_VAL(CT_SIZE_LEAF) = read_partitioned_counter(m_size_leaf);
    CT_STATUS_VAL(CT_SIZE_ROLLBACK) = read_partitioned_counter(m_size_rollback);
    CT_STATUS_VAL(CT_SIZE_CACHEPRESSURE) = read_partitioned_counter(m_size_cachepressure);
    CT_STATUS_VAL(CT_SIZE_CLONED) = m_size_cloned_data;
    CT_STATUS_VAL(CT_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_wait_pressure_count);
    CT_STATUS_VAL(CT_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_wait_pressure_time);
    CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_COUNT) = read_partitioned_counter(m_long_wait_pressure_count);
    CT_STATUS_VAL(CT_LONG_WAIT_PRESSURE_TIME) = read_partitioned_counter(m_long_wait_pressure_time);
}

void evictor::set_enable_partial_eviction(bool enabled) {
    m_enable_partial_eviction = enabled;
}

bool evictor::get_enable_partial_eviction(void) const {
    return m_enable_partial_eviction;
}

////////////////////////////////////////////////////////////////////////////////

ENSURE_POD(checkpointer);

//
// Sets the cachetable references in this checkpointer class; this wiring is temporary.
//
int checkpointer::init(pair_list *_pl,
                       TOKULOGGER _logger,
                       evictor *_ev,
                       cachefile_list *files) {
    m_list = _pl;
    m_logger = _logger;
    m_ev = _ev;
    m_cf_list = files;
    bjm_init(&m_checkpoint_clones_bjm);

    // Default is no checkpointing.
    m_checkpointer_cron_init = false;
    int r = toku_minicron_setup(&m_checkpointer_cron, 0, checkpoint_thread, this);
    if (r == 0) {
        m_checkpointer_cron_init = true;
    }
    m_checkpointer_init = true;
    return r;
}

void checkpointer::destroy() {
    if (!m_checkpointer_init) {
        return;
    }
    if (m_checkpointer_cron_init && !this->has_been_shutdown()) {
        // for test code only, production code uses toku_cachetable_minicron_shutdown()
        int r = this->shutdown();
        assert(r == 0);
    }
    bjm_destroy(m_checkpoint_clones_bjm);
}

//
// Sets how often the checkpoint thread will run, in seconds
// (toku_minicron_change_period takes milliseconds, hence the *1000).
//
void checkpointer::set_checkpoint_period(uint32_t new_period) {
    toku_minicron_change_period(&m_checkpointer_cron, new_period*1000);
}

//
// Returns how often the checkpoint thread runs, in seconds.
//
uint32_t checkpointer::get_checkpoint_period() {
    return toku_minicron_get_period_in_seconds_unlocked(&m_checkpointer_cron);
}

//
// Stops the checkpoint thread.
//
int checkpointer::shutdown() {
    return toku_minicron_shutdown(&m_checkpointer_cron);
}

//
// Returns true if the checkpoint thread has been shut down;
// while checkpointing is still running, this returns false.
//
bool checkpointer::has_been_shutdown() {
    return toku_minicron_has_been_shutdown(&m_checkpointer_cron);
}

TOKULOGGER checkpointer::get_logger() {
    return m_logger;
}

void checkpointer::increment_num_txns() {
    m_checkpoint_num_txns++;
}

struct iterate_begin_checkpoint {
    LSN lsn_of_checkpoint_in_progress;
    iterate_begin_checkpoint(LSN lsn) : lsn_of_checkpoint_in_progress(lsn) { }
    static int fn(const CACHEFILE &cf, const uint32_t UU(idx), struct iterate_begin_checkpoint *info) {
        assert(cf->begin_checkpoint_userdata);
        if (cf->for_checkpoint) {
            cf->begin_checkpoint_userdata(info->lsn_of_checkpoint_in_progress, cf->userdata);
        }
        return 0;
    }
};

//
// Update the user data in any cachefiles in our checkpoint list.
//
void checkpointer::update_cachefiles() {
    struct iterate_begin_checkpoint iterate(m_lsn_of_checkpoint_in_progress);
    int r = m_cf_list->m_active_fileid.iterate<struct iterate_begin_checkpoint,
                                               iterate_begin_checkpoint::fn>(&iterate);
    assert_zero(r);
}

struct iterate_note_pin {
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
        assert(cf->note_pin_by_checkpoint);
        cf->note_pin_by_checkpoint(cf, cf->userdata);
        cf->for_checkpoint = true;
        return 0;
    }
};

//
// Sets up and kicks off a checkpoint.
//
void checkpointer::begin_checkpoint() {
    // 1. Initialize the accountability counters.
    m_checkpoint_num_txns = 0;

    // 2. Make list of cachefiles to be included in the checkpoint.
    m_cf_list->read_lock();
    m_cf_list->m_active_fileid.iterate<void *, iterate_note_pin::fn>(nullptr);
    m_checkpoint_num_files = m_cf_list->m_active_fileid.size();
    m_cf_list->read_unlock();

    // 3. Create log entries for this checkpoint.
    if (m_logger) {
        this->log_begin_checkpoint();
    }

    bjm_reset(m_checkpoint_clones_bjm);

    m_list->write_pending_exp_lock();
    m_list->read_list_lock();
    m_cf_list->read_lock(); // needed for update_cachefiles
    m_list->write_pending_cheap_lock();
    // 4. Turn on all the relevant checkpoint pending bits.
    this->turn_on_pending_bits();

    // 5. Update the user data in the cachefiles being checkpointed.
    this->update_cachefiles();
    m_list->write_pending_cheap_unlock();
    m_cf_list->read_unlock();
    m_list->read_list_unlock();
    m_list->write_pending_exp_unlock();
}

struct iterate_log_fassociate {
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), void **UU(extra)) {
        assert(cf->log_fassociate_during_checkpoint);
        cf->log_fassociate_during_checkpoint(cf, cf->userdata);
        return 0;
    }
};

//
// Assuming the logger exists, this will write out the following
// information to the log.
//
// 1. Writes the BEGIN_CHECKPOINT to the log.
// 2. Writes the list of open dictionaries to the log.
// 3. Writes the list of open transactions to the log.
// 4. Writes the list of dictionaries that have had rollback logs suppressed.
//
// NOTE: This also has the side effect of setting the LSN
// of the checkpoint in progress.
//
void checkpointer::log_begin_checkpoint() {
    int r = 0;

    // Write the BEGIN_CHECKPOINT to the log.
    LSN begin_lsn={ .lsn = (uint64_t) -1 }; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
    TXN_MANAGER mgr = toku_logger_get_txn_manager(m_logger);
    TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
    toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid);
    m_lsn_of_checkpoint_in_progress = begin_lsn;

    // Log the list of open dictionaries.
    m_cf_list->m_active_fileid.iterate<void *, iterate_log_fassociate::fn>(nullptr);

    // Write open transactions to the log.
    r = toku_txn_manager_iter_over_live_txns(
        m_logger->txn_manager,
        log_open_txn,
        this
        );
    assert(r == 0);
}

//
// Sets the pending bits of EVERY PAIR in the cachetable, regardless of
// whether the PAIR is clean or not.  It will be the responsibility of
// end_checkpoint or client threads to simply clear the pending bit
// if the PAIR is clean.
//
// On entry and exit, the pair list's read list lock is grabbed, and
// both pending locks are grabbed
//
void checkpointer::turn_on_pending_bits() {
    PAIR p = NULL;
    uint32_t i;
    for (i = 0, p = m_list->m_checkpoint_head; i < m_list->m_n_in_table; i++, p = p->clock_next) {
        assert(!p->checkpoint_pending);
        // Only include pairs belonging to cachefiles in the checkpoint
        if (!p->cachefile->for_checkpoint) {
            continue;
        }
        // Mark everything as pending a checkpoint
        //
        // The rule for the checkpoint_pending bit is as follows:
        //  - begin_checkpoint may set checkpoint_pending to true
        //    even though the pair lock on the node is not held.
        //  - any thread that wants to clear the pending bit must own
        //    the PAIR lock.  Otherwise,
        //    we may end up clearing the pending bit before the
        //    current lock is ever released.
        p->checkpoint_pending = true;
        if (m_list->m_pending_head) {
            m_list->m_pending_head->pending_prev = p;
        }
        p->pending_next = m_list->m_pending_head;
        p->pending_prev = NULL;
        m_list->m_pending_head = p;
    }
    invariant(p == m_list->m_checkpoint_head);
}

void checkpointer::add_background_job() {
    int r = bjm_add_background_job(m_checkpoint_clones_bjm);
    assert_zero(r);
}
void checkpointer::remove_background_job() {
    bjm_remove_background_job(m_checkpoint_clones_bjm);
}
4635
void checkpointer::end_checkpoint(void (*testcallback_f)(void*), void* testextra) {
    toku::scoped_malloc checkpoint_cfs_buf(m_checkpoint_num_files * sizeof(CACHEFILE));
    CACHEFILE *checkpoint_cfs = reinterpret_cast<CACHEFILE *>(checkpoint_cfs_buf.get());

    this->fill_checkpoint_cfs(checkpoint_cfs);
    this->checkpoint_pending_pairs();
    this->checkpoint_userdata(checkpoint_cfs);
    // For testing purposes only. Dictionary has been fsync-ed to disk but log has not yet been written.
    if (testcallback_f) {
        testcallback_f(testextra);
    }
    this->log_end_checkpoint();
    this->end_checkpoint_userdata(checkpoint_cfs);

    // Delete the list of cachefiles in the checkpoint.
    this->remove_cachefiles(checkpoint_cfs);
}
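// Illustrative sketch (not from the original source): how a test can use the
// callback hook above to inject a crash between the data fsync and the
// END_CHECKPOINT log write, which is exactly the window recovery must handle.
// `cp` and the crash helper are hypothetical names for the sketch:
//
//   static void crash_during_checkpoint(void *UU(extra)) {
//       toku_hard_crash_on_purpose();   // crash helper, assumed available
//   }
//   ...
//   cp->end_checkpoint(crash_during_checkpoint, nullptr);
//
// Production callers pass (nullptr, nullptr) so the hook is skipped.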

struct iterate_checkpoint_cfs {
    CACHEFILE *checkpoint_cfs;
    uint32_t checkpoint_num_files;
    uint32_t curr_index;
    iterate_checkpoint_cfs(CACHEFILE *cfs, uint32_t num_files) :
        checkpoint_cfs(cfs), checkpoint_num_files(num_files), curr_index(0) {
    }
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_checkpoint_cfs *info) {
        if (cf->for_checkpoint) {
            assert(info->curr_index < info->checkpoint_num_files);
            info->checkpoint_cfs[info->curr_index] = cf;
            info->curr_index++;
        }
        return 0;
    }
};

void checkpointer::fill_checkpoint_cfs(CACHEFILE* checkpoint_cfs) {
    struct iterate_checkpoint_cfs iterate(checkpoint_cfs, m_checkpoint_num_files);

    m_cf_list->read_lock();
    m_cf_list->m_active_fileid.iterate<struct iterate_checkpoint_cfs, iterate_checkpoint_cfs::fn>(&iterate);
    assert(iterate.curr_index == m_checkpoint_num_files);
    m_cf_list->read_unlock();
}

void checkpointer::checkpoint_pending_pairs() {
    PAIR p;
    m_list->read_list_lock();
    while ((p = m_list->m_pending_head) != 0) {
        // <CER> TODO: Investigate why we move the pending head outside of the pending_pairs_remove() call.
        m_list->m_pending_head = m_list->m_pending_head->pending_next;
        m_list->pending_pairs_remove(p);
        // if still pending, clear the pending bit and write out the node
        pair_lock(p);
        m_list->read_list_unlock();
        write_pair_for_checkpoint_thread(m_ev, p);
        pair_unlock(p);
        m_list->read_list_lock();
    }
    assert(!m_list->m_pending_head);
    m_list->read_list_unlock();
    bjm_wait_for_jobs_to_finish(m_checkpoint_clones_bjm);
}
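// Illustrative note (not from the original source): the loop above uses a
// lock hand-off so the pair list's read lock is never held across disk I/O.
// The shape of the pattern, with hypothetical names:
//
//   list_lock();                 // protects the pending list
//   while ((p = head()) != 0) {
//       unlink(p);
//       pair_lock(p);            // pin the PAIR first...
//       list_unlock();           // ...then drop the list lock
//       write_to_disk(p);        // slow work, no list lock held
//       pair_unlock(p);
//       list_lock();             // reacquire before touching the list again
//   }
//   list_unlock();
//
// Taking the PAIR lock before releasing the list lock is what keeps p from
// being freed between the two steps.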

void checkpointer::checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
    // have just written data blocks, so next write the translation and header for each open dictionary
    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
        CACHEFILE cf = checkpoint_cfs[i];
        assert(cf->for_checkpoint);
        assert(cf->checkpoint_userdata);
        toku_cachetable_set_checkpointing_user_data_status(1);
        cf->checkpoint_userdata(cf, cf->fd, cf->userdata);
        toku_cachetable_set_checkpointing_user_data_status(0);
    }
}

void checkpointer::log_end_checkpoint() {
    if (m_logger) {
        toku_log_end_checkpoint(m_logger, NULL,
                                1, // want the end_checkpoint to be fsync'd
                                m_lsn_of_checkpoint_in_progress,
                                0,
                                m_checkpoint_num_files,
                                m_checkpoint_num_txns);
        toku_logger_note_checkpoint(m_logger, m_lsn_of_checkpoint_in_progress);
    }
}

void checkpointer::end_checkpoint_userdata(CACHEFILE* checkpoint_cfs) {
    // everything has been written to file and fsynced
    // ... call checkpoint-end function in block translator
    //     to free obsolete blocks on disk used by previous checkpoint
    // cachefiles_in_checkpoint is protected by the checkpoint_safe_lock
    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
        CACHEFILE cf = checkpoint_cfs[i];
        assert(cf->for_checkpoint);
        assert(cf->end_checkpoint_userdata);
        cf->end_checkpoint_userdata(cf, cf->fd, cf->userdata);
    }
}

//
// Deletes all the cachefiles in this checkpointer's cachefile list.
//
void checkpointer::remove_cachefiles(CACHEFILE* checkpoint_cfs) {
    // note_unpin_by_checkpoint may destroy the cachefile, so iterate over the
    // local checkpoint_cfs array rather than the live cachefile list.
    for (uint32_t i = 0; i < m_checkpoint_num_files; i++) {
        CACHEFILE cf = checkpoint_cfs[i];
        assert(cf->for_checkpoint);
        cf->for_checkpoint = false;
        // Assert the callback exists so this function can also be called
        // from cachetable tests.
        assert(cf->note_unpin_by_checkpoint);
        // Clear the bit saying this file is in the checkpoint.
        cf->note_unpin_by_checkpoint(cf, cf->userdata);
    }
}


////////////////////////////////////////////////////////
//
// cachefiles list
//
static_assert(std::is_pod<cachefile_list>::value, "cachefile_list isn't POD");

void cachefile_list::init() {
    m_next_filenum_to_use.fileid = 0;
    m_next_hash_id_to_use = 0;
    toku_pthread_rwlock_init(*cachetable_m_lock_key, &m_lock, nullptr);
    m_active_filenum.create();
    m_active_fileid.create();
    m_stale_fileid.create();
}

void cachefile_list::destroy() {
    m_active_filenum.destroy();
    m_active_fileid.destroy();
    m_stale_fileid.destroy();
    toku_pthread_rwlock_destroy(&m_lock);
}

void cachefile_list::read_lock() {
    toku_pthread_rwlock_rdlock(&m_lock);
}

void cachefile_list::read_unlock() {
    toku_pthread_rwlock_rdunlock(&m_lock);
}

void cachefile_list::write_lock() {
    toku_pthread_rwlock_wrlock(&m_lock);
}

void cachefile_list::write_unlock() {
    toku_pthread_rwlock_wrunlock(&m_lock);
}
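// Illustrative note (not from the original source): these wrappers pair a
// pthread rwlock with the cachefile list. Readers (lookups, iteration) take
// the shared lock; anything that mutates the maps takes the exclusive lock:
//
//   cf_list->read_lock();
//   CACHEFILE cf = cf_list->find_cachefile_unlocked(&id);  // read-only
//   cf_list->read_unlock();
//
// The *_unlocked methods below assume the caller already holds the
// appropriate lock, which is why they do no locking of their own.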

struct iterate_find_iname {
    const char *iname_in_env;
    CACHEFILE found_cf;
    iterate_find_iname(const char *iname) : iname_in_env(iname), found_cf(nullptr) { }
    static int fn(const CACHEFILE &cf, uint32_t UU(idx), struct iterate_find_iname *info) {
        if (cf->fname_in_env && strcmp(cf->fname_in_env, info->iname_in_env) == 0) {
            info->found_cf = cf;
            return -1;
        }
        return 0;
    }
};
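// Illustrative note (not from the original source): returning a nonzero value
// from an iterate callback, as fn() does above with -1, stops the iteration
// early; returning 0 continues to the next element. The caller below inspects
// found_cf rather than the (intentionally nonzero) return code.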

int cachefile_list::cachefile_of_iname_in_env(const char *iname_in_env, CACHEFILE *cf) {
    struct iterate_find_iname iterate(iname_in_env);

    read_lock();
    int r = m_active_fileid.iterate<iterate_find_iname, iterate_find_iname::fn>(&iterate);
    if (iterate.found_cf != nullptr) {
        assert(strcmp(iterate.found_cf->fname_in_env, iname_in_env) == 0);
        *cf = iterate.found_cf;
        r = 0;
    } else {
        r = ENOENT;
    }
    read_unlock();
    return r;
}

static int cachefile_find_by_filenum(const CACHEFILE &a_cf, const FILENUM &b) {
    const FILENUM a = a_cf->filenum;
    if (a.fileid < b.fileid) {
        return -1;
    } else if (a.fileid == b.fileid) {
        return 0;
    } else {
        return 1;
    }
}

int cachefile_list::cachefile_of_filenum(FILENUM filenum, CACHEFILE *cf) {
    read_lock();
    int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, cf, nullptr);
    if (r == DB_NOTFOUND) {
        r = ENOENT;
    } else {
        invariant_zero(r);
    }
    read_unlock();
    return r;
}
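// Illustrative sketch (not from the original source): a typical lookup path,
// e.g. when a log record names a dictionary by FILENUM. `ct->cf_list` is a
// hypothetical access path for the sketch; real callers reach the list
// through the cachetable they already hold:
//
//   CACHEFILE cf;
//   int r = ct->cf_list.cachefile_of_filenum(fnum, &cf);
//   if (r == ENOENT) {
//       // the file was not open at the time of the lookup
//   } else {
//       assert_zero(r);  // cf is valid while it stays open
//   }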

static int cachefile_find_by_fileid(const CACHEFILE &a_cf, const struct fileid &b) {
    return toku_fileid_cmp(a_cf->fileid, b);
}

void cachefile_list::add_cf_unlocked(CACHEFILE cf) {
    int r;
    r = m_active_filenum.insert<FILENUM, cachefile_find_by_filenum>(cf, cf->filenum, nullptr);
    assert_zero(r);
    r = m_active_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
    assert_zero(r);
}
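// Illustrative note (not from the original source): each cachefile is indexed
// twice, by FILENUM (the logical handle used in log records) and by on-disk
// fileid (device/inode pair). add_cf_unlocked and remove_cf must keep the two
// maps in sync, which is why every insert and delete is asserted to succeed.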

void cachefile_list::add_stale_cf(CACHEFILE cf) {
    write_lock();
    int r = m_stale_fileid.insert<struct fileid, cachefile_find_by_fileid>(cf, cf->fileid, nullptr);
    assert_zero(r);
    write_unlock();
}

void cachefile_list::remove_cf(CACHEFILE cf) {
    write_lock();

    uint32_t idx;
    int r;
    r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(cf->filenum, nullptr, &idx);
    assert_zero(r);
    r = m_active_filenum.delete_at(idx);
    assert_zero(r);

    r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
    assert_zero(r);
    r = m_active_fileid.delete_at(idx);
    assert_zero(r);

    write_unlock();
}

void cachefile_list::remove_stale_cf_unlocked(CACHEFILE cf) {
    uint32_t idx;
    int r;
    r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(cf->fileid, nullptr, &idx);
    assert_zero(r);
    r = m_stale_fileid.delete_at(idx);
    assert_zero(r);
}

FILENUM cachefile_list::reserve_filenum() {
    // taking a write lock because we are modifying next_filenum_to_use
    FILENUM filenum = FILENUM_NONE;
    write_lock();
    while (1) {
        int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(m_next_filenum_to_use, nullptr, nullptr);
        if (r == 0) {
            m_next_filenum_to_use.fileid++;
            continue;
        }
        assert(r == DB_NOTFOUND);

        // skip the reserved value UINT32_MAX and wrap around to zero
        if (m_next_filenum_to_use.fileid == FILENUM_NONE.fileid) {
            m_next_filenum_to_use.fileid = 0;
            continue;
        }

        filenum = m_next_filenum_to_use;
        m_next_filenum_to_use.fileid++;
        break;
    }
    write_unlock();
    return filenum;
}
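// Illustrative sketch (not from the original source): the scan above keeps
// reserve_filenum() correct even after the 32-bit counter wraps. For example,
// if m_next_filenum_to_use is UINT32_MAX - 1 and that value is still in use:
//
//   UINT32_MAX - 1  -> found in m_active_filenum, skip
//   UINT32_MAX      -> equals FILENUM_NONE.fileid, wrap to 0
//   0               -> free, returned; counter advances to 1
//
// Each probe is a find_zero over the active-filenum map, so a reservation
// costs O(k log n) where k is the run of consecutive in-use values skipped.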

uint32_t cachefile_list::get_new_hash_id_unlocked() {
    uint32_t retval = m_next_hash_id_to_use;
    m_next_hash_id_to_use++;
    return retval;
}

CACHEFILE cachefile_list::find_cachefile_unlocked(struct fileid* fileid) {
    CACHEFILE cf = nullptr;
    int r = m_active_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
    if (r == 0) {
        assert(!cf->unlink_on_close);
    }
    return cf;
}

CACHEFILE cachefile_list::find_stale_cachefile_unlocked(struct fileid* fileid) {
    CACHEFILE cf = nullptr;
    int r = m_stale_fileid.find_zero<struct fileid, cachefile_find_by_fileid>(*fileid, &cf, nullptr);
    if (r == 0) {
        assert(!cf->unlink_on_close);
    }
    return cf;
}

void cachefile_list::verify_unused_filenum(FILENUM filenum) {
    int r = m_active_filenum.find_zero<FILENUM, cachefile_find_by_filenum>(filenum, nullptr, nullptr);
    assert(r == DB_NOTFOUND);
}

// returns true if some eviction ran, false otherwise
bool cachefile_list::evict_some_stale_pair(evictor* ev) {
    write_lock();
    if (m_stale_fileid.size() == 0) {
        write_unlock();
        return false;
    }

    CACHEFILE stale_cf = nullptr;
    int r = m_stale_fileid.fetch(0, &stale_cf);
    assert_zero(r);

    // we should not have a cf in the stale list
    // that does not have any pairs
    PAIR p = stale_cf->cf_head;
    paranoid_invariant(p != NULL);
    evict_pair_from_cachefile(p);

    // now that we have evicted something,
    // let's check if the cachefile is needed anymore
    //
    // it is not needed if the latest eviction caused
    // the cf_head for that cf to become null
    bool destroy_cf = stale_cf->cf_head == nullptr;
    if (destroy_cf) {
        remove_stale_cf_unlocked(stale_cf);
    }

    write_unlock();

    ev->remove_pair_attr(p->attr);
    cachetable_free_pair(p);
    if (destroy_cf) {
        cachefile_destroy(stale_cf);
    }
    return true;
}
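// Illustrative sketch (not from the original source): how an eviction loop
// might drain stale cachefiles incrementally instead of all at once, trading
// latency for steady progress (free_stale_data below is the eager variant):
//
//   while (need_memory() && cf_list->evict_some_stale_pair(ev)) {
//       ;  // each call frees one PAIR, and the cachefile once it empties
//   }
//
// `need_memory()` is a hypothetical predicate standing in for the evictor's
// real memory-pressure check.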

void cachefile_list::free_stale_data(evictor* ev) {
    write_lock();
    while (m_stale_fileid.size() != 0) {
        CACHEFILE stale_cf = nullptr;
        int r = m_stale_fileid.fetch(0, &stale_cf);
        assert_zero(r);

        // we should not have a cf in the stale list
        // that does not have any pairs
        PAIR p = stale_cf->cf_head;
        paranoid_invariant(p != NULL);

        evict_pair_from_cachefile(p);
        ev->remove_pair_attr(p->attr);
        cachetable_free_pair(p);

        // now that we have evicted something,
        // let's check if the cachefile is needed anymore
        if (stale_cf->cf_head == NULL) {
            remove_stale_cf_unlocked(stale_cf);
            cachefile_destroy(stale_cf);
        }
    }
    write_unlock();
}

void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void);
void
toku_cachetable_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status);
}