1// An LRU cached aimed at high concurrency
2package ccache
3
4import (
5	"container/list"
6	"hash/fnv"
7	"sync/atomic"
8	"time"
9)
10
11type Cache struct {
12	*Configuration
13	list        *list.List
14	size        int64
15	buckets     []*bucket
16	bucketMask  uint32
17	deletables  chan *Item
18	promotables chan *Item
19}
20
21// Create a new cache with the specified configuration
22// See ccache.Configure() for creating a configuration
23func New(config *Configuration) *Cache {
24	c := &Cache{
25		list:          list.New(),
26		Configuration: config,
27		bucketMask:    uint32(config.buckets) - 1,
28		buckets:       make([]*bucket, config.buckets),
29		deletables:    make(chan *Item, config.deleteBuffer),
30		promotables:   make(chan *Item, config.promoteBuffer),
31	}
32	for i := 0; i < int(config.buckets); i++ {
33		c.buckets[i] = &bucket{
34			lookup: make(map[string]*Item),
35		}
36	}
37	go c.worker()
38	return c
39}
40
41// Get an item from the cache. Returns nil if the item wasn't found.
42// This can return an expired item. Use item.Expired() to see if the item
43// is expired and item.TTL() to see how long until the item expires (which
44// will be negative for an already expired item).
45func (c *Cache) Get(key string) *Item {
46	item := c.bucket(key).get(key)
47	if item == nil {
48		return nil
49	}
50	if item.expires > time.Now().UnixNano() {
51		c.promote(item)
52	}
53	return item
54}
55
56// Used when the cache was created with the Track() configuration option.
57// Avoid otherwise
58func (c *Cache) TrackingGet(key string) TrackedItem {
59	item := c.Get(key)
60	if item == nil {
61		return NilTracked
62	}
63	item.track()
64	return item
65}
66
67// Set the value in the cache for the specified duration
68func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
69	c.set(key, value, duration)
70}
71
72// Replace the value if it exists, does not set if it doesn't.
73// Returns true if the item existed an was replaced, false otherwise.
74// Replace does not reset item's TTL
75func (c *Cache) Replace(key string, value interface{}) bool {
76	item := c.bucket(key).get(key)
77	if item == nil {
78		return false
79	}
80	c.Set(key, value, item.TTL())
81	return true
82}
83
84// Attempts to get the value from the cache and calles fetch on a miss (missing
85// or stale item). If fetch returns an error, no value is cached and the error
86// is returned back to the caller.
87func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
88	item := c.Get(key)
89	if item != nil && !item.Expired() {
90		return item, nil
91	}
92	value, err := fetch()
93	if err != nil {
94		return nil, err
95	}
96	return c.set(key, value, duration), nil
97}
98
99// Remove the item from the cache, return true if the item was present, false otherwise.
100func (c *Cache) Delete(key string) bool {
101	item := c.bucket(key).delete(key)
102	if item != nil {
103		c.deletables <- item
104		return true
105	}
106	return false
107}
108
109//this isn't thread safe. It's meant to be called from non-concurrent tests
110func (c *Cache) Clear() {
111	for _, bucket := range c.buckets {
112		bucket.clear()
113	}
114	c.size = 0
115	c.list = list.New()
116}
117
118// Stops the background worker. Operations performed on the cache after Stop
119// is called are likely to panic
120func (c *Cache) Stop() {
121	close(c.promotables)
122}
123
124func (c *Cache) deleteItem(bucket *bucket, item *Item) {
125	bucket.delete(item.key) //stop other GETs from getting it
126	c.deletables <- item
127}
128
129func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item {
130	item, existing := c.bucket(key).set(key, value, duration)
131	if existing != nil {
132		c.deletables <- existing
133	}
134	c.promote(item)
135	return item
136}
137
138func (c *Cache) bucket(key string) *bucket {
139	h := fnv.New32a()
140	h.Write([]byte(key))
141	return c.buckets[h.Sum32()&c.bucketMask]
142}
143
144func (c *Cache) promote(item *Item) {
145	c.promotables <- item
146}
147
148func (c *Cache) worker() {
149	for {
150		select {
151		case item, ok := <-c.promotables:
152			if ok == false {
153				goto drain
154			}
155			if c.doPromote(item) && c.size > c.maxSize {
156				c.gc()
157			}
158		case item := <-c.deletables:
159			c.doDelete(item)
160		}
161	}
162
163drain:
164	for {
165		select {
166		case item := <-c.deletables:
167			c.doDelete(item)
168		default:
169			close(c.deletables)
170			return
171		}
172	}
173}
174
175func (c *Cache) doDelete(item *Item) {
176	if item.element == nil {
177		item.promotions = -2
178	} else {
179		c.size -= item.size
180		c.list.Remove(item.element)
181	}
182}
183
184func (c *Cache) doPromote(item *Item) bool {
185	//already deleted
186	if item.promotions == -2 {
187		return false
188	}
189	if item.element != nil { //not a new item
190		if item.shouldPromote(c.getsPerPromote) {
191			c.list.MoveToFront(item.element)
192			item.promotions = 0
193		}
194		return false
195	}
196
197	c.size += item.size
198	item.element = c.list.PushFront(item)
199	return true
200}
201
202func (c *Cache) gc() {
203	element := c.list.Back()
204	for i := 0; i < c.itemsToPrune; i++ {
205		if element == nil {
206			return
207		}
208		prev := element.Prev()
209		item := element.Value.(*Item)
210		if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
211			c.bucket(item.key).delete(item.key)
212			c.size -= item.size
213			c.list.Remove(element)
214			item.promotions = -2
215		}
216		element = prev
217	}
218}
219