1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package grpcproxy 16 17import ( 18 "github.com/coreos/etcd/clientv3" 19 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 20 "github.com/coreos/etcd/proxy/grpcproxy/cache" 21 22 "golang.org/x/net/context" 23) 24 25type kvProxy struct { 26 kv clientv3.KV 27 cache cache.Cache 28} 29 30func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) { 31 kv := &kvProxy{ 32 kv: c.KV, 33 cache: cache.NewCache(cache.DefaultMaxEntries), 34 } 35 donec := make(chan struct{}) 36 close(donec) 37 return kv, donec 38} 39 40func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { 41 if r.Serializable { 42 resp, err := p.cache.Get(r) 43 switch err { 44 case nil: 45 cacheHits.Inc() 46 return resp, nil 47 case cache.ErrCompacted: 48 cacheHits.Inc() 49 return nil, err 50 } 51 } 52 cachedMisses.Inc() 53 54 resp, err := p.kv.Do(ctx, RangeRequestToOp(r)) 55 if err != nil { 56 return nil, err 57 } 58 59 // cache linearizable as serializable 60 req := *r 61 req.Serializable = true 62 gresp := (*pb.RangeResponse)(resp.Get()) 63 p.cache.Add(&req, gresp) 64 cacheKeys.Set(float64(p.cache.Size())) 65 66 return gresp, nil 67} 68 69func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { 70 p.cache.Invalidate(r.Key, nil) 71 cacheKeys.Set(float64(p.cache.Size())) 72 73 resp, err := p.kv.Do(ctx, PutRequestToOp(r)) 74 return (*pb.PutResponse)(resp.Put()), err 75} 76 77func (p *kvProxy) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 78 p.cache.Invalidate(r.Key, r.RangeEnd) 79 cacheKeys.Set(float64(p.cache.Size())) 80 81 resp, err := p.kv.Do(ctx, DelRequestToOp(r)) 82 return (*pb.DeleteRangeResponse)(resp.Del()), err 83} 84 85func (p *kvProxy) txnToCache(reqs []*pb.RequestOp, resps []*pb.ResponseOp) { 86 for i := range resps { 87 switch tv := resps[i].Response.(type) { 88 case *pb.ResponseOp_ResponsePut: 89 p.cache.Invalidate(reqs[i].GetRequestPut().Key, nil) 90 case *pb.ResponseOp_ResponseDeleteRange: 91 rdr := reqs[i].GetRequestDeleteRange() 92 p.cache.Invalidate(rdr.Key, rdr.RangeEnd) 93 case *pb.ResponseOp_ResponseRange: 94 req := *(reqs[i].GetRequestRange()) 95 req.Serializable = true 96 p.cache.Add(&req, tv.ResponseRange) 97 } 98 } 99} 100 101func (p *kvProxy) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { 102 txn := p.kv.Txn(ctx) 103 cmps := make([]clientv3.Cmp, len(r.Compare)) 104 thenops := make([]clientv3.Op, len(r.Success)) 105 elseops := make([]clientv3.Op, len(r.Failure)) 106 107 for i := range r.Compare { 108 cmps[i] = (clientv3.Cmp)(*r.Compare[i]) 109 } 110 111 for i := range r.Success { 112 thenops[i] = requestOpToOp(r.Success[i]) 113 } 114 115 for i := range r.Failure { 116 elseops[i] = requestOpToOp(r.Failure[i]) 117 } 118 119 resp, err := txn.If(cmps...).Then(thenops...).Else(elseops...).Commit() 120 121 if err != nil { 122 return nil, err 123 } 124 // txn may claim an outdated key is updated; be safe and invalidate 125 for _, cmp := range r.Compare { 126 p.cache.Invalidate(cmp.Key, nil) 127 } 128 // update any fetched keys 129 if resp.Succeeded { 130 p.txnToCache(r.Success, resp.Responses) 131 } else { 132 p.txnToCache(r.Failure, resp.Responses) 133 } 134 135 cacheKeys.Set(float64(p.cache.Size())) 136 137 return (*pb.TxnResponse)(resp), nil 138} 139 140func (p *kvProxy) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { 141 var opts []clientv3.CompactOption 142 if r.Physical { 143 opts = append(opts, clientv3.WithCompactPhysical()) 144 } 145 146 resp, err := p.kv.Compact(ctx, r.Revision, opts...) 147 if err == nil { 148 p.cache.Compact(r.Revision) 149 } 150 151 cacheKeys.Set(float64(p.cache.Size())) 152 153 return (*pb.CompactionResponse)(resp), err 154} 155 156func requestOpToOp(union *pb.RequestOp) clientv3.Op { 157 switch tv := union.Request.(type) { 158 case *pb.RequestOp_RequestRange: 159 if tv.RequestRange != nil { 160 return RangeRequestToOp(tv.RequestRange) 161 } 162 case *pb.RequestOp_RequestPut: 163 if tv.RequestPut != nil { 164 return PutRequestToOp(tv.RequestPut) 165 } 166 case *pb.RequestOp_RequestDeleteRange: 167 if tv.RequestDeleteRange != nil { 168 return DelRequestToOp(tv.RequestDeleteRange) 169 } 170 } 171 panic("unknown request") 172} 173 174func RangeRequestToOp(r *pb.RangeRequest) clientv3.Op { 175 opts := []clientv3.OpOption{} 176 if len(r.RangeEnd) != 0 { 177 opts = append(opts, clientv3.WithRange(string(r.RangeEnd))) 178 } 179 opts = append(opts, clientv3.WithRev(r.Revision)) 180 opts = append(opts, clientv3.WithLimit(r.Limit)) 181 opts = append(opts, clientv3.WithSort( 182 clientv3.SortTarget(r.SortTarget), 183 clientv3.SortOrder(r.SortOrder)), 184 ) 185 opts = append(opts, clientv3.WithMaxCreateRev(r.MaxCreateRevision)) 186 opts = append(opts, clientv3.WithMinCreateRev(r.MinCreateRevision)) 187 opts = append(opts, clientv3.WithMaxModRev(r.MaxModRevision)) 188 opts = append(opts, clientv3.WithMinModRev(r.MinModRevision)) 189 if r.CountOnly { 190 opts = append(opts, clientv3.WithCountOnly()) 191 } 192 if r.KeysOnly { 193 opts = append(opts, clientv3.WithKeysOnly()) 194 } 195 if r.Serializable { 196 opts = append(opts, clientv3.WithSerializable()) 197 } 198 199 return clientv3.OpGet(string(r.Key), opts...) 200} 201 202func PutRequestToOp(r *pb.PutRequest) clientv3.Op { 203 opts := []clientv3.OpOption{} 204 opts = append(opts, clientv3.WithLease(clientv3.LeaseID(r.Lease))) 205 if r.IgnoreValue { 206 opts = append(opts, clientv3.WithIgnoreValue()) 207 } 208 if r.IgnoreLease { 209 opts = append(opts, clientv3.WithIgnoreLease()) 210 } 211 if r.PrevKv { 212 opts = append(opts, clientv3.WithPrevKV()) 213 } 214 return clientv3.OpPut(string(r.Key), string(r.Value), opts...) 215} 216 217func DelRequestToOp(r *pb.DeleteRangeRequest) clientv3.Op { 218 opts := []clientv3.OpOption{} 219 if len(r.RangeEnd) != 0 { 220 opts = append(opts, clientv3.WithRange(string(r.RangeEnd))) 221 } 222 if r.PrevKv { 223 opts = append(opts, clientv3.WithPrevKV()) 224 } 225 return clientv3.OpDelete(string(r.Key), opts...) 226} 227