1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package cache exports functionality for efficiently caching and mapping
16// `RangeRequest`s to corresponding `RangeResponse`s.
17package cache
18
19import (
20	"errors"
21	"sync"
22
23	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
24	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25	"github.com/coreos/etcd/pkg/adt"
26	"github.com/golang/groupcache/lru"
27)
28
29var (
30	DefaultMaxEntries = 2048
31	ErrCompacted      = rpctypes.ErrGRPCCompacted
32)
33
34type Cache interface {
35	Add(req *pb.RangeRequest, resp *pb.RangeResponse)
36	Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
37	Compact(revision int64)
38	Invalidate(key []byte, endkey []byte)
39	Size() int
40	Close()
41}
42
43// keyFunc returns the key of a request, which is used to look up its caching response in the cache.
44func keyFunc(req *pb.RangeRequest) string {
45	// TODO: use marshalTo to reduce allocation
46	b, err := req.Marshal()
47	if err != nil {
48		panic(err)
49	}
50	return string(b)
51}
52
53func NewCache(maxCacheEntries int) Cache {
54	return &cache{
55		lru:          lru.New(maxCacheEntries),
56		cachedRanges: adt.NewIntervalTree(),
57		compactedRev: -1,
58	}
59}
60
61func (c *cache) Close() {}
62
63// cache implements Cache
64type cache struct {
65	mu  sync.RWMutex
66	lru *lru.Cache
67
68	// a reverse index for cache invalidation
69	cachedRanges adt.IntervalTree
70
71	compactedRev int64
72}
73
74// Add adds the response of a request to the cache if its revision is larger than the compacted revision of the cache.
75func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) {
76	key := keyFunc(req)
77
78	c.mu.Lock()
79	defer c.mu.Unlock()
80
81	if req.Revision > c.compactedRev {
82		c.lru.Add(key, resp)
83	}
84	// we do not need to invalidate a request with a revision specified.
85	// so we do not need to add it into the reverse index.
86	if req.Revision != 0 {
87		return
88	}
89
90	var (
91		iv  *adt.IntervalValue
92		ivl adt.Interval
93	)
94	if len(req.RangeEnd) != 0 {
95		ivl = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd))
96	} else {
97		ivl = adt.NewStringAffinePoint(string(req.Key))
98	}
99
100	iv = c.cachedRanges.Find(ivl)
101
102	if iv == nil {
103		val := map[string]struct{}{key: {}}
104		c.cachedRanges.Insert(ivl, val)
105	} else {
106		val := iv.Val.(map[string]struct{})
107		val[key] = struct{}{}
108		iv.Val = val
109	}
110}
111
112// Get looks up the caching response for a given request.
113// Get is also responsible for lazy eviction when accessing compacted entries.
114func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) {
115	key := keyFunc(req)
116
117	c.mu.Lock()
118	defer c.mu.Unlock()
119
120	if req.Revision > 0 && req.Revision < c.compactedRev {
121		c.lru.Remove(key)
122		return nil, ErrCompacted
123	}
124
125	if resp, ok := c.lru.Get(key); ok {
126		return resp.(*pb.RangeResponse), nil
127	}
128	return nil, errors.New("not exist")
129}
130
131// Invalidate invalidates the cache entries that intersecting with the given range from key to endkey.
132func (c *cache) Invalidate(key, endkey []byte) {
133	c.mu.Lock()
134	defer c.mu.Unlock()
135
136	var (
137		ivs []*adt.IntervalValue
138		ivl adt.Interval
139	)
140	if len(endkey) == 0 {
141		ivl = adt.NewStringAffinePoint(string(key))
142	} else {
143		ivl = adt.NewStringAffineInterval(string(key), string(endkey))
144	}
145
146	ivs = c.cachedRanges.Stab(ivl)
147	for _, iv := range ivs {
148		keys := iv.Val.(map[string]struct{})
149		for key := range keys {
150			c.lru.Remove(key)
151		}
152	}
153	// delete after removing all keys since it is destructive to 'ivs'
154	c.cachedRanges.Delete(ivl)
155}
156
157// Compact invalidate all caching response before the given rev.
158// Replace with the invalidation is lazy. The actual removal happens when the entries is accessed.
159func (c *cache) Compact(revision int64) {
160	c.mu.Lock()
161	defer c.mu.Unlock()
162
163	if revision > c.compactedRev {
164		c.compactedRev = revision
165	}
166}
167
168func (c *cache) Size() int {
169	c.mu.RLock()
170	defer c.mu.RUnlock()
171	return c.lru.Len()
172}
173