1// Copyright 2015 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package v3rpc implements etcd v3 RPC system based on gRPC. 16package v3rpc 17 18import ( 19 "sort" 20 21 "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" 22 "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" 23 "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" 24 "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes" 25 "github.com/coreos/etcd/etcdserver" 26 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" 27 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 28 "github.com/coreos/etcd/lease" 29 "github.com/coreos/etcd/storage" 30) 31 32var ( 33 plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver/api", "v3rpc") 34 35 // Max operations per txn list. For example, Txn.Success can have at most 128 operations, 36 // and Txn.Failure can have at most 128 operations. 37 MaxOpsPerTxn = 128 38) 39 40type kvServer struct { 41 clusterID int64 42 memberID int64 43 raftTimer etcdserver.RaftTimer 44 45 kv etcdserver.RaftKV 46} 47 48func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer { 49 return &kvServer{ 50 clusterID: int64(s.Cluster().ID()), 51 memberID: int64(s.ID()), 52 raftTimer: s, 53 kv: s, 54 } 55} 56 57func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { 58 if err := checkRangeRequest(r); err != nil { 59 return nil, err 60 } 61 62 resp, err := s.kv.Range(ctx, r) 63 if err != nil { 64 return nil, togRPCError(err) 65 } 66 67 if resp.Header == nil { 68 plog.Panic("unexpected nil resp.Header") 69 } 70 s.fillInHeader(resp.Header) 71 return resp, err 72} 73 74func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { 75 if err := checkPutRequest(r); err != nil { 76 return nil, err 77 } 78 79 resp, err := s.kv.Put(ctx, r) 80 if err != nil { 81 return nil, togRPCError(err) 82 } 83 84 if resp.Header == nil { 85 plog.Panic("unexpected nil resp.Header") 86 } 87 s.fillInHeader(resp.Header) 88 return resp, err 89} 90 91func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 92 if err := checkDeleteRequest(r); err != nil { 93 return nil, err 94 } 95 96 resp, err := s.kv.DeleteRange(ctx, r) 97 if err != nil { 98 return nil, togRPCError(err) 99 } 100 101 if resp.Header == nil { 102 plog.Panic("unexpected nil resp.Header") 103 } 104 s.fillInHeader(resp.Header) 105 return resp, err 106} 107 108func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { 109 if err := checkTxnRequest(r); err != nil { 110 return nil, err 111 } 112 113 resp, err := s.kv.Txn(ctx, r) 114 if err != nil { 115 return nil, togRPCError(err) 116 } 117 118 if resp.Header == nil { 119 plog.Panic("unexpected nil resp.Header") 120 } 121 s.fillInHeader(resp.Header) 122 return resp, err 123} 124 125func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { 126 resp, err := s.kv.Compact(ctx, r) 127 if err != nil { 128 return nil, togRPCError(err) 129 } 130 131 if resp.Header == nil { 132 plog.Panic("unexpected nil resp.Header") 133 } 134 s.fillInHeader(resp.Header) 135 return resp, nil 136} 137 138func (s *kvServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { 139 resp, err := s.kv.Hash(ctx, r) 140 if err != nil { 141 return nil, togRPCError(err) 142 } 143 s.fillInHeader(resp.Header) 144 return resp, nil 145} 146 147// fillInHeader populates pb.ResponseHeader from kvServer, except Revision. 148func (s *kvServer) fillInHeader(h *pb.ResponseHeader) { 149 h.ClusterId = uint64(s.clusterID) 150 h.MemberId = uint64(s.memberID) 151 h.RaftTerm = s.raftTimer.Term() 152} 153 154func checkRangeRequest(r *pb.RangeRequest) error { 155 if len(r.Key) == 0 { 156 return rpctypes.ErrEmptyKey 157 } 158 return nil 159} 160 161func checkPutRequest(r *pb.PutRequest) error { 162 if len(r.Key) == 0 { 163 return rpctypes.ErrEmptyKey 164 } 165 return nil 166} 167 168func checkDeleteRequest(r *pb.DeleteRangeRequest) error { 169 if len(r.Key) == 0 { 170 return rpctypes.ErrEmptyKey 171 } 172 return nil 173} 174 175func checkTxnRequest(r *pb.TxnRequest) error { 176 if len(r.Compare) > MaxOpsPerTxn || len(r.Success) > MaxOpsPerTxn || len(r.Failure) > MaxOpsPerTxn { 177 return rpctypes.ErrTooManyOps 178 } 179 180 for _, c := range r.Compare { 181 if len(c.Key) == 0 { 182 return rpctypes.ErrEmptyKey 183 } 184 } 185 186 for _, u := range r.Success { 187 if err := checkRequestUnion(u); err != nil { 188 return err 189 } 190 } 191 if err := checkRequestDupKeys(r.Success); err != nil { 192 return err 193 } 194 195 for _, u := range r.Failure { 196 if err := checkRequestUnion(u); err != nil { 197 return err 198 } 199 } 200 if err := checkRequestDupKeys(r.Failure); err != nil { 201 return err 202 } 203 204 return nil 205} 206 207// checkRequestDupKeys gives rpctypes.ErrDuplicateKey if the same key is modified twice 208func checkRequestDupKeys(reqs []*pb.RequestUnion) error { 209 // check put overlap 210 keys := make(map[string]struct{}) 211 for _, requ := range reqs { 212 tv, ok := requ.Request.(*pb.RequestUnion_RequestPut) 213 if !ok { 214 continue 215 } 216 preq := tv.RequestPut 217 if preq == nil { 218 continue 219 } 220 key := string(preq.Key) 221 if _, ok := keys[key]; ok { 222 return rpctypes.ErrDuplicateKey 223 } 224 keys[key] = struct{}{} 225 } 226 227 // no need to check deletes if no puts; delete overlaps are permitted 228 if len(keys) == 0 { 229 return nil 230 } 231 232 // sort keys for range checking 233 sortedKeys := []string{} 234 for k := range keys { 235 sortedKeys = append(sortedKeys, k) 236 } 237 sort.Strings(sortedKeys) 238 239 // check put overlap with deletes 240 for _, requ := range reqs { 241 tv, ok := requ.Request.(*pb.RequestUnion_RequestDeleteRange) 242 if !ok { 243 continue 244 } 245 dreq := tv.RequestDeleteRange 246 if dreq == nil { 247 continue 248 } 249 key := string(dreq.Key) 250 if dreq.RangeEnd == nil { 251 if _, found := keys[key]; found { 252 return rpctypes.ErrDuplicateKey 253 } 254 } else { 255 lo := sort.SearchStrings(sortedKeys, key) 256 hi := sort.SearchStrings(sortedKeys, string(dreq.RangeEnd)) 257 if lo != hi { 258 // element between lo and hi => overlap 259 return rpctypes.ErrDuplicateKey 260 } 261 } 262 } 263 264 return nil 265} 266 267func checkRequestUnion(u *pb.RequestUnion) error { 268 // TODO: ensure only one of the field is set. 269 switch uv := u.Request.(type) { 270 case *pb.RequestUnion_RequestRange: 271 if uv.RequestRange != nil { 272 return checkRangeRequest(uv.RequestRange) 273 } 274 case *pb.RequestUnion_RequestPut: 275 if uv.RequestPut != nil { 276 return checkPutRequest(uv.RequestPut) 277 } 278 case *pb.RequestUnion_RequestDeleteRange: 279 if uv.RequestDeleteRange != nil { 280 return checkDeleteRequest(uv.RequestDeleteRange) 281 } 282 default: 283 // empty union 284 return nil 285 } 286 return nil 287} 288 289func togRPCError(err error) error { 290 switch err { 291 case storage.ErrCompacted: 292 return rpctypes.ErrCompacted 293 case storage.ErrFutureRev: 294 return rpctypes.ErrFutureRev 295 case lease.ErrLeaseNotFound: 296 return rpctypes.ErrLeaseNotFound 297 // TODO: handle error from raft and timeout 298 case etcdserver.ErrRequestTooLarge: 299 return rpctypes.ErrRequestTooLarge 300 default: 301 return grpc.Errorf(codes.Internal, err.Error()) 302 } 303} 304