/*
Copyright 2012 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each data Get first consults its local cache, otherwise delegates
// to the requested key's canonical owner, which then checks its cache
// or finally gets the data. In the common case, many concurrent
// cache misses across a set of peers for the same key result in just
// one cache fill.
package groupcache

import (
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"

	pb "github.com/golang/groupcache/groupcachepb"
	"github.com/golang/groupcache/lru"
	"github.com/golang/groupcache/singleflight"
)

// A Getter loads data for a key.
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms: the same key must always map to the same data,
	// since cached copies may be served from any peer.
	Get(ctx Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
51type GetterFunc func(ctx Context, key string, dest Sink) error 52 53func (f GetterFunc) Get(ctx Context, key string, dest Sink) error { 54 return f(ctx, key, dest) 55} 56 57var ( 58 mu sync.RWMutex 59 groups = make(map[string]*Group) 60 61 initPeerServerOnce sync.Once 62 initPeerServer func() 63) 64 65// GetGroup returns the named group previously created with NewGroup, or 66// nil if there's no such group. 67func GetGroup(name string) *Group { 68 mu.RLock() 69 g := groups[name] 70 mu.RUnlock() 71 return g 72} 73 74// NewGroup creates a coordinated group-aware Getter from a Getter. 75// 76// The returned Getter tries (but does not guarantee) to run only one 77// Get call at once for a given key across an entire set of peer 78// processes. Concurrent callers both in the local process and in 79// other processes receive copies of the answer once the original Get 80// completes. 81// 82// The group name must be unique for each getter. 83func NewGroup(name string, cacheBytes int64, getter Getter) *Group { 84 return newGroup(name, cacheBytes, getter, nil) 85} 86 87// If peers is nil, the peerPicker is called via a sync.Once to initialize it. 88func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group { 89 if getter == nil { 90 panic("nil Getter") 91 } 92 mu.Lock() 93 defer mu.Unlock() 94 initPeerServerOnce.Do(callInitPeerServer) 95 if _, dup := groups[name]; dup { 96 panic("duplicate registration of group " + name) 97 } 98 g := &Group{ 99 name: name, 100 getter: getter, 101 peers: peers, 102 cacheBytes: cacheBytes, 103 loadGroup: &singleflight.Group{}, 104 } 105 if fn := newGroupHook; fn != nil { 106 fn(g) 107 } 108 groups[name] = g 109 return g 110} 111 112// newGroupHook, if non-nil, is called right after a new group is created. 113var newGroupHook func(*Group) 114 115// RegisterNewGroupHook registers a hook that is run each time 116// a group is created. 
117func RegisterNewGroupHook(fn func(*Group)) { 118 if newGroupHook != nil { 119 panic("RegisterNewGroupHook called more than once") 120 } 121 newGroupHook = fn 122} 123 124// RegisterServerStart registers a hook that is run when the first 125// group is created. 126func RegisterServerStart(fn func()) { 127 if initPeerServer != nil { 128 panic("RegisterServerStart called more than once") 129 } 130 initPeerServer = fn 131} 132 133func callInitPeerServer() { 134 if initPeerServer != nil { 135 initPeerServer() 136 } 137} 138 139// A Group is a cache namespace and associated data loaded spread over 140// a group of 1 or more machines. 141type Group struct { 142 name string 143 getter Getter 144 peersOnce sync.Once 145 peers PeerPicker 146 cacheBytes int64 // limit for sum of mainCache and hotCache size 147 148 // mainCache is a cache of the keys for which this process 149 // (amongst its peers) is authoritative. That is, this cache 150 // contains keys which consistent hash on to this process's 151 // peer number. 152 mainCache cache 153 154 // hotCache contains keys/values for which this peer is not 155 // authoritative (otherwise they would be in mainCache), but 156 // are popular enough to warrant mirroring in this process to 157 // avoid going over the network to fetch from a peer. Having 158 // a hotCache avoids network hotspotting, where a peer's 159 // network card could become the bottleneck on a popular key. 160 // This cache is used sparingly to maximize the total number 161 // of key/value pairs that can be stored globally. 162 hotCache cache 163 164 // loadGroup ensures that each key is only fetched once 165 // (either locally or remotely), regardless of the number of 166 // concurrent callers. 167 loadGroup flightGroup 168 169 _ int32 // force Stats to be 8-byte aligned on 32-bit platforms 170 171 // Stats are statistics on the group. 172 Stats Stats 173} 174 175// flightGroup is defined as an interface which flightgroup.Group 176// satisfies. 
We define this so that we may test with an alternate 177// implementation. 178type flightGroup interface { 179 // Done is called when Do is done. 180 Do(key string, fn func() (interface{}, error)) (interface{}, error) 181} 182 183// Stats are per-group statistics. 184type Stats struct { 185 Gets AtomicInt // any Get request, including from peers 186 CacheHits AtomicInt // either cache was good 187 PeerLoads AtomicInt // either remote load or remote cache hit (not an error) 188 PeerErrors AtomicInt 189 Loads AtomicInt // (gets - cacheHits) 190 LoadsDeduped AtomicInt // after singleflight 191 LocalLoads AtomicInt // total good local loads 192 LocalLoadErrs AtomicInt // total bad local loads 193 ServerRequests AtomicInt // gets that came over the network from peers 194} 195 196// Name returns the name of the group. 197func (g *Group) Name() string { 198 return g.name 199} 200 201func (g *Group) initPeers() { 202 if g.peers == nil { 203 g.peers = getPeers(g.name) 204 } 205} 206 207func (g *Group) Get(ctx Context, key string, dest Sink) error { 208 g.peersOnce.Do(g.initPeers) 209 g.Stats.Gets.Add(1) 210 if dest == nil { 211 return errors.New("groupcache: nil dest Sink") 212 } 213 value, cacheHit := g.lookupCache(key) 214 215 if cacheHit { 216 g.Stats.CacheHits.Add(1) 217 return setSinkView(dest, value) 218 } 219 220 // Optimization to avoid double unmarshalling or copying: keep 221 // track of whether the dest was already populated. One caller 222 // (if local) will set this; the losers will not. The common 223 // case will likely be one caller. 224 destPopulated := false 225 value, destPopulated, err := g.load(ctx, key, dest) 226 if err != nil { 227 return err 228 } 229 if destPopulated { 230 return nil 231 } 232 return setSinkView(dest, value) 233} 234 235// load loads key either by invoking the getter locally or by sending it to another machine. 
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently. It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls. An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially. If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		// If the key's consistent-hash owner is a remote peer,
		// try fetching from it first.
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez? It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
			//
			// On peer error we fall through and load locally.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		// Captured named result: signals to the winning caller of
		// load that its dest Sink was already filled by getLocally,
		// so Get can skip setSinkView.
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		// The Do callback only ever returns a ByteView on success.
		value = viewi.(ByteView)
	}
	return
}

// getLocally runs the group's user-supplied getter for key, filling
// dest, and returns dest's resulting view.
func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

// getFromPeer fetches key from the given peer via its ProtoGetter.
// The returned ByteView wraps res.Value without copying. A random
// ~10% of successful peer fetches are mirrored into the hot cache.
func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
	req := &pb.GetRequest{
		Group: &g.name,
		Key:   &key,
	}
	res := &pb.GetResponse{}
	err := peer.Get(ctx, req, res)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: res.Value}
	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache. For now just do it some
	// percentage of the time.
	if rand.Intn(10) == 0 {
		g.populateCache(key, value, &g.hotCache)
	}
	return value, nil
}

// lookupCache reports whether key is present in either local cache,
// checking mainCache before hotCache. It always misses when caching
// is disabled (cacheBytes <= 0).
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
	if g.cacheBytes <= 0 {
		return
	}
	value, ok = g.mainCache.get(key)
	if ok {
		return
	}
	value, ok = g.hotCache.get(key)
	return
}

// populateCache adds key/value to the given cache (main or hot), then
// evicts oldest entries until the combined size of both caches is
// within g.cacheBytes. No-op when caching is disabled.
func (g *Group) populateCache(key string, value ByteView, cache *cache) {
	if g.cacheBytes <= 0 {
		return
	}
	cache.add(key, value)

	// Evict items from cache(s) if necessary.
	for {
		mainBytes := g.mainCache.bytes()
		hotBytes := g.hotCache.bytes()
		if mainBytes+hotBytes <= g.cacheBytes {
			return
		}

		// TODO(bradfitz): this is good-enough-for-now logic.
		// It should be something based on measurements and/or
		// respecting the costs of different resources.
		//
		// Prefer evicting from the hot cache once it grows
		// beyond 1/8th of the main cache's size.
		victim := &g.mainCache
		if hotBytes > mainBytes/8 {
			victim = &g.hotCache
		}
		victim.removeOldest()
	}
}

// CacheType represents a type of cache.
type CacheType int

const (
	// The MainCache is the cache for items that this peer is the
	// owner for.
	MainCache CacheType = iota + 1

	// The HotCache is the cache for items that seem popular
	// enough to replicate to this node, even though it's not the
	// owner.
	HotCache
)

// CacheStats returns stats about the provided cache within the group.
// An unknown CacheType yields a zero CacheStats.
func (g *Group) CacheStats(which CacheType) CacheStats {
	switch which {
	case MainCache:
		return g.mainCache.stats()
	case HotCache:
		return g.hotCache.stats()
	default:
		return CacheStats{}
	}
}

// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
type cache struct {
	mu     sync.RWMutex
	nbytes int64 // of all keys and values
	lru    *lru.Cache
	nhit, nget int64
	nevict int64 // number of evictions
}

// stats returns a consistent snapshot of the cache's counters.
func (c *cache) stats() CacheStats {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return CacheStats{
		Bytes:     c.nbytes,
		Items:     c.itemsLocked(),
		Gets:      c.nget,
		Hits:      c.nhit,
		Evictions: c.nevict,
	}
}

// add inserts key/value, lazily creating the underlying LRU on first
// use and keeping nbytes in sync via the eviction callback.
func (c *cache) add(key string, value ByteView) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		c.lru = &lru.Cache{
			OnEvicted: func(key lru.Key, value interface{}) {
				val := value.(ByteView)
				// Undo the accounting done in add.
				c.nbytes -= int64(len(key.(string))) + int64(val.Len())
				c.nevict++
			},
		}
	}
	c.lru.Add(key, value)
	c.nbytes += int64(len(key)) + int64(value.Len())
}

// get looks up key, counting every lookup in nget (even before the
// LRU exists) and hits in nhit. It takes the write lock because the
// LRU's Get mutates recency order.
func (c *cache) get(key string) (value ByteView, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nget++
	if c.lru == nil {
		return
	}
	vi, ok := c.lru.Get(key)
	if !ok {
		return
	}
	c.nhit++
	return vi.(ByteView), true
}

// removeOldest evicts the least recently used entry, if any.
func (c *cache) removeOldest() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru != nil {
		c.lru.RemoveOldest()
	}
}

// bytes returns the total size of all keys and values in the cache.
func (c *cache) bytes() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.nbytes
}

// items returns the number of entries in the cache.
func (c *cache) items() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.itemsLocked()
}

// itemsLocked returns the entry count; callers must hold c.mu.
func (c *cache) itemsLocked() int64 {
	if c.lru == nil {
		return 0
	}
	return int64(c.lru.Len())
}

// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
	atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
	return atomic.LoadInt64((*int64)(i))
}

// String returns the current value formatted in base 10.
func (i *AtomicInt) String() string {
	return strconv.FormatInt(i.Get(), 10)
}

// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
	Bytes     int64
	Items     int64
	Gets      int64
	Hits      int64
	Evictions int64
}