1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package cache exports functionality for efficiently caching and mapping 16// `RangeRequest`s to corresponding `RangeResponse`s. 17package cache 18 19import ( 20 "errors" 21 "sync" 22 23 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" 24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 25 "github.com/coreos/etcd/pkg/adt" 26 "github.com/golang/groupcache/lru" 27) 28 29var ( 30 DefaultMaxEntries = 2048 31 ErrCompacted = rpctypes.ErrGRPCCompacted 32) 33 34type Cache interface { 35 Add(req *pb.RangeRequest, resp *pb.RangeResponse) 36 Get(req *pb.RangeRequest) (*pb.RangeResponse, error) 37 Compact(revision int64) 38 Invalidate(key []byte, endkey []byte) 39 Size() int 40 Close() 41} 42 43// keyFunc returns the key of a request, which is used to look up its caching response in the cache. 44func keyFunc(req *pb.RangeRequest) string { 45 // TODO: use marshalTo to reduce allocation 46 b, err := req.Marshal() 47 if err != nil { 48 panic(err) 49 } 50 return string(b) 51} 52 53func NewCache(maxCacheEntries int) Cache { 54 return &cache{ 55 lru: lru.New(maxCacheEntries), 56 cachedRanges: adt.NewIntervalTree(), 57 compactedRev: -1, 58 } 59} 60 61func (c *cache) Close() {} 62 63// cache implements Cache 64type cache struct { 65 mu sync.RWMutex 66 lru *lru.Cache 67 68 // a reverse index for cache invalidation 69 cachedRanges adt.IntervalTree 70 71 compactedRev int64 72} 73 74// Add adds the response of a request to the cache if its revision is larger than the compacted revision of the cache. 75func (c *cache) Add(req *pb.RangeRequest, resp *pb.RangeResponse) { 76 key := keyFunc(req) 77 78 c.mu.Lock() 79 defer c.mu.Unlock() 80 81 if req.Revision > c.compactedRev { 82 c.lru.Add(key, resp) 83 } 84 // we do not need to invalidate a request with a revision specified. 85 // so we do not need to add it into the reverse index. 86 if req.Revision != 0 { 87 return 88 } 89 90 var ( 91 iv *adt.IntervalValue 92 ivl adt.Interval 93 ) 94 if len(req.RangeEnd) != 0 { 95 ivl = adt.NewStringAffineInterval(string(req.Key), string(req.RangeEnd)) 96 } else { 97 ivl = adt.NewStringAffinePoint(string(req.Key)) 98 } 99 100 iv = c.cachedRanges.Find(ivl) 101 102 if iv == nil { 103 val := map[string]struct{}{key: {}} 104 c.cachedRanges.Insert(ivl, val) 105 } else { 106 val := iv.Val.(map[string]struct{}) 107 val[key] = struct{}{} 108 iv.Val = val 109 } 110} 111 112// Get looks up the caching response for a given request. 113// Get is also responsible for lazy eviction when accessing compacted entries. 114func (c *cache) Get(req *pb.RangeRequest) (*pb.RangeResponse, error) { 115 key := keyFunc(req) 116 117 c.mu.Lock() 118 defer c.mu.Unlock() 119 120 if req.Revision > 0 && req.Revision < c.compactedRev { 121 c.lru.Remove(key) 122 return nil, ErrCompacted 123 } 124 125 if resp, ok := c.lru.Get(key); ok { 126 return resp.(*pb.RangeResponse), nil 127 } 128 return nil, errors.New("not exist") 129} 130 131// Invalidate invalidates the cache entries that intersecting with the given range from key to endkey. 132func (c *cache) Invalidate(key, endkey []byte) { 133 c.mu.Lock() 134 defer c.mu.Unlock() 135 136 var ( 137 ivs []*adt.IntervalValue 138 ivl adt.Interval 139 ) 140 if len(endkey) == 0 { 141 ivl = adt.NewStringAffinePoint(string(key)) 142 } else { 143 ivl = adt.NewStringAffineInterval(string(key), string(endkey)) 144 } 145 146 ivs = c.cachedRanges.Stab(ivl) 147 for _, iv := range ivs { 148 keys := iv.Val.(map[string]struct{}) 149 for key := range keys { 150 c.lru.Remove(key) 151 } 152 } 153 // delete after removing all keys since it is destructive to 'ivs' 154 c.cachedRanges.Delete(ivl) 155} 156 157// Compact invalidate all caching response before the given rev. 158// Replace with the invalidation is lazy. The actual removal happens when the entries is accessed. 159func (c *cache) Compact(revision int64) { 160 c.mu.Lock() 161 defer c.mu.Unlock() 162 163 if revision > c.compactedRev { 164 c.compactedRev = revision 165 } 166} 167 168func (c *cache) Size() int { 169 c.mu.RLock() 170 defer c.mu.RUnlock() 171 return c.lru.Len() 172} 173