/*
Copyright 2012 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each data Get first consults its local cache, otherwise delegates
// to the requested key's canonical owner, which then checks its cache
// or finally gets the data. In the common case, many concurrent
// cache misses across a set of peers for the same key result in just
// one cache fill.
package groupcache

import (
	"context"
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"

	pb "github.com/golang/groupcache/groupcachepb"
	"github.com/golang/groupcache/lru"
	"github.com/golang/groupcache/singleflight"
)

// A Getter loads data for a key.
//
// A Group calls its Getter only on a cache miss that this process has
// to satisfy locally (no peer was picked, or the peer fetch failed).
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	//
	// A non-nil error is propagated to the caller of Group.Get and
	// nothing is cached for key. On success the Group reads the
	// loaded bytes back out of dest and stores them in its cache.
	Get(ctx context.Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
52type GetterFunc func(ctx context.Context, key string, dest Sink) error 53 54func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error { 55 return f(ctx, key, dest) 56} 57 58var ( 59 mu sync.RWMutex 60 groups = make(map[string]*Group) 61 62 initPeerServerOnce sync.Once 63 initPeerServer func() 64) 65 66// GetGroup returns the named group previously created with NewGroup, or 67// nil if there's no such group. 68func GetGroup(name string) *Group { 69 mu.RLock() 70 g := groups[name] 71 mu.RUnlock() 72 return g 73} 74 75// NewGroup creates a coordinated group-aware Getter from a Getter. 76// 77// The returned Getter tries (but does not guarantee) to run only one 78// Get call at once for a given key across an entire set of peer 79// processes. Concurrent callers both in the local process and in 80// other processes receive copies of the answer once the original Get 81// completes. 82// 83// The group name must be unique for each getter. 84func NewGroup(name string, cacheBytes int64, getter Getter) *Group { 85 return newGroup(name, cacheBytes, getter, nil) 86} 87 88// If peers is nil, the peerPicker is called via a sync.Once to initialize it. 89func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group { 90 if getter == nil { 91 panic("nil Getter") 92 } 93 mu.Lock() 94 defer mu.Unlock() 95 initPeerServerOnce.Do(callInitPeerServer) 96 if _, dup := groups[name]; dup { 97 panic("duplicate registration of group " + name) 98 } 99 g := &Group{ 100 name: name, 101 getter: getter, 102 peers: peers, 103 cacheBytes: cacheBytes, 104 loadGroup: &singleflight.Group{}, 105 } 106 if fn := newGroupHook; fn != nil { 107 fn(g) 108 } 109 groups[name] = g 110 return g 111} 112 113// newGroupHook, if non-nil, is called right after a new group is created. 114var newGroupHook func(*Group) 115 116// RegisterNewGroupHook registers a hook that is run each time 117// a group is created. 
118func RegisterNewGroupHook(fn func(*Group)) { 119 if newGroupHook != nil { 120 panic("RegisterNewGroupHook called more than once") 121 } 122 newGroupHook = fn 123} 124 125// RegisterServerStart registers a hook that is run when the first 126// group is created. 127func RegisterServerStart(fn func()) { 128 if initPeerServer != nil { 129 panic("RegisterServerStart called more than once") 130 } 131 initPeerServer = fn 132} 133 134func callInitPeerServer() { 135 if initPeerServer != nil { 136 initPeerServer() 137 } 138} 139 140// A Group is a cache namespace and associated data loaded spread over 141// a group of 1 or more machines. 142type Group struct { 143 name string 144 getter Getter 145 peersOnce sync.Once 146 peers PeerPicker 147 cacheBytes int64 // limit for sum of mainCache and hotCache size 148 149 // mainCache is a cache of the keys for which this process 150 // (amongst its peers) is authoritative. That is, this cache 151 // contains keys which consistent hash on to this process's 152 // peer number. 153 mainCache cache 154 155 // hotCache contains keys/values for which this peer is not 156 // authoritative (otherwise they would be in mainCache), but 157 // are popular enough to warrant mirroring in this process to 158 // avoid going over the network to fetch from a peer. Having 159 // a hotCache avoids network hotspotting, where a peer's 160 // network card could become the bottleneck on a popular key. 161 // This cache is used sparingly to maximize the total number 162 // of key/value pairs that can be stored globally. 163 hotCache cache 164 165 // loadGroup ensures that each key is only fetched once 166 // (either locally or remotely), regardless of the number of 167 // concurrent callers. 168 loadGroup flightGroup 169 170 _ int32 // force Stats to be 8-byte aligned on 32-bit platforms 171 172 // Stats are statistics on the group. 173 Stats Stats 174} 175 176// flightGroup is defined as an interface which flightgroup.Group 177// satisfies. 
We define this so that we may test with an alternate 178// implementation. 179type flightGroup interface { 180 // Done is called when Do is done. 181 Do(key string, fn func() (interface{}, error)) (interface{}, error) 182} 183 184// Stats are per-group statistics. 185type Stats struct { 186 Gets AtomicInt // any Get request, including from peers 187 CacheHits AtomicInt // either cache was good 188 PeerLoads AtomicInt // either remote load or remote cache hit (not an error) 189 PeerErrors AtomicInt 190 Loads AtomicInt // (gets - cacheHits) 191 LoadsDeduped AtomicInt // after singleflight 192 LocalLoads AtomicInt // total good local loads 193 LocalLoadErrs AtomicInt // total bad local loads 194 ServerRequests AtomicInt // gets that came over the network from peers 195} 196 197// Name returns the name of the group. 198func (g *Group) Name() string { 199 return g.name 200} 201 202func (g *Group) initPeers() { 203 if g.peers == nil { 204 g.peers = getPeers(g.name) 205 } 206} 207 208func (g *Group) Get(ctx context.Context, key string, dest Sink) error { 209 g.peersOnce.Do(g.initPeers) 210 g.Stats.Gets.Add(1) 211 if dest == nil { 212 return errors.New("groupcache: nil dest Sink") 213 } 214 value, cacheHit := g.lookupCache(key) 215 216 if cacheHit { 217 g.Stats.CacheHits.Add(1) 218 return setSinkView(dest, value) 219 } 220 221 // Optimization to avoid double unmarshalling or copying: keep 222 // track of whether the dest was already populated. One caller 223 // (if local) will set this; the losers will not. The common 224 // case will likely be one caller. 225 destPopulated := false 226 value, destPopulated, err := g.load(ctx, key, dest) 227 if err != nil { 228 return err 229 } 230 if destPopulated { 231 return nil 232 } 233 return setSinkView(dest, value) 234} 235 236// load loads key either by invoking the getter locally or by sending it to another machine. 
237func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { 238 g.Stats.Loads.Add(1) 239 viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { 240 // Check the cache again because singleflight can only dedup calls 241 // that overlap concurrently. It's possible for 2 concurrent 242 // requests to miss the cache, resulting in 2 load() calls. An 243 // unfortunate goroutine scheduling would result in this callback 244 // being run twice, serially. If we don't check the cache again, 245 // cache.nbytes would be incremented below even though there will 246 // be only one entry for this key. 247 // 248 // Consider the following serialized event ordering for two 249 // goroutines in which this callback gets called twice for the 250 // same key: 251 // 1: Get("key") 252 // 2: Get("key") 253 // 1: lookupCache("key") 254 // 2: lookupCache("key") 255 // 1: load("key") 256 // 2: load("key") 257 // 1: loadGroup.Do("key", fn) 258 // 1: fn() 259 // 2: loadGroup.Do("key", fn) 260 // 2: fn() 261 if value, cacheHit := g.lookupCache(key); cacheHit { 262 g.Stats.CacheHits.Add(1) 263 return value, nil 264 } 265 g.Stats.LoadsDeduped.Add(1) 266 var value ByteView 267 var err error 268 if peer, ok := g.peers.PickPeer(key); ok { 269 value, err = g.getFromPeer(ctx, peer, key) 270 if err == nil { 271 g.Stats.PeerLoads.Add(1) 272 return value, nil 273 } 274 g.Stats.PeerErrors.Add(1) 275 // TODO(bradfitz): log the peer's error? keep 276 // log of the past few for /groupcachez? It's 277 // probably boring (normal task movement), so not 278 // worth logging I imagine. 
279 } 280 value, err = g.getLocally(ctx, key, dest) 281 if err != nil { 282 g.Stats.LocalLoadErrs.Add(1) 283 return nil, err 284 } 285 g.Stats.LocalLoads.Add(1) 286 destPopulated = true // only one caller of load gets this return value 287 g.populateCache(key, value, &g.mainCache) 288 return value, nil 289 }) 290 if err == nil { 291 value = viewi.(ByteView) 292 } 293 return 294} 295 296func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) { 297 err := g.getter.Get(ctx, key, dest) 298 if err != nil { 299 return ByteView{}, err 300 } 301 return dest.view() 302} 303 304func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) { 305 req := &pb.GetRequest{ 306 Group: &g.name, 307 Key: &key, 308 } 309 res := &pb.GetResponse{} 310 err := peer.Get(ctx, req, res) 311 if err != nil { 312 return ByteView{}, err 313 } 314 value := ByteView{b: res.Value} 315 // TODO(bradfitz): use res.MinuteQps or something smart to 316 // conditionally populate hotCache. For now just do it some 317 // percentage of the time. 318 if rand.Intn(10) == 0 { 319 g.populateCache(key, value, &g.hotCache) 320 } 321 return value, nil 322} 323 324func (g *Group) lookupCache(key string) (value ByteView, ok bool) { 325 if g.cacheBytes <= 0 { 326 return 327 } 328 value, ok = g.mainCache.get(key) 329 if ok { 330 return 331 } 332 value, ok = g.hotCache.get(key) 333 return 334} 335 336func (g *Group) populateCache(key string, value ByteView, cache *cache) { 337 if g.cacheBytes <= 0 { 338 return 339 } 340 cache.add(key, value) 341 342 // Evict items from cache(s) if necessary. 343 for { 344 mainBytes := g.mainCache.bytes() 345 hotBytes := g.hotCache.bytes() 346 if mainBytes+hotBytes <= g.cacheBytes { 347 return 348 } 349 350 // TODO(bradfitz): this is good-enough-for-now logic. 351 // It should be something based on measurements and/or 352 // respecting the costs of different resources. 
353 victim := &g.mainCache 354 if hotBytes > mainBytes/8 { 355 victim = &g.hotCache 356 } 357 victim.removeOldest() 358 } 359} 360 361// CacheType represents a type of cache. 362type CacheType int 363 364const ( 365 // The MainCache is the cache for items that this peer is the 366 // owner for. 367 MainCache CacheType = iota + 1 368 369 // The HotCache is the cache for items that seem popular 370 // enough to replicate to this node, even though it's not the 371 // owner. 372 HotCache 373) 374 375// CacheStats returns stats about the provided cache within the group. 376func (g *Group) CacheStats(which CacheType) CacheStats { 377 switch which { 378 case MainCache: 379 return g.mainCache.stats() 380 case HotCache: 381 return g.hotCache.stats() 382 default: 383 return CacheStats{} 384 } 385} 386 387// cache is a wrapper around an *lru.Cache that adds synchronization, 388// makes values always be ByteView, and counts the size of all keys and 389// values. 390type cache struct { 391 mu sync.RWMutex 392 nbytes int64 // of all keys and values 393 lru *lru.Cache 394 nhit, nget int64 395 nevict int64 // number of evictions 396} 397 398func (c *cache) stats() CacheStats { 399 c.mu.RLock() 400 defer c.mu.RUnlock() 401 return CacheStats{ 402 Bytes: c.nbytes, 403 Items: c.itemsLocked(), 404 Gets: c.nget, 405 Hits: c.nhit, 406 Evictions: c.nevict, 407 } 408} 409 410func (c *cache) add(key string, value ByteView) { 411 c.mu.Lock() 412 defer c.mu.Unlock() 413 if c.lru == nil { 414 c.lru = &lru.Cache{ 415 OnEvicted: func(key lru.Key, value interface{}) { 416 val := value.(ByteView) 417 c.nbytes -= int64(len(key.(string))) + int64(val.Len()) 418 c.nevict++ 419 }, 420 } 421 } 422 c.lru.Add(key, value) 423 c.nbytes += int64(len(key)) + int64(value.Len()) 424} 425 426func (c *cache) get(key string) (value ByteView, ok bool) { 427 c.mu.Lock() 428 defer c.mu.Unlock() 429 c.nget++ 430 if c.lru == nil { 431 return 432 } 433 vi, ok := c.lru.Get(key) 434 if !ok { 435 return 436 } 437 c.nhit++ 
438 return vi.(ByteView), true 439} 440 441func (c *cache) removeOldest() { 442 c.mu.Lock() 443 defer c.mu.Unlock() 444 if c.lru != nil { 445 c.lru.RemoveOldest() 446 } 447} 448 449func (c *cache) bytes() int64 { 450 c.mu.RLock() 451 defer c.mu.RUnlock() 452 return c.nbytes 453} 454 455func (c *cache) items() int64 { 456 c.mu.RLock() 457 defer c.mu.RUnlock() 458 return c.itemsLocked() 459} 460 461func (c *cache) itemsLocked() int64 { 462 if c.lru == nil { 463 return 0 464 } 465 return int64(c.lru.Len()) 466} 467 468// An AtomicInt is an int64 to be accessed atomically. 469type AtomicInt int64 470 471// Add atomically adds n to i. 472func (i *AtomicInt) Add(n int64) { 473 atomic.AddInt64((*int64)(i), n) 474} 475 476// Get atomically gets the value of i. 477func (i *AtomicInt) Get() int64 { 478 return atomic.LoadInt64((*int64)(i)) 479} 480 481func (i *AtomicInt) String() string { 482 return strconv.FormatInt(i.Get(), 10) 483} 484 485// CacheStats are returned by stats accessors on Group. 486type CacheStats struct { 487 Bytes int64 488 Items int64 489 Gets int64 490 Hits int64 491 Evictions int64 492} 493