1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "bytes" 19 "context" 20 "fmt" 21 "sort" 22 "strconv" 23 "time" 24 25 "github.com/coreos/go-semver/semver" 26 pb "go.etcd.io/etcd/api/v3/etcdserverpb" 27 "go.etcd.io/etcd/api/v3/membershippb" 28 "go.etcd.io/etcd/api/v3/mvccpb" 29 "go.etcd.io/etcd/client/pkg/v3/types" 30 "go.etcd.io/etcd/pkg/v3/traceutil" 31 "go.etcd.io/etcd/server/v3/auth" 32 "go.etcd.io/etcd/server/v3/etcdserver/api" 33 "go.etcd.io/etcd/server/v3/etcdserver/api/membership" 34 "go.etcd.io/etcd/server/v3/lease" 35 "go.etcd.io/etcd/server/v3/mvcc" 36 37 "github.com/gogo/protobuf/proto" 38 "go.uber.org/zap" 39) 40 41const ( 42 v3Version = "v3" 43) 44 45type applyResult struct { 46 resp proto.Message 47 err error 48 // physc signals the physical effect of the request has completed in addition 49 // to being logically reflected by the node. Currently only used for 50 // Compaction requests. 51 physc <-chan struct{} 52 trace *traceutil.Trace 53} 54 55// applierV3Internal is the interface for processing internal V3 raft request 56type applierV3Internal interface { 57 ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) 58 ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) 59 DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) 60} 61 62// applierV3 is the interface for processing V3 raft messages 63type applierV3 interface { 64 Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult 65 66 Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) 67 Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) 68 DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) 69 Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) 70 Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) 71 72 LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) 73 LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) 74 75 LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) 76 77 Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error) 78 79 Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) 80 81 AuthEnable() (*pb.AuthEnableResponse, error) 82 AuthDisable() (*pb.AuthDisableResponse, error) 83 AuthStatus() (*pb.AuthStatusResponse, error) 84 85 UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) 86 UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) 87 UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) 88 UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) 89 UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) 90 UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) 91 RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) 92 RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) 93 RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) 94 RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) 95 RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) 96 UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) 97 RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) 98} 99 100type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error 101 102type applierV3backend struct { 103 s *EtcdServer 104 105 checkPut checkReqFunc 106 checkRange checkReqFunc 107} 108 109func (s *EtcdServer) newApplierV3Backend() applierV3 { 110 base := &applierV3backend{s: s} 111 base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error { 112 return base.checkRequestPut(rv, req) 113 } 114 base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error { 115 return base.checkRequestRange(rv, req) 116 } 117 return base 118} 119 120func (s *EtcdServer) newApplierV3Internal() applierV3Internal { 121 base := &applierV3backend{s: s} 122 return base 123} 124 125func (s *EtcdServer) newApplierV3() applierV3 { 126 return newAuthApplierV3( 127 s.AuthStore(), 128 newQuotaApplierV3(s, s.newApplierV3Backend()), 129 s.lessor, 130 ) 131} 132 133func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult { 134 op := "unknown" 135 ar := &applyResult{} 136 defer func(start time.Time) { 137 success := ar.err == nil || ar.err == mvcc.ErrCompacted 138 applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds()) 139 warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) 140 if !success { 141 warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) 142 } 143 }(time.Now()) 144 145 switch { 146 case r.ClusterVersionSet != nil: // Implemented in 3.5.x 147 op = "ClusterVersionSet" 148 a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3) 149 return nil 150 case r.ClusterMemberAttrSet != nil: 151 op = "ClusterMemberAttrSet" // Implemented in 3.5.x 152 a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3) 153 return nil 154 case r.DowngradeInfoSet != nil: 155 op = "DowngradeInfoSet" // Implemented in 3.5.x 156 a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) 157 return nil 158 } 159 160 if !shouldApplyV3 { 161 return nil 162 } 163 164 // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls 165 switch { 166 case r.Range != nil: 167 op = "Range" 168 ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) 169 case r.Put != nil: 170 op = "Put" 171 ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put) 172 case r.DeleteRange != nil: 173 op = "DeleteRange" 174 ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) 175 case r.Txn != nil: 176 op = "Txn" 177 ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn) 178 case r.Compaction != nil: 179 op = "Compaction" 180 ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) 181 case r.LeaseGrant != nil: 182 op = "LeaseGrant" 183 ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) 184 case r.LeaseRevoke != nil: 185 op = "LeaseRevoke" 186 ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke) 187 case r.LeaseCheckpoint != nil: 188 op = "LeaseCheckpoint" 189 ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint) 190 case r.Alarm != nil: 191 op = "Alarm" 192 ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm) 193 case r.Authenticate != nil: 194 op = "Authenticate" 195 ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate) 196 case r.AuthEnable != nil: 197 op = "AuthEnable" 198 ar.resp, ar.err = a.s.applyV3.AuthEnable() 199 case r.AuthDisable != nil: 200 op = "AuthDisable" 201 ar.resp, ar.err = a.s.applyV3.AuthDisable() 202 case r.AuthStatus != nil: 203 ar.resp, ar.err = a.s.applyV3.AuthStatus() 204 case r.AuthUserAdd != nil: 205 op = "AuthUserAdd" 206 ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd) 207 case r.AuthUserDelete != nil: 208 op = "AuthUserDelete" 209 ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete) 210 case r.AuthUserChangePassword != nil: 211 op = "AuthUserChangePassword" 212 ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword) 213 case r.AuthUserGrantRole != nil: 214 op = "AuthUserGrantRole" 215 ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole) 216 case r.AuthUserGet != nil: 217 op = "AuthUserGet" 218 ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet) 219 case r.AuthUserRevokeRole != nil: 220 op = "AuthUserRevokeRole" 221 ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole) 222 case r.AuthRoleAdd != nil: 223 op = "AuthRoleAdd" 224 ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd) 225 case r.AuthRoleGrantPermission != nil: 226 op = "AuthRoleGrantPermission" 227 ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission) 228 case r.AuthRoleGet != nil: 229 op = "AuthRoleGet" 230 ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet) 231 case r.AuthRoleRevokePermission != nil: 232 op = "AuthRoleRevokePermission" 233 ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission) 234 case r.AuthRoleDelete != nil: 235 op = "AuthRoleDelete" 236 ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete) 237 case r.AuthUserList != nil: 238 op = "AuthUserList" 239 ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList) 240 case r.AuthRoleList != nil: 241 op = "AuthRoleList" 242 ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) 243 default: 244 a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) 245 } 246 return ar 247} 248 249func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { 250 resp = &pb.PutResponse{} 251 resp.Header = &pb.ResponseHeader{} 252 trace = traceutil.Get(ctx) 253 // create put tracing if the trace in context is empty 254 if trace.IsEmpty() { 255 trace = traceutil.New("put", 256 a.s.Logger(), 257 traceutil.Field{Key: "key", Value: string(p.Key)}, 258 traceutil.Field{Key: "req_size", Value: p.Size()}, 259 ) 260 } 261 val, leaseID := p.Value, lease.LeaseID(p.Lease) 262 if txn == nil { 263 if leaseID != lease.NoLease { 264 if l := a.s.lessor.Lookup(leaseID); l == nil { 265 return nil, nil, lease.ErrLeaseNotFound 266 } 267 } 268 txn = a.s.KV().Write(trace) 269 defer txn.End() 270 } 271 272 var rr *mvcc.RangeResult 273 if p.IgnoreValue || p.IgnoreLease || p.PrevKv { 274 trace.StepWithFunction(func() { 275 rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) 276 }, "get previous kv pair") 277 278 if err != nil { 279 return nil, nil, err 280 } 281 } 282 if p.IgnoreValue || p.IgnoreLease { 283 if rr == nil || len(rr.KVs) == 0 { 284 // ignore_{lease,value} flag expects previous key-value pair 285 return nil, nil, ErrKeyNotFound 286 } 287 } 288 if p.IgnoreValue { 289 val = rr.KVs[0].Value 290 } 291 if p.IgnoreLease { 292 leaseID = lease.LeaseID(rr.KVs[0].Lease) 293 } 294 if p.PrevKv { 295 if rr != nil && len(rr.KVs) != 0 { 296 resp.PrevKv = &rr.KVs[0] 297 } 298 } 299 300 resp.Header.Revision = txn.Put(p.Key, val, leaseID) 301 trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) 302 return resp, trace, nil 303} 304 305func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 306 resp := &pb.DeleteRangeResponse{} 307 resp.Header = &pb.ResponseHeader{} 308 end := mkGteRange(dr.RangeEnd) 309 310 if txn == nil { 311 txn = a.s.kv.Write(traceutil.TODO()) 312 defer txn.End() 313 } 314 315 if dr.PrevKv { 316 rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) 317 if err != nil { 318 return nil, err 319 } 320 if rr != nil { 321 resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs)) 322 for i := range rr.KVs { 323 resp.PrevKvs[i] = &rr.KVs[i] 324 } 325 } 326 } 327 328 resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end) 329 return resp, nil 330} 331 332func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { 333 trace := traceutil.Get(ctx) 334 335 resp := &pb.RangeResponse{} 336 resp.Header = &pb.ResponseHeader{} 337 338 if txn == nil { 339 txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace) 340 defer txn.End() 341 } 342 343 limit := r.Limit 344 if r.SortOrder != pb.RangeRequest_NONE || 345 r.MinModRevision != 0 || r.MaxModRevision != 0 || 346 r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 { 347 // fetch everything; sort and truncate afterwards 348 limit = 0 349 } 350 if limit > 0 { 351 // fetch one extra for 'more' flag 352 limit = limit + 1 353 } 354 355 ro := mvcc.RangeOptions{ 356 Limit: limit, 357 Rev: r.Revision, 358 Count: r.CountOnly, 359 } 360 361 rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) 362 if err != nil { 363 return nil, err 364 } 365 366 if r.MaxModRevision != 0 { 367 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision } 368 pruneKVs(rr, f) 369 } 370 if r.MinModRevision != 0 { 371 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision } 372 pruneKVs(rr, f) 373 } 374 if r.MaxCreateRevision != 0 { 375 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision } 376 pruneKVs(rr, f) 377 } 378 if r.MinCreateRevision != 0 { 379 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision } 380 pruneKVs(rr, f) 381 } 382 383 sortOrder := r.SortOrder 384 if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE { 385 // Since current mvcc.Range implementation returns results 386 // sorted by keys in lexiographically ascending order, 387 // sort ASCEND by default only when target is not 'KEY' 388 sortOrder = pb.RangeRequest_ASCEND 389 } 390 if sortOrder != pb.RangeRequest_NONE { 391 var sorter sort.Interface 392 switch { 393 case r.SortTarget == pb.RangeRequest_KEY: 394 sorter = &kvSortByKey{&kvSort{rr.KVs}} 395 case r.SortTarget == pb.RangeRequest_VERSION: 396 sorter = &kvSortByVersion{&kvSort{rr.KVs}} 397 case r.SortTarget == pb.RangeRequest_CREATE: 398 sorter = &kvSortByCreate{&kvSort{rr.KVs}} 399 case r.SortTarget == pb.RangeRequest_MOD: 400 sorter = &kvSortByMod{&kvSort{rr.KVs}} 401 case r.SortTarget == pb.RangeRequest_VALUE: 402 sorter = &kvSortByValue{&kvSort{rr.KVs}} 403 } 404 switch { 405 case sortOrder == pb.RangeRequest_ASCEND: 406 sort.Sort(sorter) 407 case sortOrder == pb.RangeRequest_DESCEND: 408 sort.Sort(sort.Reverse(sorter)) 409 } 410 } 411 412 if r.Limit > 0 && len(rr.KVs) > int(r.Limit) { 413 rr.KVs = rr.KVs[:r.Limit] 414 resp.More = true 415 } 416 trace.Step("filter and sort the key-value pairs") 417 resp.Header.Revision = rr.Rev 418 resp.Count = int64(rr.Count) 419 resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) 420 for i := range rr.KVs { 421 if r.KeysOnly { 422 rr.KVs[i].Value = nil 423 } 424 resp.Kvs[i] = &rr.KVs[i] 425 } 426 trace.Step("assemble the response") 427 return resp, nil 428} 429 430func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { 431 trace := traceutil.Get(ctx) 432 if trace.IsEmpty() { 433 trace = traceutil.New("transaction", a.s.Logger()) 434 ctx = context.WithValue(ctx, traceutil.TraceKey, trace) 435 } 436 isWrite := !isTxnReadonly(rt) 437 438 // When the transaction contains write operations, we use ReadTx instead of 439 // ConcurrentReadTx to avoid extra overhead of copying buffer. 440 var txn mvcc.TxnWrite 441 if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer { 442 txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace)) 443 } else { 444 txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace)) 445 } 446 447 var txnPath []bool 448 trace.StepWithFunction( 449 func() { 450 txnPath = compareToPath(txn, rt) 451 }, 452 "compare", 453 ) 454 455 if isWrite { 456 trace.AddField(traceutil.Field{Key: "read_only", Value: false}) 457 if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil { 458 txn.End() 459 return nil, nil, err 460 } 461 } 462 if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil { 463 txn.End() 464 return nil, nil, err 465 } 466 trace.Step("check requests") 467 txnResp, _ := newTxnResp(rt, txnPath) 468 469 // When executing mutable txn ops, etcd must hold the txn lock so 470 // readers do not see any intermediate results. Since writes are 471 // serialized on the raft loop, the revision in the read view will 472 // be the revision of the write txn. 473 if isWrite { 474 txn.End() 475 txn = a.s.KV().Write(trace) 476 } 477 a.applyTxn(ctx, txn, rt, txnPath, txnResp) 478 rev := txn.Rev() 479 if len(txn.Changes()) != 0 { 480 rev++ 481 } 482 txn.End() 483 484 txnResp.Header.Revision = rev 485 trace.AddField( 486 traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)}, 487 traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision}, 488 ) 489 return txnResp, trace, nil 490} 491 492// newTxnResp allocates a txn response for a txn request given a path. 493func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) { 494 reqs := rt.Success 495 if !txnPath[0] { 496 reqs = rt.Failure 497 } 498 resps := make([]*pb.ResponseOp, len(reqs)) 499 txnResp = &pb.TxnResponse{ 500 Responses: resps, 501 Succeeded: txnPath[0], 502 Header: &pb.ResponseHeader{}, 503 } 504 for i, req := range reqs { 505 switch tv := req.Request.(type) { 506 case *pb.RequestOp_RequestRange: 507 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}} 508 case *pb.RequestOp_RequestPut: 509 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}} 510 case *pb.RequestOp_RequestDeleteRange: 511 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}} 512 case *pb.RequestOp_RequestTxn: 513 resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:]) 514 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}} 515 txnPath = txnPath[1+txns:] 516 txnCount += txns + 1 517 default: 518 } 519 } 520 return txnResp, txnCount 521} 522 523func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool { 524 txnPath := make([]bool, 1) 525 ops := rt.Success 526 if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] { 527 ops = rt.Failure 528 } 529 for _, op := range ops { 530 tv, ok := op.Request.(*pb.RequestOp_RequestTxn) 531 if !ok || tv.RequestTxn == nil { 532 continue 533 } 534 txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...) 535 } 536 return txnPath 537} 538 539func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool { 540 for _, c := range cmps { 541 if !applyCompare(rv, c) { 542 return false 543 } 544 } 545 return true 546} 547 548// applyCompare applies the compare request. 549// If the comparison succeeds, it returns true. Otherwise, returns false. 550func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool { 551 // TODO: possible optimizations 552 // * chunk reads for large ranges to conserve memory 553 // * rewrite rules for common patterns: 554 // ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0" 555 // * caching 556 rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{}) 557 if err != nil { 558 return false 559 } 560 if len(rr.KVs) == 0 { 561 if c.Target == pb.Compare_VALUE { 562 // Always fail if comparing a value on a key/keys that doesn't exist; 563 // nil == empty string in grpc; no way to represent missing value 564 return false 565 } 566 return compareKV(c, mvccpb.KeyValue{}) 567 } 568 for _, kv := range rr.KVs { 569 if !compareKV(c, kv) { 570 return false 571 } 572 } 573 return true 574} 575 576func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool { 577 var result int 578 rev := int64(0) 579 switch c.Target { 580 case pb.Compare_VALUE: 581 v := []byte{} 582 if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil { 583 v = tv.Value 584 } 585 result = bytes.Compare(ckv.Value, v) 586 case pb.Compare_CREATE: 587 if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil { 588 rev = tv.CreateRevision 589 } 590 result = compareInt64(ckv.CreateRevision, rev) 591 case pb.Compare_MOD: 592 if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil { 593 rev = tv.ModRevision 594 } 595 result = compareInt64(ckv.ModRevision, rev) 596 case pb.Compare_VERSION: 597 if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil { 598 rev = tv.Version 599 } 600 result = compareInt64(ckv.Version, rev) 601 case pb.Compare_LEASE: 602 if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil { 603 rev = tv.Lease 604 } 605 result = compareInt64(ckv.Lease, rev) 606 } 607 switch c.Result { 608 case pb.Compare_EQUAL: 609 return result == 0 610 case pb.Compare_NOT_EQUAL: 611 return result != 0 612 case pb.Compare_GREATER: 613 return result > 0 614 case pb.Compare_LESS: 615 return result < 0 616 } 617 return true 618} 619 620func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { 621 trace := traceutil.Get(ctx) 622 reqs := rt.Success 623 if !txnPath[0] { 624 reqs = rt.Failure 625 } 626 627 lg := a.s.Logger() 628 for i, req := range reqs { 629 respi := tresp.Responses[i].Response 630 switch tv := req.Request.(type) { 631 case *pb.RequestOp_RequestRange: 632 trace.StartSubTrace( 633 traceutil.Field{Key: "req_type", Value: "range"}, 634 traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)}, 635 traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) 636 resp, err := a.Range(ctx, txn, tv.RequestRange) 637 if err != nil { 638 lg.Panic("unexpected error during txn", zap.Error(err)) 639 } 640 respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp 641 trace.StopSubTrace() 642 case *pb.RequestOp_RequestPut: 643 trace.StartSubTrace( 644 traceutil.Field{Key: "req_type", Value: "put"}, 645 traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)}, 646 traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()}) 647 resp, _, err := a.Put(ctx, txn, tv.RequestPut) 648 if err != nil { 649 lg.Panic("unexpected error during txn", zap.Error(err)) 650 } 651 respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp 652 trace.StopSubTrace() 653 case *pb.RequestOp_RequestDeleteRange: 654 resp, err := a.DeleteRange(txn, tv.RequestDeleteRange) 655 if err != nil { 656 lg.Panic("unexpected error during txn", zap.Error(err)) 657 } 658 respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp 659 case *pb.RequestOp_RequestTxn: 660 resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn 661 applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp) 662 txns += applyTxns + 1 663 txnPath = txnPath[applyTxns+1:] 664 default: 665 // empty union 666 } 667 } 668 return txns 669} 670 671func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { 672 resp := &pb.CompactionResponse{} 673 resp.Header = &pb.ResponseHeader{} 674 trace := traceutil.New("compact", 675 a.s.Logger(), 676 traceutil.Field{Key: "revision", Value: compaction.Revision}, 677 ) 678 679 ch, err := a.s.KV().Compact(trace, compaction.Revision) 680 if err != nil { 681 return nil, ch, nil, err 682 } 683 // get the current revision. which key to get is not important. 684 rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{}) 685 resp.Header.Revision = rr.Rev 686 return resp, ch, trace, err 687} 688 689func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 690 l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL) 691 resp := &pb.LeaseGrantResponse{} 692 if err == nil { 693 resp.ID = int64(l.ID) 694 resp.TTL = l.TTL() 695 resp.Header = newHeader(a.s) 696 } 697 return resp, err 698} 699 700func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { 701 err := a.s.lessor.Revoke(lease.LeaseID(lc.ID)) 702 return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err 703} 704 705func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) { 706 for _, c := range lc.Checkpoints { 707 err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL) 708 if err != nil { 709 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err 710 } 711 } 712 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil 713} 714 715func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { 716 resp := &pb.AlarmResponse{} 717 oldCount := len(a.s.alarmStore.Get(ar.Alarm)) 718 719 lg := a.s.Logger() 720 switch ar.Action { 721 case pb.AlarmRequest_GET: 722 resp.Alarms = a.s.alarmStore.Get(ar.Alarm) 723 case pb.AlarmRequest_ACTIVATE: 724 if ar.Alarm == pb.AlarmType_NONE { 725 break 726 } 727 m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm) 728 if m == nil { 729 break 730 } 731 resp.Alarms = append(resp.Alarms, m) 732 activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1 733 if !activated { 734 break 735 } 736 737 lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) 738 switch m.Alarm { 739 case pb.AlarmType_CORRUPT: 740 a.s.applyV3 = newApplierV3Corrupt(a) 741 case pb.AlarmType_NOSPACE: 742 a.s.applyV3 = newApplierV3Capped(a) 743 default: 744 lg.Panic("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m))) 745 } 746 case pb.AlarmRequest_DEACTIVATE: 747 m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm) 748 if m == nil { 749 break 750 } 751 resp.Alarms = append(resp.Alarms, m) 752 deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0 753 if !deactivated { 754 break 755 } 756 757 switch m.Alarm { 758 case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT: 759 // TODO: check kv hash before deactivating CORRUPT? 760 lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String())) 761 a.s.applyV3 = a.s.newApplierV3() 762 default: 763 lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m))) 764 } 765 default: 766 return nil, nil 767 } 768 return resp, nil 769} 770 771type applierV3Capped struct { 772 applierV3 773 q backendQuota 774} 775 776// newApplierV3Capped creates an applyV3 that will reject Puts and transactions 777// with Puts so that the number of keys in the store is capped. 778func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } 779 780func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { 781 return nil, nil, ErrNoSpace 782} 783 784func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { 785 if a.q.Cost(r) > 0 { 786 return nil, nil, ErrNoSpace 787 } 788 return a.applierV3.Txn(ctx, r) 789} 790 791func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 792 return nil, ErrNoSpace 793} 794 795func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) { 796 err := a.s.AuthStore().AuthEnable() 797 if err != nil { 798 return nil, err 799 } 800 return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil 801} 802 803func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) { 804 a.s.AuthStore().AuthDisable() 805 return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil 806} 807 808func (a *applierV3backend) AuthStatus() (*pb.AuthStatusResponse, error) { 809 enabled := a.s.AuthStore().IsAuthEnabled() 810 authRevision := a.s.AuthStore().Revision() 811 return &pb.AuthStatusResponse{Header: newHeader(a.s), Enabled: enabled, AuthRevision: authRevision}, nil 812} 813 814func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) { 815 ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken) 816 resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password) 817 if resp != nil { 818 resp.Header = newHeader(a.s) 819 } 820 return resp, err 821} 822 823func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { 824 resp, err := a.s.AuthStore().UserAdd(r) 825 if resp != nil { 826 resp.Header = newHeader(a.s) 827 } 828 return resp, err 829} 830 831func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { 832 resp, err := a.s.AuthStore().UserDelete(r) 833 if resp != nil { 834 resp.Header = newHeader(a.s) 835 } 836 return resp, err 837} 838 839func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { 840 resp, err := a.s.AuthStore().UserChangePassword(r) 841 if resp != nil { 842 resp.Header = newHeader(a.s) 843 } 844 return resp, err 845} 846 847func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { 848 resp, err := a.s.AuthStore().UserGrantRole(r) 849 if resp != nil { 850 resp.Header = newHeader(a.s) 851 } 852 return resp, err 853} 854 855func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { 856 resp, err := a.s.AuthStore().UserGet(r) 857 if resp != nil { 858 resp.Header = newHeader(a.s) 859 } 860 return resp, err 861} 862 863func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { 864 resp, err := a.s.AuthStore().UserRevokeRole(r) 865 if resp != nil { 866 resp.Header = newHeader(a.s) 867 } 868 return resp, err 869} 870 871func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { 872 resp, err := a.s.AuthStore().RoleAdd(r) 873 if resp != nil { 874 resp.Header = newHeader(a.s) 875 } 876 return resp, err 877} 878 879func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { 880 resp, err := a.s.AuthStore().RoleGrantPermission(r) 881 if resp != nil { 882 resp.Header = newHeader(a.s) 883 } 884 return resp, err 885} 886 887func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { 888 resp, err := a.s.AuthStore().RoleGet(r) 889 if resp != nil { 890 resp.Header = newHeader(a.s) 891 } 892 return resp, err 893} 894 895func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { 896 resp, err := a.s.AuthStore().RoleRevokePermission(r) 897 if resp != nil { 898 resp.Header = newHeader(a.s) 899 } 900 return resp, err 901} 902 903func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { 904 resp, err := a.s.AuthStore().RoleDelete(r) 905 if resp != nil { 906 resp.Header = newHeader(a.s) 907 } 908 return resp, err 909} 910 911func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { 912 resp, err := a.s.AuthStore().UserList(r) 913 if resp != nil { 914 resp.Header = newHeader(a.s) 915 } 916 return resp, err 917} 918 919func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { 920 resp, err := a.s.AuthStore().RoleList(r) 921 if resp != nil { 922 resp.Header = newHeader(a.s) 923 } 924 return resp, err 925} 926 927func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) { 928 a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3) 929} 930 931func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) { 932 a.s.cluster.UpdateAttributes( 933 types.ID(r.Member_ID), 934 membership.Attributes{ 935 Name: r.MemberAttributes.Name, 936 ClientURLs: r.MemberAttributes.ClientUrls, 937 }, 938 shouldApplyV3, 939 ) 940} 941 942func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) { 943 d := membership.DowngradeInfo{Enabled: false} 944 if r.Enabled { 945 d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver} 946 } 947 a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3) 948} 949 950type quotaApplierV3 struct { 951 applierV3 952 q Quota 953} 954 955func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { 956 return "aApplierV3{app, NewBackendQuota(s, "v3-applier")} 957} 958 959func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { 960 ok := a.q.Available(p) 961 resp, trace, err := a.applierV3.Put(ctx, txn, p) 962 if err == nil && !ok { 963 err = ErrNoSpace 964 } 965 return resp, trace, err 966} 967 968func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { 969 ok := a.q.Available(rt) 970 resp, trace, err := a.applierV3.Txn(ctx, rt) 971 if err == nil && !ok { 972 err = ErrNoSpace 973 } 974 return resp, trace, err 975} 976 977func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 978 ok := a.q.Available(lc) 979 resp, err := a.applierV3.LeaseGrant(lc) 980 if err == nil && !ok { 981 err = ErrNoSpace 982 } 983 return resp, err 984} 985 986type kvSort struct{ kvs []mvccpb.KeyValue } 987 988func (s *kvSort) Swap(i, j int) { 989 t := s.kvs[i] 990 s.kvs[i] = s.kvs[j] 991 s.kvs[j] = t 992} 993func (s *kvSort) Len() int { return len(s.kvs) } 994 995type kvSortByKey struct{ *kvSort } 996 997func (s *kvSortByKey) Less(i, j int) bool { 998 return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0 999} 1000 1001type kvSortByVersion struct{ *kvSort } 1002 1003func (s *kvSortByVersion) Less(i, j int) bool { 1004 return (s.kvs[i].Version - s.kvs[j].Version) < 0 1005} 1006 1007type kvSortByCreate struct{ *kvSort } 1008 1009func (s *kvSortByCreate) Less(i, j int) bool { 1010 return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0 1011} 1012 1013type kvSortByMod struct{ *kvSort } 1014 1015func (s *kvSortByMod) Less(i, j int) bool { 1016 return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0 1017} 1018 1019type kvSortByValue struct{ *kvSort } 1020 1021func (s *kvSortByValue) Less(i, j int) bool { 1022 return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0 1023} 1024 1025func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) { 1026 txnCount := 0 1027 reqs := rt.Success 1028 if !txnPath[0] { 1029 reqs = rt.Failure 1030 } 1031 for _, req := range reqs { 1032 if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil { 1033 txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f) 1034 if err != nil { 1035 return 0, err 1036 } 1037 txnCount += txns + 1 1038 txnPath = txnPath[txns+1:] 1039 continue 1040 } 1041 if err := f(rv, req); err != nil { 1042 return 0, err 1043 } 1044 } 1045 return txnCount, nil 1046} 1047 1048func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error { 1049 tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut) 1050 if !ok || tv.RequestPut == nil { 1051 return nil 1052 } 1053 req := tv.RequestPut 1054 if req.IgnoreValue || req.IgnoreLease { 1055 // expects previous key-value, error if not exist 1056 rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{}) 1057 if err != nil { 1058 return err 1059 } 1060 if rr == nil || len(rr.KVs) == 0 { 1061 return ErrKeyNotFound 1062 } 1063 } 1064 if lease.LeaseID(req.Lease) != lease.NoLease { 1065 if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil { 1066 return lease.ErrLeaseNotFound 1067 } 1068 } 1069 return nil 1070} 1071 1072func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error { 1073 tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange) 1074 if !ok || tv.RequestRange == nil { 1075 return nil 1076 } 1077 req := tv.RequestRange 1078 switch { 1079 case req.Revision == 0: 1080 return nil 1081 case req.Revision > rv.Rev(): 1082 return mvcc.ErrFutureRev 1083 case req.Revision < rv.FirstRev(): 1084 return mvcc.ErrCompacted 1085 } 1086 return nil 1087} 1088 1089func compareInt64(a, b int64) int { 1090 switch { 1091 case a < b: 1092 return -1 1093 case a > b: 1094 return 1 1095 default: 1096 return 0 1097 } 1098} 1099 1100// mkGteRange determines if the range end is a >= range. This works around grpc 1101// sending empty byte strings as nil; >= is encoded in the range end as '\0'. 1102// If it is a GTE range, then []byte{} is returned to indicate the empty byte 1103// string (vs nil being no byte string). 1104func mkGteRange(rangeEnd []byte) []byte { 1105 if len(rangeEnd) == 1 && rangeEnd[0] == 0 { 1106 return []byte{} 1107 } 1108 return rangeEnd 1109} 1110 1111func noSideEffect(r *pb.InternalRaftRequest) bool { 1112 return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil 1113} 1114 1115func removeNeedlessRangeReqs(txn *pb.TxnRequest) { 1116 f := func(ops []*pb.RequestOp) []*pb.RequestOp { 1117 j := 0 1118 for i := 0; i < len(ops); i++ { 1119 if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok { 1120 continue 1121 } 1122 ops[j] = ops[i] 1123 j++ 1124 } 1125 1126 return ops[:j] 1127 } 1128 1129 txn.Success = f(txn.Success) 1130 txn.Failure = f(txn.Failure) 1131} 1132 1133func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) { 1134 j := 0 1135 for i := range rr.KVs { 1136 rr.KVs[j] = rr.KVs[i] 1137 if !isPrunable(&rr.KVs[i]) { 1138 j++ 1139 } 1140 } 1141 rr.KVs = rr.KVs[:j] 1142} 1143 1144func newHeader(s *EtcdServer) *pb.ResponseHeader { 1145 return &pb.ResponseHeader{ 1146 ClusterId: uint64(s.Cluster().ID()), 1147 MemberId: uint64(s.ID()), 1148 Revision: s.KV().Rev(), 1149 RaftTerm: s.Term(), 1150 } 1151} 1152