/*
Copyright 2012 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each data Get first consults its local cache, otherwise delegates
// to the requested key's canonical owner, which then checks its cache
// or finally gets the data.  In the common case, many concurrent
// cache misses across a set of peers for the same key result in just
// one cache fill.
package groupcache

import (
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"

	pb "github.com/golang/groupcache/groupcachepb"
	"github.com/golang/groupcache/lru"
	"github.com/golang/groupcache/singleflight"
)

// A Getter loads data for a key.
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	Get(ctx Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx Context, key string, dest Sink) error

func (f GetterFunc) Get(ctx Context, key string, dest Sink) error {
	return f(ctx, key, dest)
}
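
// A hedged usage sketch (not part of this package's API): a typical
// GetterFunc wired into NewGroup. The generateThumbnail helper is
// hypothetical; dest.SetBytes stores the loaded bytes in the sink.
//
//	var thumbNails = groupcache.NewGroup("thumbnail", 64<<20, groupcache.GetterFunc(
//		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
//			data, err := generateThumbnail(key) // hypothetical loader
//			if err != nil {
//				return err
//			}
//			return dest.SetBytes(data)
//		}))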

var (
	mu     sync.RWMutex
	groups = make(map[string]*Group)

	initPeerServerOnce sync.Once
	initPeerServer     func()
)

// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
	mu.RLock()
	g := groups[name]
	mu.RUnlock()
	return g
}

// NewGroup creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}
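
// A minimal caller sketch, assuming the hypothetical thumbNails group
// above and a request-scoped ctx; AllocatingByteSliceSink fills the
// byte slice with the value:
//
//	var data []byte
//	err := thumbNails.Get(ctx, "big-file.jpg", groupcache.AllocatingByteSliceSink(&data))
//	if err == nil {
//		// data now holds the thumbnail bytes, whether served from
//		// a local cache, a peer, or a fresh load.
//	}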

// If peers is nil, the peer picker is initialized lazily (via a
// sync.Once) on the group's first Get.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	if getter == nil {
		panic("nil Getter")
	}
	mu.Lock()
	defer mu.Unlock()
	initPeerServerOnce.Do(callInitPeerServer)
	if _, dup := groups[name]; dup {
		panic("duplicate registration of group " + name)
	}
	g := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		loadGroup:  &singleflight.Group{},
	}
	if fn := newGroupHook; fn != nil {
		fn(g)
	}
	groups[name] = g
	return g
}

// newGroupHook, if non-nil, is called right after a new group is created.
var newGroupHook func(*Group)

// RegisterNewGroupHook registers a hook that is run each time
// a group is created.
func RegisterNewGroupHook(fn func(*Group)) {
	if newGroupHook != nil {
		panic("RegisterNewGroupHook called more than once")
	}
	newGroupHook = fn
}
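
// A hedged sketch: a hook that logs each group as it is created,
// registered once during program initialization:
//
//	groupcache.RegisterNewGroupHook(func(g *groupcache.Group) {
//		log.Printf("groupcache: group %q created", g.Name())
//	})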

// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
	if initPeerServer != nil {
		panic("RegisterServerStart called more than once")
	}
	initPeerServer = fn
}

func callInitPeerServer() {
	if initPeerServer != nil {
		initPeerServer()
	}
}

// A Group is a cache namespace and its associated data, spread over
// a group of one or more machines.
type Group struct {
	name       string
	getter     Getter
	peersOnce  sync.Once
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size

	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache

	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer.  Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache

	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	loadGroup flightGroup

	_ int32 // force Stats to be 8-byte aligned on 32-bit platforms

	// Stats are statistics on the group.
	Stats Stats
}

// flightGroup is defined as an interface which singleflight.Group
// satisfies.  We define this so that we may test with an alternate
// implementation.
type flightGroup interface {
	// Do runs fn, making sure that only one execution of fn is
	// in flight for a given key at a time; duplicate callers
	// share the original call's result.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}

// Stats are per-group statistics.
type Stats struct {
	Gets           AtomicInt // any Get request, including from peers
	CacheHits      AtomicInt // either cache (mainCache or hotCache) was good
	PeerLoads      AtomicInt // either remote load or remote cache hit (not an error)
	PeerErrors     AtomicInt // failed loads from peers
	Loads          AtomicInt // (gets - cacheHits)
	LoadsDeduped   AtomicInt // after singleflight
	LocalLoads     AtomicInt // total good local loads
	LocalLoadErrs  AtomicInt // total bad local loads
	ServerRequests AtomicInt // gets that came over the network from peers
}

// Name returns the name of the group.
func (g *Group) Name() string {
	return g.name
}

func (g *Group) initPeers() {
	if g.peers == nil {
		g.peers = getPeers(g.name)
	}
}

// Get loads the value for key into dest, first consulting the local
// caches, then either a peer (if the key hashes to one) or the local
// getter.
func (g *Group) Get(ctx Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	value, cacheHit := g.lookupCache(key)

	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently.  It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls.  An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially.  If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez?  It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) {
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
	req := &pb.GetRequest{
		Group: &g.name,
		Key:   &key,
	}
	res := &pb.GetResponse{}
	err := peer.Get(ctx, req, res)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: res.Value}
	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache.  For now just do it some
	// percentage of the time.
	if rand.Intn(10) == 0 {
		g.populateCache(key, value, &g.hotCache)
	}
	return value, nil
}

func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
	if g.cacheBytes <= 0 {
		return
	}
	value, ok = g.mainCache.get(key)
	if ok {
		return
	}
	value, ok = g.hotCache.get(key)
	return
}

func (g *Group) populateCache(key string, value ByteView, cache *cache) {
	if g.cacheBytes <= 0 {
		return
	}
	cache.add(key, value)

	// Evict items from cache(s) if necessary.
	for {
		mainBytes := g.mainCache.bytes()
		hotBytes := g.hotCache.bytes()
		if mainBytes+hotBytes <= g.cacheBytes {
			return
		}

		// TODO(bradfitz): this is good-enough-for-now logic.
		// It should be something based on measurements and/or
		// respecting the costs of different resources.
		// For now, evict from the hotCache whenever it grows
		// beyond one eighth of the mainCache's size, so that
		// mirrored hot keys never crowd out owned keys.
		victim := &g.mainCache
		if hotBytes > mainBytes/8 {
			victim = &g.hotCache
		}
		victim.removeOldest()
	}
}

// CacheType represents a type of cache.
type CacheType int

const (
	// The MainCache is the cache for items that this peer is the
	// owner for.
	MainCache CacheType = iota + 1

	// The HotCache is the cache for items that seem popular
	// enough to replicate to this node, even though it's not the
	// owner.
	HotCache
)

// CacheStats returns stats about the provided cache within the group.
func (g *Group) CacheStats(which CacheType) CacheStats {
	switch which {
	case MainCache:
		return g.mainCache.stats()
	case HotCache:
		return g.hotCache.stats()
	default:
		return CacheStats{}
	}
}
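
// A hedged monitoring sketch, assuming a *Group g obtained from
// NewGroup: compare the two caches to see how much space hot,
// non-owned keys are consuming.
//
//	main := g.CacheStats(groupcache.MainCache)
//	hot := g.CacheStats(groupcache.HotCache)
//	log.Printf("main: %d items / %d bytes; hot: %d items / %d bytes",
//		main.Items, main.Bytes, hot.Items, hot.Bytes)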

// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
type cache struct {
	mu         sync.RWMutex
	nbytes     int64 // of all keys and values
	lru        *lru.Cache
	nhit, nget int64
	nevict     int64 // number of evictions
}

func (c *cache) stats() CacheStats {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return CacheStats{
		Bytes:     c.nbytes,
		Items:     c.itemsLocked(),
		Gets:      c.nget,
		Hits:      c.nhit,
		Evictions: c.nevict,
	}
}

func (c *cache) add(key string, value ByteView) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		c.lru = &lru.Cache{
			OnEvicted: func(key lru.Key, value interface{}) {
				val := value.(ByteView)
				c.nbytes -= int64(len(key.(string))) + int64(val.Len())
				c.nevict++
			},
		}
	}
	c.lru.Add(key, value)
	c.nbytes += int64(len(key)) + int64(value.Len())
}

func (c *cache) get(key string) (value ByteView, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nget++
	if c.lru == nil {
		return
	}
	vi, ok := c.lru.Get(key)
	if !ok {
		return
	}
	c.nhit++
	return vi.(ByteView), true
}

func (c *cache) removeOldest() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru != nil {
		c.lru.RemoveOldest()
	}
}

func (c *cache) bytes() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.nbytes
}

func (c *cache) items() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.itemsLocked()
}

func (c *cache) itemsLocked() int64 {
	if c.lru == nil {
		return 0
	}
	return int64(c.lru.Len())
}

// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
	atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
	return atomic.LoadInt64((*int64)(i))
}

// String implements fmt.Stringer, formatting i in base 10.
func (i *AtomicInt) String() string {
	return strconv.FormatInt(i.Get(), 10)
}

// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
	Bytes     int64 // total size of all keys and values
	Items     int64
	Gets      int64
	Hits      int64
	Evictions int64
}