1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "bytes" 19 "context" 20 "fmt" 21 "sort" 22 "strconv" 23 "time" 24 25 "github.com/coreos/go-semver/semver" 26 pb "go.etcd.io/etcd/api/v3/etcdserverpb" 27 "go.etcd.io/etcd/api/v3/membershippb" 28 "go.etcd.io/etcd/api/v3/mvccpb" 29 "go.etcd.io/etcd/pkg/v3/traceutil" 30 "go.etcd.io/etcd/pkg/v3/types" 31 "go.etcd.io/etcd/server/v3/auth" 32 "go.etcd.io/etcd/server/v3/etcdserver/api" 33 "go.etcd.io/etcd/server/v3/etcdserver/api/membership" 34 "go.etcd.io/etcd/server/v3/lease" 35 "go.etcd.io/etcd/server/v3/mvcc" 36 37 "github.com/gogo/protobuf/proto" 38 "go.uber.org/zap" 39) 40 41const ( 42 v3Version = "v3" 43) 44 45type applyResult struct { 46 resp proto.Message 47 err error 48 // physc signals the physical effect of the request has completed in addition 49 // to being logically reflected by the node. Currently only used for 50 // Compaction requests. 
51 physc <-chan struct{} 52 trace *traceutil.Trace 53} 54 55// applierV3Internal is the interface for processing internal V3 raft request 56type applierV3Internal interface { 57 ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) 58 ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) 59 DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) 60} 61 62// applierV3 is the interface for processing V3 raft messages 63type applierV3 interface { 64 Apply(r *pb.InternalRaftRequest) *applyResult 65 66 Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) 67 Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) 68 DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) 69 Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) 70 Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) 71 72 LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) 73 LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) 74 75 LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) 76 77 Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error) 78 79 Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) 80 81 AuthEnable() (*pb.AuthEnableResponse, error) 82 AuthDisable() (*pb.AuthDisableResponse, error) 83 AuthStatus() (*pb.AuthStatusResponse, error) 84 85 UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) 86 UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) 87 UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) 88 UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) 89 UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) 90 UserRevokeRole(ua 
*pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) 91 RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) 92 RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) 93 RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) 94 RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) 95 RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) 96 UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) 97 RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) 98} 99 100type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error 101 102type applierV3backend struct { 103 s *EtcdServer 104 105 checkPut checkReqFunc 106 checkRange checkReqFunc 107} 108 109func (s *EtcdServer) newApplierV3Backend() applierV3 { 110 base := &applierV3backend{s: s} 111 base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error { 112 return base.checkRequestPut(rv, req) 113 } 114 base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error { 115 return base.checkRequestRange(rv, req) 116 } 117 return base 118} 119 120func (s *EtcdServer) newApplierV3Internal() applierV3Internal { 121 base := &applierV3backend{s: s} 122 return base 123} 124 125func (s *EtcdServer) newApplierV3() applierV3 { 126 return newAuthApplierV3( 127 s.AuthStore(), 128 newQuotaApplierV3(s, s.newApplierV3Backend()), 129 s.lessor, 130 ) 131} 132 133func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { 134 op := "unknown" 135 ar := &applyResult{} 136 defer func(start time.Time) { 137 success := ar.err == nil || ar.err == mvcc.ErrCompacted 138 applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) 139 warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) 140 if !success { 141 
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) 142 } 143 }(time.Now()) 144 145 // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls 146 switch { 147 case r.Range != nil: 148 op = "Range" 149 ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) 150 case r.Put != nil: 151 op = "Put" 152 ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put) 153 case r.DeleteRange != nil: 154 op = "DeleteRange" 155 ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) 156 case r.Txn != nil: 157 op = "Txn" 158 ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn) 159 case r.Compaction != nil: 160 op = "Compaction" 161 ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) 162 case r.LeaseGrant != nil: 163 op = "LeaseGrant" 164 ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) 165 case r.LeaseRevoke != nil: 166 op = "LeaseRevoke" 167 ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke) 168 case r.LeaseCheckpoint != nil: 169 op = "LeaseCheckpoint" 170 ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) 171 case r.Alarm != nil: 172 op = "Alarm" 173 ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm) 174 case r.Authenticate != nil: 175 op = "Authenticate" 176 ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate) 177 case r.AuthEnable != nil: 178 op = "AuthEnable" 179 ar.resp, ar.err = a.s.applyV3.AuthEnable() 180 case r.AuthDisable != nil: 181 op = "AuthDisable" 182 ar.resp, ar.err = a.s.applyV3.AuthDisable() 183 case r.AuthStatus != nil: 184 ar.resp, ar.err = a.s.applyV3.AuthStatus() 185 case r.AuthUserAdd != nil: 186 op = "AuthUserAdd" 187 ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd) 188 case r.AuthUserDelete != nil: 189 op = "AuthUserDelete" 190 ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete) 191 case r.AuthUserChangePassword != nil: 192 op = "AuthUserChangePassword" 193 ar.resp, ar.err = 
a.s.applyV3.UserChangePassword(r.AuthUserChangePassword) 194 case r.AuthUserGrantRole != nil: 195 op = "AuthUserGrantRole" 196 ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole) 197 case r.AuthUserGet != nil: 198 op = "AuthUserGet" 199 ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet) 200 case r.AuthUserRevokeRole != nil: 201 op = "AuthUserRevokeRole" 202 ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole) 203 case r.AuthRoleAdd != nil: 204 op = "AuthRoleAdd" 205 ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd) 206 case r.AuthRoleGrantPermission != nil: 207 op = "AuthRoleGrantPermission" 208 ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission) 209 case r.AuthRoleGet != nil: 210 op = "AuthRoleGet" 211 ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet) 212 case r.AuthRoleRevokePermission != nil: 213 op = "AuthRoleRevokePermission" 214 ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission) 215 case r.AuthRoleDelete != nil: 216 op = "AuthRoleDelete" 217 ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete) 218 case r.AuthUserList != nil: 219 op = "AuthUserList" 220 ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList) 221 case r.AuthRoleList != nil: 222 op = "AuthRoleList" 223 ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) 224 case r.ClusterVersionSet != nil: 225 op = "ClusterVersionSet" 226 a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) 227 case r.ClusterMemberAttrSet != nil: 228 op = "ClusterMemberAttrSet" 229 a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) 230 case r.DowngradeInfoSet != nil: 231 op = "DowngradeInfoSet" 232 a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet) 233 default: 234 panic("not implemented") 235 } 236 return ar 237} 238 239func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { 240 resp = &pb.PutResponse{} 241 
resp.Header = &pb.ResponseHeader{} 242 trace = traceutil.Get(ctx) 243 // create put tracing if the trace in context is empty 244 if trace.IsEmpty() { 245 trace = traceutil.New("put", 246 a.s.getLogger(), 247 traceutil.Field{Key: "key", Value: string(p.Key)}, 248 traceutil.Field{Key: "req_size", Value: proto.Size(p)}, 249 ) 250 } 251 val, leaseID := p.Value, lease.LeaseID(p.Lease) 252 if txn == nil { 253 if leaseID != lease.NoLease { 254 if l := a.s.lessor.Lookup(leaseID); l == nil { 255 return nil, nil, lease.ErrLeaseNotFound 256 } 257 } 258 txn = a.s.KV().Write(trace) 259 defer txn.End() 260 } 261 262 var rr *mvcc.RangeResult 263 if p.IgnoreValue || p.IgnoreLease || p.PrevKv { 264 trace.StepWithFunction(func() { 265 rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) 266 }, "get previous kv pair") 267 268 if err != nil { 269 return nil, nil, err 270 } 271 } 272 if p.IgnoreValue || p.IgnoreLease { 273 if rr == nil || len(rr.KVs) == 0 { 274 // ignore_{lease,value} flag expects previous key-value pair 275 return nil, nil, ErrKeyNotFound 276 } 277 } 278 if p.IgnoreValue { 279 val = rr.KVs[0].Value 280 } 281 if p.IgnoreLease { 282 leaseID = lease.LeaseID(rr.KVs[0].Lease) 283 } 284 if p.PrevKv { 285 if rr != nil && len(rr.KVs) != 0 { 286 resp.PrevKv = &rr.KVs[0] 287 } 288 } 289 290 resp.Header.Revision = txn.Put(p.Key, val, leaseID) 291 trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) 292 return resp, trace, nil 293} 294 295func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 296 resp := &pb.DeleteRangeResponse{} 297 resp.Header = &pb.ResponseHeader{} 298 end := mkGteRange(dr.RangeEnd) 299 300 if txn == nil { 301 txn = a.s.kv.Write(traceutil.TODO()) 302 defer txn.End() 303 } 304 305 if dr.PrevKv { 306 rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) 307 if err != nil { 308 return nil, err 309 } 310 if rr != nil { 311 
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs)) 312 for i := range rr.KVs { 313 resp.PrevKvs[i] = &rr.KVs[i] 314 } 315 } 316 } 317 318 resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end) 319 return resp, nil 320} 321 322func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { 323 trace := traceutil.Get(ctx) 324 325 resp := &pb.RangeResponse{} 326 resp.Header = &pb.ResponseHeader{} 327 328 if txn == nil { 329 txn = a.s.kv.Read(trace) 330 defer txn.End() 331 } 332 333 limit := r.Limit 334 if r.SortOrder != pb.RangeRequest_NONE || 335 r.MinModRevision != 0 || r.MaxModRevision != 0 || 336 r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 { 337 // fetch everything; sort and truncate afterwards 338 limit = 0 339 } 340 if limit > 0 { 341 // fetch one extra for 'more' flag 342 limit = limit + 1 343 } 344 345 ro := mvcc.RangeOptions{ 346 Limit: limit, 347 Rev: r.Revision, 348 Count: r.CountOnly, 349 } 350 351 rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) 352 if err != nil { 353 return nil, err 354 } 355 356 if r.MaxModRevision != 0 { 357 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision } 358 pruneKVs(rr, f) 359 } 360 if r.MinModRevision != 0 { 361 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision } 362 pruneKVs(rr, f) 363 } 364 if r.MaxCreateRevision != 0 { 365 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision } 366 pruneKVs(rr, f) 367 } 368 if r.MinCreateRevision != 0 { 369 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision } 370 pruneKVs(rr, f) 371 } 372 373 sortOrder := r.SortOrder 374 if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE { 375 // Since current mvcc.Range implementation returns results 376 // sorted by keys in lexiographically ascending order, 377 // sort ASCEND by default only when target is not 'KEY' 378 
sortOrder = pb.RangeRequest_ASCEND 379 } 380 if sortOrder != pb.RangeRequest_NONE { 381 var sorter sort.Interface 382 switch { 383 case r.SortTarget == pb.RangeRequest_KEY: 384 sorter = &kvSortByKey{&kvSort{rr.KVs}} 385 case r.SortTarget == pb.RangeRequest_VERSION: 386 sorter = &kvSortByVersion{&kvSort{rr.KVs}} 387 case r.SortTarget == pb.RangeRequest_CREATE: 388 sorter = &kvSortByCreate{&kvSort{rr.KVs}} 389 case r.SortTarget == pb.RangeRequest_MOD: 390 sorter = &kvSortByMod{&kvSort{rr.KVs}} 391 case r.SortTarget == pb.RangeRequest_VALUE: 392 sorter = &kvSortByValue{&kvSort{rr.KVs}} 393 } 394 switch { 395 case sortOrder == pb.RangeRequest_ASCEND: 396 sort.Sort(sorter) 397 case sortOrder == pb.RangeRequest_DESCEND: 398 sort.Sort(sort.Reverse(sorter)) 399 } 400 } 401 402 if r.Limit > 0 && len(rr.KVs) > int(r.Limit) { 403 rr.KVs = rr.KVs[:r.Limit] 404 resp.More = true 405 } 406 trace.Step("filter and sort the key-value pairs") 407 resp.Header.Revision = rr.Rev 408 resp.Count = int64(rr.Count) 409 resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) 410 for i := range rr.KVs { 411 if r.KeysOnly { 412 rr.KVs[i].Value = nil 413 } 414 resp.Kvs[i] = &rr.KVs[i] 415 } 416 trace.Step("assemble the response") 417 return resp, nil 418} 419 420func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { 421 trace := traceutil.Get(ctx) 422 if trace.IsEmpty() { 423 trace = traceutil.New("transaction", a.s.getLogger()) 424 ctx = context.WithValue(ctx, traceutil.TraceKey, trace) 425 } 426 isWrite := !isTxnReadonly(rt) 427 txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) 428 429 var txnPath []bool 430 trace.StepWithFunction( 431 func() { 432 txnPath = compareToPath(txn, rt) 433 }, 434 "compare", 435 ) 436 437 if isWrite { 438 trace.AddField(traceutil.Field{Key: "read_only", Value: false}) 439 if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil { 440 txn.End() 441 return nil, nil, err 442 } 443 } 444 if _, err := 
checkRequests(txn, rt, txnPath, a.checkRange); err != nil { 445 txn.End() 446 return nil, nil, err 447 } 448 trace.Step("check requests") 449 txnResp, _ := newTxnResp(rt, txnPath) 450 451 // When executing mutable txn ops, etcd must hold the txn lock so 452 // readers do not see any intermediate results. Since writes are 453 // serialized on the raft loop, the revision in the read view will 454 // be the revision of the write txn. 455 if isWrite { 456 txn.End() 457 txn = a.s.KV().Write(trace) 458 } 459 a.applyTxn(ctx, txn, rt, txnPath, txnResp) 460 rev := txn.Rev() 461 if len(txn.Changes()) != 0 { 462 rev++ 463 } 464 txn.End() 465 466 txnResp.Header.Revision = rev 467 trace.AddField( 468 traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)}, 469 traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision}, 470 ) 471 return txnResp, trace, nil 472} 473 474// newTxnResp allocates a txn response for a txn request given a path. 475func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) { 476 reqs := rt.Success 477 if !txnPath[0] { 478 reqs = rt.Failure 479 } 480 resps := make([]*pb.ResponseOp, len(reqs)) 481 txnResp = &pb.TxnResponse{ 482 Responses: resps, 483 Succeeded: txnPath[0], 484 Header: &pb.ResponseHeader{}, 485 } 486 for i, req := range reqs { 487 switch tv := req.Request.(type) { 488 case *pb.RequestOp_RequestRange: 489 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}} 490 case *pb.RequestOp_RequestPut: 491 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}} 492 case *pb.RequestOp_RequestDeleteRange: 493 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}} 494 case *pb.RequestOp_RequestTxn: 495 resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:]) 496 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}} 497 txnPath = txnPath[1+txns:] 498 txnCount += txns + 1 499 default: 500 } 501 } 502 return txnResp, txnCount 
503} 504 505func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool { 506 txnPath := make([]bool, 1) 507 ops := rt.Success 508 if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] { 509 ops = rt.Failure 510 } 511 for _, op := range ops { 512 tv, ok := op.Request.(*pb.RequestOp_RequestTxn) 513 if !ok || tv.RequestTxn == nil { 514 continue 515 } 516 txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...) 517 } 518 return txnPath 519} 520 521func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool { 522 for _, c := range cmps { 523 if !applyCompare(rv, c) { 524 return false 525 } 526 } 527 return true 528} 529 530// applyCompare applies the compare request. 531// If the comparison succeeds, it returns true. Otherwise, returns false. 532func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool { 533 // TODO: possible optimizations 534 // * chunk reads for large ranges to conserve memory 535 // * rewrite rules for common patterns: 536 // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0" 537 // * caching 538 rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{}) 539 if err != nil { 540 return false 541 } 542 if len(rr.KVs) == 0 { 543 if c.Target == pb.Compare_VALUE { 544 // Always fail if comparing a value on a key/keys that doesn't exist; 545 // nil == empty string in grpc; no way to represent missing value 546 return false 547 } 548 return compareKV(c, mvccpb.KeyValue{}) 549 } 550 for _, kv := range rr.KVs { 551 if !compareKV(c, kv) { 552 return false 553 } 554 } 555 return true 556} 557 558func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool { 559 var result int 560 rev := int64(0) 561 switch c.Target { 562 case pb.Compare_VALUE: 563 v := []byte{} 564 if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil { 565 v = tv.Value 566 } 567 result = bytes.Compare(ckv.Value, v) 568 case pb.Compare_CREATE: 569 if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil { 570 rev = tv.CreateRevision 571 } 572 
result = compareInt64(ckv.CreateRevision, rev) 573 case pb.Compare_MOD: 574 if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil { 575 rev = tv.ModRevision 576 } 577 result = compareInt64(ckv.ModRevision, rev) 578 case pb.Compare_VERSION: 579 if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil { 580 rev = tv.Version 581 } 582 result = compareInt64(ckv.Version, rev) 583 case pb.Compare_LEASE: 584 if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil { 585 rev = tv.Lease 586 } 587 result = compareInt64(ckv.Lease, rev) 588 } 589 switch c.Result { 590 case pb.Compare_EQUAL: 591 return result == 0 592 case pb.Compare_NOT_EQUAL: 593 return result != 0 594 case pb.Compare_GREATER: 595 return result > 0 596 case pb.Compare_LESS: 597 return result < 0 598 } 599 return true 600} 601 602func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { 603 trace := traceutil.Get(ctx) 604 reqs := rt.Success 605 if !txnPath[0] { 606 reqs = rt.Failure 607 } 608 609 lg := a.s.getLogger() 610 for i, req := range reqs { 611 respi := tresp.Responses[i].Response 612 switch tv := req.Request.(type) { 613 case *pb.RequestOp_RequestRange: 614 trace.StartSubTrace( 615 traceutil.Field{Key: "req_type", Value: "range"}, 616 traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)}, 617 traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) 618 resp, err := a.Range(ctx, txn, tv.RequestRange) 619 if err != nil { 620 lg.Panic("unexpected error during txn", zap.Error(err)) 621 } 622 respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp 623 trace.StopSubTrace() 624 case *pb.RequestOp_RequestPut: 625 trace.StartSubTrace( 626 traceutil.Field{Key: "req_type", Value: "put"}, 627 traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)}, 628 traceutil.Field{Key: "req_size", Value: proto.Size(tv.RequestPut)}) 629 resp, _, err := a.Put(ctx, txn, tv.RequestPut) 
630 if err != nil { 631 lg.Panic("unexpected error during txn", zap.Error(err)) 632 } 633 respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp 634 trace.StopSubTrace() 635 case *pb.RequestOp_RequestDeleteRange: 636 resp, err := a.DeleteRange(txn, tv.RequestDeleteRange) 637 if err != nil { 638 lg.Panic("unexpected error during txn", zap.Error(err)) 639 } 640 respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp 641 case *pb.RequestOp_RequestTxn: 642 resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn 643 applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp) 644 txns += applyTxns + 1 645 txnPath = txnPath[applyTxns+1:] 646 default: 647 // empty union 648 } 649 } 650 return txns 651} 652 653func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { 654 resp := &pb.CompactionResponse{} 655 resp.Header = &pb.ResponseHeader{} 656 trace := traceutil.New("compact", 657 a.s.getLogger(), 658 traceutil.Field{Key: "revision", Value: compaction.Revision}, 659 ) 660 661 ch, err := a.s.KV().Compact(trace, compaction.Revision) 662 if err != nil { 663 return nil, ch, nil, err 664 } 665 // get the current revision. which key to get is not important. 
666 rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{}) 667 resp.Header.Revision = rr.Rev 668 return resp, ch, trace, err 669} 670 671func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 672 l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL) 673 resp := &pb.LeaseGrantResponse{} 674 if err == nil { 675 resp.ID = int64(l.ID) 676 resp.TTL = l.TTL() 677 resp.Header = newHeader(a.s) 678 } 679 return resp, err 680} 681 682func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { 683 err := a.s.lessor.Revoke(lease.LeaseID(lc.ID)) 684 return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err 685} 686 687func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) { 688 for _, c := range lc.Checkpoints { 689 err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL) 690 if err != nil { 691 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err 692 } 693 } 694 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil 695} 696 697func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { 698 resp := &pb.AlarmResponse{} 699 oldCount := len(a.s.alarmStore.Get(ar.Alarm)) 700 701 lg := a.s.getLogger() 702 switch ar.Action { 703 case pb.AlarmRequest_GET: 704 resp.Alarms = a.s.alarmStore.Get(ar.Alarm) 705 case pb.AlarmRequest_ACTIVATE: 706 m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm) 707 if m == nil { 708 break 709 } 710 resp.Alarms = append(resp.Alarms, m) 711 activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1 712 if !activated { 713 break 714 } 715 716 lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) 717 switch m.Alarm { 718 case pb.AlarmType_CORRUPT: 719 a.s.applyV3 = newApplierV3Corrupt(a) 720 case pb.AlarmType_NOSPACE: 721 a.s.applyV3 = 
newApplierV3Capped(a) 722 default: 723 lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m))) 724 } 725 case pb.AlarmRequest_DEACTIVATE: 726 m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm) 727 if m == nil { 728 break 729 } 730 resp.Alarms = append(resp.Alarms, m) 731 deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0 732 if !deactivated { 733 break 734 } 735 736 switch m.Alarm { 737 case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT: 738 // TODO: check kv hash before deactivating CORRUPT? 739 lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) 740 a.s.applyV3 = a.s.newApplierV3() 741 default: 742 lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m))) 743 } 744 default: 745 return nil, nil 746 } 747 return resp, nil 748} 749 750type applierV3Capped struct { 751 applierV3 752 q backendQuota 753} 754 755// newApplierV3Capped creates an applyV3 that will reject Puts and transactions 756// with Puts so that the number of keys in the store is capped. 
757func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } 758 759func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { 760 return nil, nil, ErrNoSpace 761} 762 763func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { 764 if a.q.Cost(r) > 0 { 765 return nil, nil, ErrNoSpace 766 } 767 return a.applierV3.Txn(ctx, r) 768} 769 770func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 771 return nil, ErrNoSpace 772} 773 774func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) { 775 err := a.s.AuthStore().AuthEnable() 776 if err != nil { 777 return nil, err 778 } 779 return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil 780} 781 782func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) { 783 a.s.AuthStore().AuthDisable() 784 return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil 785} 786 787func (a *applierV3backend) AuthStatus() (*pb.AuthStatusResponse, error) { 788 enabled := a.s.AuthStore().IsAuthEnabled() 789 authRevision := a.s.AuthStore().Revision() 790 return &pb.AuthStatusResponse{Header: newHeader(a.s), Enabled: enabled, AuthRevision: authRevision}, nil 791} 792 793func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) { 794 ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken) 795 resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password) 796 if resp != nil { 797 resp.Header = newHeader(a.s) 798 } 799 return resp, err 800} 801 802func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { 803 resp, err := a.s.AuthStore().UserAdd(r) 804 if resp != nil { 805 resp.Header = 
newHeader(a.s) 806 } 807 return resp, err 808} 809 810func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { 811 resp, err := a.s.AuthStore().UserDelete(r) 812 if resp != nil { 813 resp.Header = newHeader(a.s) 814 } 815 return resp, err 816} 817 818func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { 819 resp, err := a.s.AuthStore().UserChangePassword(r) 820 if resp != nil { 821 resp.Header = newHeader(a.s) 822 } 823 return resp, err 824} 825 826func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { 827 resp, err := a.s.AuthStore().UserGrantRole(r) 828 if resp != nil { 829 resp.Header = newHeader(a.s) 830 } 831 return resp, err 832} 833 834func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { 835 resp, err := a.s.AuthStore().UserGet(r) 836 if resp != nil { 837 resp.Header = newHeader(a.s) 838 } 839 return resp, err 840} 841 842func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { 843 resp, err := a.s.AuthStore().UserRevokeRole(r) 844 if resp != nil { 845 resp.Header = newHeader(a.s) 846 } 847 return resp, err 848} 849 850func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { 851 resp, err := a.s.AuthStore().RoleAdd(r) 852 if resp != nil { 853 resp.Header = newHeader(a.s) 854 } 855 return resp, err 856} 857 858func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { 859 resp, err := a.s.AuthStore().RoleGrantPermission(r) 860 if resp != nil { 861 resp.Header = newHeader(a.s) 862 } 863 return resp, err 864} 865 866func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { 867 resp, err := a.s.AuthStore().RoleGet(r) 868 if resp != nil 
{ 869 resp.Header = newHeader(a.s) 870 } 871 return resp, err 872} 873 874func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { 875 resp, err := a.s.AuthStore().RoleRevokePermission(r) 876 if resp != nil { 877 resp.Header = newHeader(a.s) 878 } 879 return resp, err 880} 881 882func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { 883 resp, err := a.s.AuthStore().RoleDelete(r) 884 if resp != nil { 885 resp.Header = newHeader(a.s) 886 } 887 return resp, err 888} 889 890func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { 891 resp, err := a.s.AuthStore().UserList(r) 892 if resp != nil { 893 resp.Header = newHeader(a.s) 894 } 895 return resp, err 896} 897 898func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { 899 resp, err := a.s.AuthStore().RoleList(r) 900 if resp != nil { 901 resp.Header = newHeader(a.s) 902 } 903 return resp, err 904} 905 906func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) { 907 a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability) 908} 909 910func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) { 911 a.s.cluster.UpdateAttributes( 912 types.ID(r.Member_ID), 913 membership.Attributes{ 914 Name: r.MemberAttributes.Name, 915 ClientURLs: r.MemberAttributes.ClientUrls, 916 }, 917 ) 918} 919 920func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) { 921 d := membership.DowngradeInfo{Enabled: false} 922 if r.Enabled { 923 d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} 924 } 925 a.s.cluster.SetDowngradeInfo(&d) 926} 927 928type quotaApplierV3 struct { 929 applierV3 930 q Quota 931} 932 933func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { 934 return "aApplierV3{app, 
NewBackendQuota(s, "v3-applier")}
}

// Put asks the backend quota whether p can be accommodated and then forwards
// the request to the wrapped applier regardless of the answer. If the wrapped
// Put succeeds but the quota reported no room, the error is replaced with
// ErrNoSpace; the response and trace from the wrapped applier are still
// returned alongside it.
func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
	ok := a.q.Available(p)
	resp, trace, err := a.applierV3.Put(ctx, txn, p)
	if err == nil && !ok {
		err = ErrNoSpace
	}
	return resp, trace, err
}

// Txn applies the same quota rule as Put to a transaction: the quota is
// consulted first, the request is always forwarded to the wrapped applier,
// and a successful result is turned into ErrNoSpace when the quota reported
// no room.
func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
	ok := a.q.Available(rt)
	resp, trace, err := a.applierV3.Txn(ctx, rt)
	if err == nil && !ok {
		err = ErrNoSpace
	}
	return resp, trace, err
}

// LeaseGrant applies the same quota rule as Put to a lease grant: the quota
// is consulted first, the request is always forwarded to the wrapped applier,
// and a successful result is turned into ErrNoSpace when the quota reported
// no room.
func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	ok := a.q.Available(lc)
	resp, err := a.applierV3.LeaseGrant(lc)
	if err == nil && !ok {
		err = ErrNoSpace
	}
	return resp, err
}

// kvSort holds the key-value slice being sorted and supplies the Swap and Len
// methods shared by every kvSortBy* ordering below. Each ordering embeds
// *kvSort and adds its own Less, which together form a sort.Interface.
type kvSort struct{ kvs []mvccpb.KeyValue }

// Swap exchanges the elements at indexes i and j.
func (s *kvSort) Swap(i, j int) {
	s.kvs[i], s.kvs[j] = s.kvs[j], s.kvs[i]
}

// Len returns the number of key-values being sorted.
func (s *kvSort) Len() int { return len(s.kvs) }

// kvSortByKey orders key-values by Key, byte-wise ascending.
type kvSortByKey struct{ *kvSort }

// Less reports whether the key at i sorts before the key at j.
func (s *kvSortByKey) Less(i, j int) bool {
	return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
}

// kvSortByVersion orders key-values by Version, ascending.
type kvSortByVersion struct{ *kvSort }

// Less reports whether the version at i is smaller than the version at j.
// The values are compared directly rather than by the sign of their
// difference, so the result cannot be skewed by int64 wrap-around.
func (s *kvSortByVersion) Less(i, j int) bool {
	return s.kvs[i].Version < s.kvs[j].Version
}

// kvSortByCreate orders key-values by CreateRevision, ascending.
type kvSortByCreate struct{ *kvSort }

// Less reports whether the create revision at i is smaller than the one at j.
// The values are compared directly rather than by the sign of their
// difference, so the result cannot be skewed by int64 wrap-around.
func (s *kvSortByCreate) Less(i, j int) bool {
	return s.kvs[i].CreateRevision < s.kvs[j].CreateRevision
}

// kvSortByMod orders key-values by ModRevision, ascending.
type kvSortByMod struct{ *kvSort }

// Less reports whether the mod revision at i is smaller than the one at j.
// The values are compared directly rather than by the sign of their
// difference, so the result cannot be skewed by int64 wrap-around.
func (s *kvSortByMod) Less(i, j int) bool {
	return s.kvs[i].ModRevision < s.kvs[j].ModRevision
}

// kvSortByValue orders key-values by Value, byte-wise ascending.
type kvSortByValue struct{ *kvSort }

// Less reports whether the value at i sorts before the value at j.
func (s *kvSortByValue) Less(i, j int) bool {
	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
}

// checkRequests applies f to every non-transaction request on the branch of
// rt selected by txnPath, descending into nested transactions.
//
// txnPath[0] selects the branch of rt itself: rt.Success when true,
// rt.Failure otherwise. The entries that follow are consumed, in order, by
// the nested transactions found on that branch; a nested transaction that
// reports txns descendants uses up txns+1 entries. txnPath must therefore
// hold at least one entry for rt plus one for every nested transaction
// visited, otherwise indexing it panics.
//
// It returns the number of nested transactions visited beneath rt, or 0 and
// the first error returned by f or by a nested call.
func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
	txnCount := 0
	reqs := rt.Success
	if !txnPath[0] {
		reqs = rt.Failure
	}
	for _, req := range reqs {
		if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
			txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
			if err != nil {
				return 0, err
			}
			txnCount += txns + 1
			// Skip the path entries consumed by the nested transaction and
			// its descendants so the next nested transaction sees its own.
			txnPath = txnPath[txns+1:]
			continue
		}
		if err := f(rv, req); err != nil {
			return 0, err
		}
	}
	return txnCount, nil
}

// checkRequestPut validates a put operation against the read view rv.
// Operations that are not puts (or carry a nil put) are accepted as is.
//
// It returns ErrKeyNotFound when the put sets IgnoreValue or IgnoreLease but
// the key has no existing key-value, lease.ErrLeaseNotFound when the put
// names a lease the lessor does not know, and any error from the range
// lookup itself.
func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
	tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
	if !ok || tv.RequestPut == nil {
		return nil
	}
	req := tv.RequestPut
	if req.IgnoreValue || req.IgnoreLease {
		// expects previous key-value, error if not exist
		rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
		if err != nil {
			return err
		}
		if rr == nil || len(rr.KVs) == 0 {
			return ErrKeyNotFound
		}
	}
	if lease.LeaseID(req.Lease) != lease.NoLease {
		if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
			return lease.ErrLeaseNotFound
		}
	}
	return nil
}

// checkRequestRange validates the revision of a range operation against the
// read view rv. Operations that are not ranges (or carry a nil range) and
// ranges with Revision 0 are accepted as is.
//
// It returns mvcc.ErrFutureRev when the requested revision is greater than
// rv.Rev() and mvcc.ErrCompacted when it is smaller than rv.FirstRev().
func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
	tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
	if !ok || tv.RequestRange == nil {
		return nil
	}
	req := tv.RequestRange
	switch {
	case req.Revision == 0:
		return nil
	case req.Revision > rv.Rev():
		return mvcc.ErrFutureRev
	case req.Revision < rv.FirstRev():
		return mvcc.ErrCompacted
	}
	return nil
}

// compareInt64 returns -1 if a < b, 1 if a > b, and 0 if they are equal.
func compareInt64(a, b int64) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}

// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string). Any other range end is returned
// unchanged.
func mkGteRange(rangeEnd []byte) []byte {
	if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
		return []byte{}
	}
	return rangeEnd
}

// noSideEffect reports whether r carries one of the requests listed here:
// Range, AuthUserGet, AuthRoleGet or AuthStatus.
func noSideEffect(r *pb.InternalRaftRequest) bool {
	return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
}

// removeNeedlessRangeReqs drops every range operation from txn.Success and
// txn.Failure. Each list is filtered in place, reusing its backing array, and
// the relative order of the remaining operations is kept. Only these two
// top-level lists are filtered; operations inside nested transactions are
// not touched.
func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
	f := func(ops []*pb.RequestOp) []*pb.RequestOp {
		j := 0
		for i := 0; i < len(ops); i++ {
			if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
				continue
			}
			ops[j] = ops[i]
			j++
		}

		return ops[:j]
	}

	txn.Success = f(txn.Success)
	txn.Failure = f(txn.Failure)
}

// pruneKVs removes from rr.KVs, in place, every key-value for which
// isPrunable returns true. The relative order of the kept entries is
// preserved and rr.KVs is re-sliced to the kept length.
func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
	j := 0
	for i := range rr.KVs {
		// Copy first, then advance j only for kept entries; a pruned entry
		// is simply overwritten by the next one.
		rr.KVs[j] = rr.KVs[i]
		if !isPrunable(&rr.KVs[i]) {
			j++
		}
	}
	rr.KVs = rr.KVs[:j]
}

// newHeader builds a response header from the server's current cluster ID,
// member ID, KV store revision and raft term.
func newHeader(s *EtcdServer) *pb.ResponseHeader {
	return &pb.ResponseHeader{
		ClusterId: uint64(s.Cluster().ID()),
		MemberId:  uint64(s.ID()),
		Revision:  s.KV().Rev(),
		RaftTerm:  s.Term(),
	}
}