1// An LRU cached aimed at high concurrency 2package ccache 3 4import ( 5 "container/list" 6 "hash/fnv" 7 "sync/atomic" 8 "time" 9) 10 11type Cache struct { 12 *Configuration 13 list *list.List 14 size int64 15 buckets []*bucket 16 bucketMask uint32 17 deletables chan *Item 18 promotables chan *Item 19} 20 21// Create a new cache with the specified configuration 22// See ccache.Configure() for creating a configuration 23func New(config *Configuration) *Cache { 24 c := &Cache{ 25 list: list.New(), 26 Configuration: config, 27 bucketMask: uint32(config.buckets) - 1, 28 buckets: make([]*bucket, config.buckets), 29 deletables: make(chan *Item, config.deleteBuffer), 30 promotables: make(chan *Item, config.promoteBuffer), 31 } 32 for i := 0; i < int(config.buckets); i++ { 33 c.buckets[i] = &bucket{ 34 lookup: make(map[string]*Item), 35 } 36 } 37 go c.worker() 38 return c 39} 40 41// Get an item from the cache. Returns nil if the item wasn't found. 42// This can return an expired item. Use item.Expired() to see if the item 43// is expired and item.TTL() to see how long until the item expires (which 44// will be negative for an already expired item). 45func (c *Cache) Get(key string) *Item { 46 item := c.bucket(key).get(key) 47 if item == nil { 48 return nil 49 } 50 if item.expires > time.Now().UnixNano() { 51 c.promote(item) 52 } 53 return item 54} 55 56// Used when the cache was created with the Track() configuration option. 57// Avoid otherwise 58func (c *Cache) TrackingGet(key string) TrackedItem { 59 item := c.Get(key) 60 if item == nil { 61 return NilTracked 62 } 63 item.track() 64 return item 65} 66 67// Set the value in the cache for the specified duration 68func (c *Cache) Set(key string, value interface{}, duration time.Duration) { 69 c.set(key, value, duration) 70} 71 72// Replace the value if it exists, does not set if it doesn't. 73// Returns true if the item existed an was replaced, false otherwise. 74// Replace does not reset item's TTL 75func (c *Cache) Replace(key string, value interface{}) bool { 76 item := c.bucket(key).get(key) 77 if item == nil { 78 return false 79 } 80 c.Set(key, value, item.TTL()) 81 return true 82} 83 84// Attempts to get the value from the cache and calles fetch on a miss (missing 85// or stale item). If fetch returns an error, no value is cached and the error 86// is returned back to the caller. 87func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) { 88 item := c.Get(key) 89 if item != nil && !item.Expired() { 90 return item, nil 91 } 92 value, err := fetch() 93 if err != nil { 94 return nil, err 95 } 96 return c.set(key, value, duration), nil 97} 98 99// Remove the item from the cache, return true if the item was present, false otherwise. 100func (c *Cache) Delete(key string) bool { 101 item := c.bucket(key).delete(key) 102 if item != nil { 103 c.deletables <- item 104 return true 105 } 106 return false 107} 108 109//this isn't thread safe. It's meant to be called from non-concurrent tests 110func (c *Cache) Clear() { 111 for _, bucket := range c.buckets { 112 bucket.clear() 113 } 114 c.size = 0 115 c.list = list.New() 116} 117 118// Stops the background worker. Operations performed on the cache after Stop 119// is called are likely to panic 120func (c *Cache) Stop() { 121 close(c.promotables) 122} 123 124func (c *Cache) deleteItem(bucket *bucket, item *Item) { 125 bucket.delete(item.key) //stop other GETs from getting it 126 c.deletables <- item 127} 128 129func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item { 130 item, existing := c.bucket(key).set(key, value, duration) 131 if existing != nil { 132 c.deletables <- existing 133 } 134 c.promote(item) 135 return item 136} 137 138func (c *Cache) bucket(key string) *bucket { 139 h := fnv.New32a() 140 h.Write([]byte(key)) 141 return c.buckets[h.Sum32()&c.bucketMask] 142} 143 144func (c *Cache) promote(item *Item) { 145 c.promotables <- item 146} 147 148func (c *Cache) worker() { 149 for { 150 select { 151 case item, ok := <-c.promotables: 152 if ok == false { 153 goto drain 154 } 155 if c.doPromote(item) && c.size > c.maxSize { 156 c.gc() 157 } 158 case item := <-c.deletables: 159 c.doDelete(item) 160 } 161 } 162 163drain: 164 for { 165 select { 166 case item := <-c.deletables: 167 c.doDelete(item) 168 default: 169 close(c.deletables) 170 return 171 } 172 } 173} 174 175func (c *Cache) doDelete(item *Item) { 176 if item.element == nil { 177 item.promotions = -2 178 } else { 179 c.size -= item.size 180 c.list.Remove(item.element) 181 } 182} 183 184func (c *Cache) doPromote(item *Item) bool { 185 //already deleted 186 if item.promotions == -2 { 187 return false 188 } 189 if item.element != nil { //not a new item 190 if item.shouldPromote(c.getsPerPromote) { 191 c.list.MoveToFront(item.element) 192 item.promotions = 0 193 } 194 return false 195 } 196 197 c.size += item.size 198 item.element = c.list.PushFront(item) 199 return true 200} 201 202func (c *Cache) gc() { 203 element := c.list.Back() 204 for i := 0; i < c.itemsToPrune; i++ { 205 if element == nil { 206 return 207 } 208 prev := element.Prev() 209 item := element.Value.(*Item) 210 if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { 211 c.bucket(item.key).delete(item.key) 212 c.size -= item.size 213 c.list.Remove(element) 214 item.promotions = -2 215 } 216 element = prev 217 } 218} 219