// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package cache provides the interface and an implementation of cache
// algorithms.
package cache

import (
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/syndtr/goleveldb/leveldb/util"
)

// Cacher is the interface implemented by a caching policy.
// An implementation must be safe for concurrent use.
type Cacher interface {
	// Capacity returns cache capacity.
	Capacity() int

	// SetCapacity sets cache capacity.
	SetCapacity(capacity int)

	// Promote promotes the 'cache node'.
	Promote(n *Node)

	// Ban evicts the 'cache node' and prevents subsequent 'promote'.
	Ban(n *Node)

	// Evict evicts the 'cache node'.
	Evict(n *Node)

	// EvictNS evicts every 'cache node' with the given namespace.
	EvictNS(ns uint64)

	// EvictAll evicts every 'cache node'.
	EvictAll()

	// Close closes the 'cache tree'.
	Close() error
}

// Value is a 'cacheable object'. It may implement util.Releaser; if so,
// its Release method is called once the object is released.
type Value interface{}

// NamespaceGetter is a convenience wrapper that binds a Cache to a single
// namespace.
type NamespaceGetter struct {
	Cache *Cache
	NS    uint64
}

// Get calls Cache.Get with the getter's namespace.
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
	return g.Cache.Get(g.NS, key, setFunc)
}

// The hash table implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
// Kunlong Zhang, and Michael Spear.
// ACM Symposium on Principles of Distributed Computing, Jul 2014.
65 66const ( 67 mInitialSize = 1 << 4 68 mOverflowThreshold = 1 << 5 69 mOverflowGrowThreshold = 1 << 7 70) 71 72type mBucket struct { 73 mu sync.Mutex 74 node []*Node 75 frozen bool 76} 77 78func (b *mBucket) freeze() []*Node { 79 b.mu.Lock() 80 defer b.mu.Unlock() 81 if !b.frozen { 82 b.frozen = true 83 } 84 return b.node 85} 86 87func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) { 88 b.mu.Lock() 89 90 if b.frozen { 91 b.mu.Unlock() 92 return 93 } 94 95 // Scan the node. 96 for _, n := range b.node { 97 if n.hash == hash && n.ns == ns && n.key == key { 98 atomic.AddInt32(&n.ref, 1) 99 b.mu.Unlock() 100 return true, false, n 101 } 102 } 103 104 // Get only. 105 if noset { 106 b.mu.Unlock() 107 return true, false, nil 108 } 109 110 // Create node. 111 n = &Node{ 112 r: r, 113 hash: hash, 114 ns: ns, 115 key: key, 116 ref: 1, 117 } 118 // Add node to bucket. 119 b.node = append(b.node, n) 120 bLen := len(b.node) 121 b.mu.Unlock() 122 123 // Update counter. 124 grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold 125 if bLen > mOverflowThreshold { 126 grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold 127 } 128 129 // Grow. 130 if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) { 131 nhLen := len(h.buckets) << 1 132 nh := &mNode{ 133 buckets: make([]unsafe.Pointer, nhLen), 134 mask: uint32(nhLen) - 1, 135 pred: unsafe.Pointer(h), 136 growThreshold: int32(nhLen * mOverflowThreshold), 137 shrinkThreshold: int32(nhLen >> 1), 138 } 139 ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh)) 140 if !ok { 141 panic("BUG: failed swapping head") 142 } 143 go nh.initBuckets() 144 } 145 146 return true, true, n 147} 148 149func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) { 150 b.mu.Lock() 151 152 if b.frozen { 153 b.mu.Unlock() 154 return 155 } 156 157 // Scan the node. 
158 var ( 159 n *Node 160 bLen int 161 ) 162 for i := range b.node { 163 n = b.node[i] 164 if n.ns == ns && n.key == key { 165 if atomic.LoadInt32(&n.ref) == 0 { 166 deleted = true 167 168 // Call releaser. 169 if n.value != nil { 170 if r, ok := n.value.(util.Releaser); ok { 171 r.Release() 172 } 173 n.value = nil 174 } 175 176 // Remove node from bucket. 177 b.node = append(b.node[:i], b.node[i+1:]...) 178 bLen = len(b.node) 179 } 180 break 181 } 182 } 183 b.mu.Unlock() 184 185 if deleted { 186 // Call OnDel. 187 for _, f := range n.onDel { 188 f() 189 } 190 191 // Update counter. 192 atomic.AddInt32(&r.size, int32(n.size)*-1) 193 shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold 194 if bLen >= mOverflowThreshold { 195 atomic.AddInt32(&h.overflow, -1) 196 } 197 198 // Shrink. 199 if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) { 200 nhLen := len(h.buckets) >> 1 201 nh := &mNode{ 202 buckets: make([]unsafe.Pointer, nhLen), 203 mask: uint32(nhLen) - 1, 204 pred: unsafe.Pointer(h), 205 growThreshold: int32(nhLen * mOverflowThreshold), 206 shrinkThreshold: int32(nhLen >> 1), 207 } 208 ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh)) 209 if !ok { 210 panic("BUG: failed swapping head") 211 } 212 go nh.initBuckets() 213 } 214 } 215 216 return true, deleted 217} 218 219type mNode struct { 220 buckets []unsafe.Pointer // []*mBucket 221 mask uint32 222 pred unsafe.Pointer // *mNode 223 resizeInProgess int32 224 225 overflow int32 226 growThreshold int32 227 shrinkThreshold int32 228} 229 230func (n *mNode) initBucket(i uint32) *mBucket { 231 if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil { 232 return b 233 } 234 235 p := (*mNode)(atomic.LoadPointer(&n.pred)) 236 if p != nil { 237 var node []*Node 238 if n.mask > p.mask { 239 // Grow. 
240 pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask])) 241 if pb == nil { 242 pb = p.initBucket(i & p.mask) 243 } 244 m := pb.freeze() 245 // Split nodes. 246 for _, x := range m { 247 if x.hash&n.mask == i { 248 node = append(node, x) 249 } 250 } 251 } else { 252 // Shrink. 253 pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i])) 254 if pb0 == nil { 255 pb0 = p.initBucket(i) 256 } 257 pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))])) 258 if pb1 == nil { 259 pb1 = p.initBucket(i + uint32(len(n.buckets))) 260 } 261 m0 := pb0.freeze() 262 m1 := pb1.freeze() 263 // Merge nodes. 264 node = make([]*Node, 0, len(m0)+len(m1)) 265 node = append(node, m0...) 266 node = append(node, m1...) 267 } 268 b := &mBucket{node: node} 269 if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) { 270 if len(node) > mOverflowThreshold { 271 atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold)) 272 } 273 return b 274 } 275 } 276 277 return (*mBucket)(atomic.LoadPointer(&n.buckets[i])) 278} 279 280func (n *mNode) initBuckets() { 281 for i := range n.buckets { 282 n.initBucket(uint32(i)) 283 } 284 atomic.StorePointer(&n.pred, nil) 285} 286 287// Cache is a 'cache map'. 288type Cache struct { 289 mu sync.RWMutex 290 mHead unsafe.Pointer // *mNode 291 nodes int32 292 size int32 293 cacher Cacher 294 closed bool 295} 296 297// NewCache creates a new 'cache map'. The cacher is optional and 298// may be nil. 
299func NewCache(cacher Cacher) *Cache { 300 h := &mNode{ 301 buckets: make([]unsafe.Pointer, mInitialSize), 302 mask: mInitialSize - 1, 303 growThreshold: int32(mInitialSize * mOverflowThreshold), 304 shrinkThreshold: 0, 305 } 306 for i := range h.buckets { 307 h.buckets[i] = unsafe.Pointer(&mBucket{}) 308 } 309 r := &Cache{ 310 mHead: unsafe.Pointer(h), 311 cacher: cacher, 312 } 313 return r 314} 315 316func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) { 317 h := (*mNode)(atomic.LoadPointer(&r.mHead)) 318 i := hash & h.mask 319 b := (*mBucket)(atomic.LoadPointer(&h.buckets[i])) 320 if b == nil { 321 b = h.initBucket(i) 322 } 323 return h, b 324} 325 326func (r *Cache) delete(n *Node) bool { 327 for { 328 h, b := r.getBucket(n.hash) 329 done, deleted := b.delete(r, h, n.hash, n.ns, n.key) 330 if done { 331 return deleted 332 } 333 } 334} 335 336// Nodes returns number of 'cache node' in the map. 337func (r *Cache) Nodes() int { 338 return int(atomic.LoadInt32(&r.nodes)) 339} 340 341// Size returns sums of 'cache node' size in the map. 342func (r *Cache) Size() int { 343 return int(atomic.LoadInt32(&r.size)) 344} 345 346// Capacity returns cache capacity. 347func (r *Cache) Capacity() int { 348 if r.cacher == nil { 349 return 0 350 } 351 return r.cacher.Capacity() 352} 353 354// SetCapacity sets cache capacity. 355func (r *Cache) SetCapacity(capacity int) { 356 if r.cacher != nil { 357 r.cacher.SetCapacity(capacity) 358 } 359} 360 361// Get gets 'cache node' with the given namespace and key. 362// If cache node is not found and setFunc is not nil, Get will atomically creates 363// the 'cache node' by calling setFunc. Otherwise Get will returns nil. 364// 365// The returned 'cache handle' should be released after use by calling Release 366// method. 
367func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle { 368 r.mu.RLock() 369 defer r.mu.RUnlock() 370 if r.closed { 371 return nil 372 } 373 374 hash := murmur32(ns, key, 0xf00) 375 for { 376 h, b := r.getBucket(hash) 377 done, _, n := b.get(r, h, hash, ns, key, setFunc == nil) 378 if done { 379 if n != nil { 380 n.mu.Lock() 381 if n.value == nil { 382 if setFunc == nil { 383 n.mu.Unlock() 384 n.unref() 385 return nil 386 } 387 388 n.size, n.value = setFunc() 389 if n.value == nil { 390 n.size = 0 391 n.mu.Unlock() 392 n.unref() 393 return nil 394 } 395 atomic.AddInt32(&r.size, int32(n.size)) 396 } 397 n.mu.Unlock() 398 if r.cacher != nil { 399 r.cacher.Promote(n) 400 } 401 return &Handle{unsafe.Pointer(n)} 402 } 403 404 break 405 } 406 } 407 return nil 408} 409 410// Delete removes and ban 'cache node' with the given namespace and key. 411// A banned 'cache node' will never inserted into the 'cache tree'. Ban 412// only attributed to the particular 'cache node', so when a 'cache node' 413// is recreated it will not be banned. 414// 415// If onDel is not nil, then it will be executed if such 'cache node' 416// doesn't exist or once the 'cache node' is released. 417// 418// Delete return true is such 'cache node' exist. 419func (r *Cache) Delete(ns, key uint64, onDel func()) bool { 420 r.mu.RLock() 421 defer r.mu.RUnlock() 422 if r.closed { 423 return false 424 } 425 426 hash := murmur32(ns, key, 0xf00) 427 for { 428 h, b := r.getBucket(hash) 429 done, _, n := b.get(r, h, hash, ns, key, true) 430 if done { 431 if n != nil { 432 if onDel != nil { 433 n.mu.Lock() 434 n.onDel = append(n.onDel, onDel) 435 n.mu.Unlock() 436 } 437 if r.cacher != nil { 438 r.cacher.Ban(n) 439 } 440 n.unref() 441 return true 442 } 443 444 break 445 } 446 } 447 448 if onDel != nil { 449 onDel() 450 } 451 452 return false 453} 454 455// Evict evicts 'cache node' with the given namespace and key. This will 456// simply call Cacher.Evict. 
457// 458// Evict return true is such 'cache node' exist. 459func (r *Cache) Evict(ns, key uint64) bool { 460 r.mu.RLock() 461 defer r.mu.RUnlock() 462 if r.closed { 463 return false 464 } 465 466 hash := murmur32(ns, key, 0xf00) 467 for { 468 h, b := r.getBucket(hash) 469 done, _, n := b.get(r, h, hash, ns, key, true) 470 if done { 471 if n != nil { 472 if r.cacher != nil { 473 r.cacher.Evict(n) 474 } 475 n.unref() 476 return true 477 } 478 479 break 480 } 481 } 482 483 return false 484} 485 486// EvictNS evicts 'cache node' with the given namespace. This will 487// simply call Cacher.EvictNS. 488func (r *Cache) EvictNS(ns uint64) { 489 r.mu.RLock() 490 defer r.mu.RUnlock() 491 if r.closed { 492 return 493 } 494 495 if r.cacher != nil { 496 r.cacher.EvictNS(ns) 497 } 498} 499 500// EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll. 501func (r *Cache) EvictAll() { 502 r.mu.RLock() 503 defer r.mu.RUnlock() 504 if r.closed { 505 return 506 } 507 508 if r.cacher != nil { 509 r.cacher.EvictAll() 510 } 511} 512 513// Close closes the 'cache map' and forcefully releases all 'cache node'. 514func (r *Cache) Close() error { 515 r.mu.Lock() 516 if !r.closed { 517 r.closed = true 518 519 h := (*mNode)(r.mHead) 520 h.initBuckets() 521 522 for i := range h.buckets { 523 b := (*mBucket)(h.buckets[i]) 524 for _, n := range b.node { 525 // Call releaser. 526 if n.value != nil { 527 if r, ok := n.value.(util.Releaser); ok { 528 r.Release() 529 } 530 n.value = nil 531 } 532 533 // Call OnDel. 534 for _, f := range n.onDel { 535 f() 536 } 537 n.onDel = nil 538 } 539 } 540 } 541 r.mu.Unlock() 542 543 // Avoid deadlock. 544 if r.cacher != nil { 545 if err := r.cacher.Close(); err != nil { 546 return err 547 } 548 } 549 return nil 550} 551 552// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but 553// unlike Close it doesn't forcefully releases 'cache node'. 
554func (r *Cache) CloseWeak() error { 555 r.mu.Lock() 556 if !r.closed { 557 r.closed = true 558 } 559 r.mu.Unlock() 560 561 // Avoid deadlock. 562 if r.cacher != nil { 563 r.cacher.EvictAll() 564 if err := r.cacher.Close(); err != nil { 565 return err 566 } 567 } 568 return nil 569} 570 571// Node is a 'cache node'. 572type Node struct { 573 r *Cache 574 575 hash uint32 576 ns, key uint64 577 578 mu sync.Mutex 579 size int 580 value Value 581 582 ref int32 583 onDel []func() 584 585 CacheData unsafe.Pointer 586} 587 588// NS returns this 'cache node' namespace. 589func (n *Node) NS() uint64 { 590 return n.ns 591} 592 593// Key returns this 'cache node' key. 594func (n *Node) Key() uint64 { 595 return n.key 596} 597 598// Size returns this 'cache node' size. 599func (n *Node) Size() int { 600 return n.size 601} 602 603// Value returns this 'cache node' value. 604func (n *Node) Value() Value { 605 return n.value 606} 607 608// Ref returns this 'cache node' ref counter. 609func (n *Node) Ref() int32 { 610 return atomic.LoadInt32(&n.ref) 611} 612 613// GetHandle returns an handle for this 'cache node'. 614func (n *Node) GetHandle() *Handle { 615 if atomic.AddInt32(&n.ref, 1) <= 1 { 616 panic("BUG: Node.GetHandle on zero ref") 617 } 618 return &Handle{unsafe.Pointer(n)} 619} 620 621func (n *Node) unref() { 622 if atomic.AddInt32(&n.ref, -1) == 0 { 623 n.r.delete(n) 624 } 625} 626 627func (n *Node) unrefLocked() { 628 if atomic.AddInt32(&n.ref, -1) == 0 { 629 n.r.mu.RLock() 630 if !n.r.closed { 631 n.r.delete(n) 632 } 633 n.r.mu.RUnlock() 634 } 635} 636 637// Handle is a 'cache handle' of a 'cache node'. 638type Handle struct { 639 n unsafe.Pointer // *Node 640} 641 642// Value returns the value of the 'cache node'. 643func (h *Handle) Value() Value { 644 n := (*Node)(atomic.LoadPointer(&h.n)) 645 if n != nil { 646 return n.value 647 } 648 return nil 649} 650 651// Release releases this 'cache handle'. 652// It is safe to call release multiple times. 
653func (h *Handle) Release() { 654 nPtr := atomic.LoadPointer(&h.n) 655 if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) { 656 n := (*Node)(nPtr) 657 n.unrefLocked() 658 } 659} 660 661func murmur32(ns, key uint64, seed uint32) uint32 { 662 const ( 663 m = uint32(0x5bd1e995) 664 r = 24 665 ) 666 667 k1 := uint32(ns >> 32) 668 k2 := uint32(ns) 669 k3 := uint32(key >> 32) 670 k4 := uint32(key) 671 672 k1 *= m 673 k1 ^= k1 >> r 674 k1 *= m 675 676 k2 *= m 677 k2 ^= k2 >> r 678 k2 *= m 679 680 k3 *= m 681 k3 ^= k3 >> r 682 k3 *= m 683 684 k4 *= m 685 k4 ^= k4 >> r 686 k4 *= m 687 688 h := seed 689 690 h *= m 691 h ^= k1 692 h *= m 693 h ^= k2 694 h *= m 695 h ^= k3 696 h *= m 697 h ^= k4 698 699 h ^= h >> 13 700 h *= m 701 h ^= h >> 15 702 703 return h 704} 705