/*
Copyright 2012 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each Get first consults its local cache; otherwise it delegates
// to the requested key's canonical owner, which then checks its own
// cache or finally loads the data.  In the common case, many
// concurrent cache misses across a set of peers for the same key
// result in just one cache fill.
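//
// A minimal usage sketch follows.  The group name, cache size, and the
// loadData helper are illustrative placeholders, not part of this
// package; AllocatingByteSliceSink is one of the package's Sink
// helpers:
//
//	var g = groupcache.NewGroup("data", 64<<20, groupcache.GetterFunc(
//		func(ctx context.Context, key string, dest groupcache.Sink) error {
//			b, err := loadData(key) // hypothetical slow loader
//			if err != nil {
//				return err
//			}
//			return dest.SetBytes(b)
//		}))
//
//	func lookup(ctx context.Context, key string) ([]byte, error) {
//		var b []byte
//		err := g.Get(ctx, key, groupcache.AllocatingByteSliceSink(&b))
//		return b, err
//	}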
package groupcache

import (
	"context"
	"errors"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"

	pb "github.com/golang/groupcache/groupcachepb"
	"github.com/golang/groupcache/lru"
	"github.com/golang/groupcache/singleflight"
)

// A Getter loads data for a key.
type Getter interface {
	// Get returns the value identified by key, populating dest.
	//
	// The returned data must be unversioned. That is, key must
	// uniquely describe the loaded data, without an implicit
	// current time, and without relying on cache expiration
	// mechanisms.
	Get(ctx context.Context, key string, dest Sink) error
}

// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx context.Context, key string, dest Sink) error

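// Get calls f(ctx, key, dest).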
func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
	return f(ctx, key, dest)
}

var (
	mu     sync.RWMutex
	groups = make(map[string]*Group)

	initPeerServerOnce sync.Once
	initPeerServer     func()
)

// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
	mu.RLock()
	g := groups[name]
	mu.RUnlock()
	return g
}

// NewGroup creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	return newGroup(name, cacheBytes, getter, nil)
}

// If peers is nil, the group's PeerPicker is initialized lazily, via a
// sync.Once, on the first Get.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	if getter == nil {
		panic("nil Getter")
	}
	mu.Lock()
	defer mu.Unlock()
	initPeerServerOnce.Do(callInitPeerServer)
	if _, dup := groups[name]; dup {
		panic("duplicate registration of group " + name)
	}
	g := &Group{
		name:       name,
		getter:     getter,
		peers:      peers,
		cacheBytes: cacheBytes,
		loadGroup:  &singleflight.Group{},
	}
	if fn := newGroupHook; fn != nil {
		fn(g)
	}
	groups[name] = g
	return g
}

// newGroupHook, if non-nil, is called right after a new group is created.
var newGroupHook func(*Group)

// RegisterNewGroupHook registers a hook that is run each time
// a group is created.
func RegisterNewGroupHook(fn func(*Group)) {
	if newGroupHook != nil {
		panic("RegisterNewGroupHook called more than once")
	}
	newGroupHook = fn
}

// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
	if initPeerServer != nil {
		panic("RegisterServerStart called more than once")
	}
	initPeerServer = fn
}

func callInitPeerServer() {
	if initPeerServer != nil {
		initPeerServer()
	}
}

// A Group is a cache namespace and its associated data, spread over
// a group of one or more machines.
type Group struct {
	name       string
	getter     Getter
	peersOnce  sync.Once
	peers      PeerPicker
	cacheBytes int64 // limit for sum of mainCache and hotCache size

	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache

	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer.  Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache

	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	loadGroup flightGroup

	_ int32 // force Stats to be 8-byte aligned on 32-bit platforms

	// Stats are statistics on the group.
	Stats Stats
}

// flightGroup is defined as an interface which singleflight.Group
// satisfies.  We define this so that we may test with an alternate
// implementation.
type flightGroup interface {
	// Do runs fn for key once, returning the result to all
	// concurrent callers.
	Do(key string, fn func() (interface{}, error)) (interface{}, error)
}

// Stats are per-group statistics.
type Stats struct {
	Gets           AtomicInt // any Get request, including from peers
	CacheHits      AtomicInt // Gets served from either the main or hot cache
	PeerLoads      AtomicInt // either remote load or remote cache hit (not an error)
	PeerErrors     AtomicInt
	Loads          AtomicInt // (gets - cacheHits)
	LoadsDeduped   AtomicInt // after singleflight
	LocalLoads     AtomicInt // total good local loads
	LocalLoadErrs  AtomicInt // total bad local loads
	ServerRequests AtomicInt // gets that came over the network from peers
}

// Name returns the name of the group.
func (g *Group) Name() string {
	return g.name
}

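// initPeers lazily resolves the group's PeerPicker via the
// package-level getPeers when the group was created without one.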
func (g *Group) initPeers() {
	if g.peers == nil {
		g.peers = getPeers(g.name)
	}
}

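// Get loads the value for key into dest.  It first consults this
// process's main and hot caches; on a miss it calls load, which
// either fetches the value from the key's owner peer or invokes the
// getter locally.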
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}
	value, cacheHit := g.lookupCache(key)

	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	destPopulated := false
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		return nil
	}
	return setSinkView(dest, value)
}

// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently.  It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls.  An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially.  If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		if peer, ok := g.peers.PickPeer(key); ok {
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez?  It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

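// getLocally loads key by invoking the user-supplied getter, then
// captures the value it wrote into dest.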
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
	err := g.getter.Get(ctx, key, dest)
	if err != nil {
		return ByteView{}, err
	}
	return dest.view()
}

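// getFromPeer fetches key's value from its owner peer over the
// ProtoGetter transport, occasionally mirroring the result into the
// hotCache.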
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
	req := &pb.GetRequest{
		Group: &g.name,
		Key:   &key,
	}
	res := &pb.GetResponse{}
	err := peer.Get(ctx, req, res)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: res.Value}
	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache.  For now just do it some
	// percentage of the time.
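	// Mirror roughly 10% of successful peer fetches into the hotCache.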
	if rand.Intn(10) == 0 {
		g.populateCache(key, value, &g.hotCache)
	}
	return value, nil
}

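// lookupCache checks the main cache and then the hot cache.  It always
// misses if the group was created with cacheBytes <= 0.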
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
	if g.cacheBytes <= 0 {
		return
	}
	value, ok = g.mainCache.get(key)
	if ok {
		return
	}
	value, ok = g.hotCache.get(key)
	return
}

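// populateCache adds value to the given cache, then evicts entries
// until the combined size of both caches is back within g.cacheBytes.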
func (g *Group) populateCache(key string, value ByteView, cache *cache) {
	if g.cacheBytes <= 0 {
		return
	}
	cache.add(key, value)

	// Evict items from cache(s) if necessary.
	for {
		mainBytes := g.mainCache.bytes()
		hotBytes := g.hotCache.bytes()
		if mainBytes+hotBytes <= g.cacheBytes {
			return
		}

		// TODO(bradfitz): this is good-enough-for-now logic.
		// It should be something based on measurements and/or
		// respecting the costs of different resources.
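		// Prefer evicting from the hotCache once it grows beyond
		// one-eighth of the mainCache's size; otherwise evict the
		// oldest mainCache entry.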
		victim := &g.mainCache
		if hotBytes > mainBytes/8 {
			victim = &g.hotCache
		}
		victim.removeOldest()
	}
}

// CacheType represents a type of cache.
type CacheType int

const (
	// The MainCache is the cache for items that this peer is the
	// owner of.
	MainCache CacheType = iota + 1

	// The HotCache is the cache for items that seem popular
	// enough to replicate to this node, even though it's not the
	// owner.
	HotCache
)

// CacheStats returns stats about the provided cache within the group.
func (g *Group) CacheStats(which CacheType) CacheStats {
	switch which {
	case MainCache:
		return g.mainCache.stats()
	case HotCache:
		return g.hotCache.stats()
	default:
		return CacheStats{}
	}
}

// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
type cache struct {
	mu         sync.RWMutex
	nbytes     int64 // of all keys and values
	lru        *lru.Cache
	nhit, nget int64
	nevict     int64 // number of evictions
}

func (c *cache) stats() CacheStats {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return CacheStats{
		Bytes:     c.nbytes,
		Items:     c.itemsLocked(),
		Gets:      c.nget,
		Hits:      c.nhit,
		Evictions: c.nevict,
	}
}

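// add inserts value under key, lazily initializing the underlying LRU
// with an eviction callback that keeps nbytes and nevict accurate.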
func (c *cache) add(key string, value ByteView) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru == nil {
		c.lru = &lru.Cache{
			OnEvicted: func(key lru.Key, value interface{}) {
				val := value.(ByteView)
				c.nbytes -= int64(len(key.(string))) + int64(val.Len())
				c.nevict++
			},
		}
	}
	c.lru.Add(key, value)
	c.nbytes += int64(len(key)) + int64(value.Len())
}

func (c *cache) get(key string) (value ByteView, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nget++
	if c.lru == nil {
		return
	}
	vi, ok := c.lru.Get(key)
	if !ok {
		return
	}
	c.nhit++
	return vi.(ByteView), true
}

func (c *cache) removeOldest() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.lru != nil {
		c.lru.RemoveOldest()
	}
}

func (c *cache) bytes() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.nbytes
}

func (c *cache) items() int64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.itemsLocked()
}

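// itemsLocked reports the number of cached items.  The caller must
// hold c.mu.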
func (c *cache) itemsLocked() int64 {
	if c.lru == nil {
		return 0
	}
	return int64(c.lru.Len())
}

// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64

// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
	atomic.AddInt64((*int64)(i), n)
}

// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
	return atomic.LoadInt64((*int64)(i))
}

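// String returns the value of i formatted as a base-10 integer.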
func (i *AtomicInt) String() string {
	return strconv.FormatInt(i.Get(), 10)
}

// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
	Bytes     int64
	Items     int64
	Gets      int64
	Hits      int64
	Evictions int64
}