// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Package cache provides an interface and implementation of cache algorithms.
package cache

import (
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/syndtr/goleveldb/leveldb/util"
)

// Cacher provides an interface for implementing caching functionality.
// An implementation must be safe for concurrent use.
type Cacher interface {
	// Capacity returns cache capacity.
	Capacity() int

	// SetCapacity sets cache capacity.
	SetCapacity(capacity int)

	// Promote promotes the 'cache node'.
	Promote(n *Node)

	// Ban evicts the 'cache node' and prevents subsequent 'promote'.
	Ban(n *Node)

	// Evict evicts the 'cache node'.
	Evict(n *Node)

	// EvictNS evicts 'cache nodes' with the given namespace.
	EvictNS(ns uint64)

	// EvictAll evicts all 'cache nodes'.
	EvictAll()

	// Close closes the 'cache tree'.
	Close() error
}

// Value is a 'cacheable object'. It may implement util.Releaser; if
// so, the Release method will be called once the object is released.
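//
// For example, a value that frees its resources when it leaves the cache
// could look like the following sketch (the Buffer type is hypothetical):
//
//	type Buffer struct{ data []byte }
//
//	func (b *Buffer) Release() { b.data = nil } // satisfies util.Releaser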
type Value interface{}

// NamespaceGetter provides a convenient wrapper for a namespace.
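// Binding the namespace once avoids passing it on every call, e.g.
// (a minimal sketch; the namespace value 1 is arbitrary):
//
//	getter := &NamespaceGetter{Cache: c, NS: 1}
//	if h := getter.Get(42, func() (int, Value) { return 1, "v" }); h != nil {
//		defer h.Release()
//	}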
type NamespaceGetter struct {
	Cache *Cache
	NS    uint64
}

// Get simply calls the Cache.Get method.
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
	return g.Cache.Get(g.NS, key, setFunc)
}

// The hash table implementation is based on:
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
// Kunlong Zhang, and Michael Spear.
// ACM Symposium on Principles of Distributed Computing, Jul 2014.
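//
// In short: the table is an array of buckets behind an atomic pointer
// (Cache.mHead). When a resize is triggered, a new mNode twice (or half)
// the size is swapped in with its buckets left nil; each bucket is then
// initialized lazily from the predecessor table ('pred') by splitting or
// merging the old buckets, while a background goroutine (initBuckets)
// completes the migration. Readers that hit a frozen old bucket simply
// retry against the current head.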

const (
	mInitialSize           = 1 << 4
	mOverflowThreshold     = 1 << 5
	mOverflowGrowThreshold = 1 << 7
)

type mBucket struct {
	mu     sync.Mutex
	node   []*Node
	frozen bool
}

func (b *mBucket) freeze() []*Node {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.frozen {
		b.frozen = true
	}
	return b.node
}

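// get looks up the node for (ns, key) in this bucket, creating it when
// noset is false and it is absent. It reports done=false when the bucket
// has been frozen by a resize, in which case the caller must re-resolve
// the bucket from the current table and retry.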
func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
	b.mu.Lock()

	if b.frozen {
		b.mu.Unlock()
		return
	}

	// Scan the node.
	for _, n := range b.node {
		if n.hash == hash && n.ns == ns && n.key == key {
			atomic.AddInt32(&n.ref, 1)
			b.mu.Unlock()
			return true, false, n
		}
	}

	// Get only.
	if noset {
		b.mu.Unlock()
		return true, false, nil
	}

	// Create node.
	n = &Node{
		r:    r,
		hash: hash,
		ns:   ns,
		key:  key,
		ref:  1,
	}
	// Add node to bucket.
	b.node = append(b.node, n)
	bLen := len(b.node)
	b.mu.Unlock()

	// Update counter.
	grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
	if bLen > mOverflowThreshold {
		grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
	}

	// Grow.
	if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
		nhLen := len(h.buckets) << 1
		nh := &mNode{
			buckets:         make([]unsafe.Pointer, nhLen),
			mask:            uint32(nhLen) - 1,
			pred:            unsafe.Pointer(h),
			growThreshold:   int32(nhLen * mOverflowThreshold),
			shrinkThreshold: int32(nhLen >> 1),
		}
		ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
		if !ok {
			panic("BUG: failed swapping head")
		}
		go nh.initBuckets()
	}

	return true, true, n
}

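// delete removes the node for (ns, key) from this bucket, but only when
// its reference count has already dropped to zero. It reports done=false
// when the bucket has been frozen by a resize, in which case the caller
// must retry against the current table.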
func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
	b.mu.Lock()

	if b.frozen {
		b.mu.Unlock()
		return
	}

	// Scan the node.
	var (
		n    *Node
		bLen int
	)
	for i := range b.node {
		n = b.node[i]
		if n.ns == ns && n.key == key {
			if atomic.LoadInt32(&n.ref) == 0 {
				deleted = true

				// Call releaser.
				if n.value != nil {
					if r, ok := n.value.(util.Releaser); ok {
						r.Release()
					}
					n.value = nil
				}

				// Remove node from bucket.
				b.node = append(b.node[:i], b.node[i+1:]...)
				bLen = len(b.node)
			}
			break
		}
	}
	b.mu.Unlock()

	if deleted {
		// Call OnDel.
		for _, f := range n.onDel {
			f()
		}

		// Update counter.
		atomic.AddInt32(&r.size, int32(n.size)*-1)
		shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
		if bLen >= mOverflowThreshold {
			atomic.AddInt32(&h.overflow, -1)
		}

		// Shrink.
		if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
			nhLen := len(h.buckets) >> 1
			nh := &mNode{
				buckets:         make([]unsafe.Pointer, nhLen),
				mask:            uint32(nhLen) - 1,
				pred:            unsafe.Pointer(h),
				growThreshold:   int32(nhLen * mOverflowThreshold),
				shrinkThreshold: int32(nhLen >> 1),
			}
			ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
			if !ok {
				panic("BUG: failed swapping head")
			}
			go nh.initBuckets()
		}
	}

	return true, deleted
}

type mNode struct {
	buckets         []unsafe.Pointer // []*mBucket
	mask            uint32
	pred            unsafe.Pointer // *mNode
	resizeInProgess int32

	overflow        int32
	growThreshold   int32
	shrinkThreshold int32
}

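// initBucket lazily initializes bucket i by pulling nodes over from the
// predecessor table: on grow it splits one old bucket in two, on shrink it
// merges two old buckets into one. Concurrent initializations race on a
// CAS and the loser simply returns the winner's bucket.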
func (n *mNode) initBucket(i uint32) *mBucket {
	if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
		return b
	}

	p := (*mNode)(atomic.LoadPointer(&n.pred))
	if p != nil {
		var node []*Node
		if n.mask > p.mask {
			// Grow.
			pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
			if pb == nil {
				pb = p.initBucket(i & p.mask)
			}
			m := pb.freeze()
			// Split nodes.
			for _, x := range m {
				if x.hash&n.mask == i {
					node = append(node, x)
				}
			}
		} else {
			// Shrink.
			pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
			if pb0 == nil {
				pb0 = p.initBucket(i)
			}
			pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
			if pb1 == nil {
				pb1 = p.initBucket(i + uint32(len(n.buckets)))
			}
			m0 := pb0.freeze()
			m1 := pb1.freeze()
			// Merge nodes.
			node = make([]*Node, 0, len(m0)+len(m1))
			node = append(node, m0...)
			node = append(node, m1...)
		}
		b := &mBucket{node: node}
		if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
			if len(node) > mOverflowThreshold {
				atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
			}
			return b
		}
	}

	return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
}

func (n *mNode) initBuckets() {
	for i := range n.buckets {
		n.initBucket(uint32(i))
	}
	atomic.StorePointer(&n.pred, nil)
}

// Cache is a 'cache map'.
type Cache struct {
	mu     sync.RWMutex
	mHead  unsafe.Pointer // *mNode
	nodes  int32
	size   int32
	cacher Cacher
	closed bool
}

// NewCache creates a new 'cache map'. The cacher is optional and
// may be nil.
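//
// A minimal usage sketch, assuming the LRU cacher provided elsewhere in
// this package (NewLRU) and an arbitrary 16 MiB capacity:
//
//	c := NewCache(NewLRU(16 << 20))
//	defer c.Close()
//	h := c.Get(1, 42, func() (int, Value) { return 3, "foo" })
//	if h != nil {
//		_ = h.Value() // "foo"
//		h.Release()
//	}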
func NewCache(cacher Cacher) *Cache {
	h := &mNode{
		buckets:         make([]unsafe.Pointer, mInitialSize),
		mask:            mInitialSize - 1,
		growThreshold:   int32(mInitialSize * mOverflowThreshold),
		shrinkThreshold: 0,
	}
	for i := range h.buckets {
		h.buckets[i] = unsafe.Pointer(&mBucket{})
	}
	r := &Cache{
		mHead:  unsafe.Pointer(h),
		cacher: cacher,
	}
	return r
}

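// getBucket resolves the bucket for the given hash in the current table,
// initializing it from the predecessor table if necessary.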
func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
	h := (*mNode)(atomic.LoadPointer(&r.mHead))
	i := hash & h.mask
	b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
	if b == nil {
		b = h.initBucket(i)
	}
	return h, b
}

func (r *Cache) delete(n *Node) bool {
	for {
		h, b := r.getBucket(n.hash)
		done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
		if done {
			return deleted
		}
	}
}

// Nodes returns the number of 'cache nodes' in the map.
func (r *Cache) Nodes() int {
	return int(atomic.LoadInt32(&r.nodes))
}

// Size returns the total size of 'cache nodes' in the map.
func (r *Cache) Size() int {
	return int(atomic.LoadInt32(&r.size))
}

// Capacity returns cache capacity.
func (r *Cache) Capacity() int {
	if r.cacher == nil {
		return 0
	}
	return r.cacher.Capacity()
}

// SetCapacity sets cache capacity.
func (r *Cache) SetCapacity(capacity int) {
	if r.cacher != nil {
		r.cacher.SetCapacity(capacity)
	}
}

// Get gets the 'cache node' with the given namespace and key.
// If the 'cache node' is not found and setFunc is not nil, Get will atomically
// create the 'cache node' by calling setFunc. Otherwise Get returns nil.
//
// The returned 'cache handle' should be released after use by calling its
// Release method.
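//
// For example (a sketch; namespace 1 and key 2 are arbitrary):
//
//	h := c.Get(1, 2, func() (int, Value) { return len("bar"), "bar" })
//	if h != nil {
//		defer h.Release()
//		_ = h.Value() // "bar"
//	}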
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return nil
	}

	hash := murmur32(ns, key, 0xf00)
	for {
		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
		if done {
			if n != nil {
				n.mu.Lock()
				if n.value == nil {
					if setFunc == nil {
						n.mu.Unlock()
						n.unref()
						return nil
					}

					n.size, n.value = setFunc()
					if n.value == nil {
						n.size = 0
						n.mu.Unlock()
						n.unref()
						return nil
					}
					atomic.AddInt32(&r.size, int32(n.size))
				}
				n.mu.Unlock()
				if r.cacher != nil {
					r.cacher.Promote(n)
				}
				return &Handle{unsafe.Pointer(n)}
			}

			break
		}
	}
	return nil
}

// Delete removes and bans the 'cache node' with the given namespace and key.
// A banned 'cache node' will never be inserted into the 'cache tree'. The ban
// only applies to that particular 'cache node', so when a 'cache node' with
// the same namespace and key is recreated it will not be banned.
//
// If onDel is not nil, it will be executed if such 'cache node' doesn't exist
// or once the 'cache node' is released.
//
// Delete returns true if such 'cache node' exists.
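//
// A sketch of deleting with a completion callback (namespace 1 and key 2
// are arbitrary):
//
//	released := make(chan struct{})
//	c.Delete(1, 2, func() { close(released) })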
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return false
	}

	hash := murmur32(ns, key, 0xf00)
	for {
		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, true)
		if done {
			if n != nil {
				if onDel != nil {
					n.mu.Lock()
					n.onDel = append(n.onDel, onDel)
					n.mu.Unlock()
				}
				if r.cacher != nil {
					r.cacher.Ban(n)
				}
				n.unref()
				return true
			}

			break
		}
	}

	if onDel != nil {
		onDel()
	}

	return false
}

// Evict evicts the 'cache node' with the given namespace and key. This will
// simply call Cacher.Evict.
//
// Evict returns true if such 'cache node' exists.
func (r *Cache) Evict(ns, key uint64) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return false
	}

	hash := murmur32(ns, key, 0xf00)
	for {
		h, b := r.getBucket(hash)
		done, _, n := b.get(r, h, hash, ns, key, true)
		if done {
			if n != nil {
				if r.cacher != nil {
					r.cacher.Evict(n)
				}
				n.unref()
				return true
			}

			break
		}
	}

	return false
}

// EvictNS evicts 'cache nodes' with the given namespace. This will
// simply call Cacher.EvictNS.
func (r *Cache) EvictNS(ns uint64) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return
	}

	if r.cacher != nil {
		r.cacher.EvictNS(ns)
	}
}

// EvictAll evicts all 'cache nodes'. This will simply call Cacher.EvictAll.
func (r *Cache) EvictAll() {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.closed {
		return
	}

	if r.cacher != nil {
		r.cacher.EvictAll()
	}
}

// Close closes the 'cache map' and forcefully releases all 'cache nodes'.
func (r *Cache) Close() error {
	r.mu.Lock()
	if !r.closed {
		r.closed = true

		h := (*mNode)(r.mHead)
		h.initBuckets()

		for i := range h.buckets {
			b := (*mBucket)(h.buckets[i])
			for _, n := range b.node {
				// Call releaser.
				if n.value != nil {
					if r, ok := n.value.(util.Releaser); ok {
						r.Release()
					}
					n.value = nil
				}

				// Call OnDel.
				for _, f := range n.onDel {
					f()
				}
				n.onDel = nil
			}
		}
	}
	r.mu.Unlock()

	// Avoid deadlock.
	if r.cacher != nil {
		if err := r.cacher.Close(); err != nil {
			return err
		}
	}
	return nil
}

// CloseWeak closes the 'cache map' and evicts all 'cache nodes' from the
// cacher, but unlike Close it doesn't forcefully release the 'cache nodes'.
func (r *Cache) CloseWeak() error {
	r.mu.Lock()
	if !r.closed {
		r.closed = true
	}
	r.mu.Unlock()

	// Avoid deadlock.
	if r.cacher != nil {
		r.cacher.EvictAll()
		if err := r.cacher.Close(); err != nil {
			return err
		}
	}
	return nil
}

// Node is a 'cache node'.
type Node struct {
	r *Cache

	hash    uint32
	ns, key uint64

	mu    sync.Mutex
	size  int
	value Value

	ref   int32
	onDel []func()

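	// CacheData is reserved for the Cacher implementation to attach its own
	// per-node bookkeeping (for example, an LRU list entry); the 'cache map'
	// itself never interprets it.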
	CacheData unsafe.Pointer
}

// NS returns this 'cache node' namespace.
func (n *Node) NS() uint64 {
	return n.ns
}

// Key returns this 'cache node' key.
func (n *Node) Key() uint64 {
	return n.key
}

// Size returns this 'cache node' size.
func (n *Node) Size() int {
	return n.size
}

// Value returns this 'cache node' value.
func (n *Node) Value() Value {
	return n.value
}

// Ref returns this 'cache node' ref counter.
func (n *Node) Ref() int32 {
	return atomic.LoadInt32(&n.ref)
}

// GetHandle returns a handle for this 'cache node'.
func (n *Node) GetHandle() *Handle {
	if atomic.AddInt32(&n.ref, 1) <= 1 {
		panic("BUG: Node.GetHandle on zero ref")
	}
	return &Handle{unsafe.Pointer(n)}
}

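// unref decrements the reference count and removes the node from the
// 'cache map' once it drops to zero.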
func (n *Node) unref() {
	if atomic.AddInt32(&n.ref, -1) == 0 {
		n.r.delete(n)
	}
}

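// unrefLocked is like unref but takes the 'cache map' read lock first, so
// that the node is not removed from a map that has already been closed.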
func (n *Node) unrefLocked() {
	if atomic.AddInt32(&n.ref, -1) == 0 {
		n.r.mu.RLock()
		if !n.r.closed {
			n.r.delete(n)
		}
		n.r.mu.RUnlock()
	}
}

// Handle is a 'cache handle' of a 'cache node'.
type Handle struct {
	n unsafe.Pointer // *Node
}

// Value returns the value of the 'cache node'.
func (h *Handle) Value() Value {
	n := (*Node)(atomic.LoadPointer(&h.n))
	if n != nil {
		return n.value
	}
	return nil
}

// Release releases this 'cache handle'.
// It is safe to call Release multiple times.
func (h *Handle) Release() {
	nPtr := atomic.LoadPointer(&h.n)
	if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
		n := (*Node)(nPtr)
		n.unrefLocked()
	}
}

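// murmur32 hashes the (ns, key) pair into a 32-bit value using
// MurmurHash2-style mixing with the given seed.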
func murmur32(ns, key uint64, seed uint32) uint32 {
	const (
		m = uint32(0x5bd1e995)
		r = 24
	)

	k1 := uint32(ns >> 32)
	k2 := uint32(ns)
	k3 := uint32(key >> 32)
	k4 := uint32(key)

	k1 *= m
	k1 ^= k1 >> r
	k1 *= m

	k2 *= m
	k2 ^= k2 >> r
	k2 *= m

	k3 *= m
	k3 ^= k3 >> r
	k3 *= m

	k4 *= m
	k4 ^= k4 >> r
	k4 *= m

	h := seed

	h *= m
	h ^= k1
	h *= m
	h ^= k2
	h *= m
	h ^= k3
	h *= m
	h ^= k4

	h ^= h >> 13
	h *= m
	h ^= h >> 15

	return h
}