1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"bytes"
19	"context"
20	"fmt"
21	"sort"
22	"strconv"
23	"time"
24
25	"github.com/coreos/go-semver/semver"
26	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
27	"go.etcd.io/etcd/api/v3/membershippb"
28	"go.etcd.io/etcd/api/v3/mvccpb"
29	"go.etcd.io/etcd/client/pkg/v3/types"
30	"go.etcd.io/etcd/pkg/v3/traceutil"
31	"go.etcd.io/etcd/server/v3/auth"
32	"go.etcd.io/etcd/server/v3/etcdserver/api"
33	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
34	"go.etcd.io/etcd/server/v3/lease"
35	"go.etcd.io/etcd/server/v3/mvcc"
36
37	"github.com/gogo/protobuf/proto"
38	"go.uber.org/zap"
39)
40
// v3Version is the API-version label attached to every apply-duration
// observation recorded by Apply.
const v3Version = "v3"
44
// applyResult is the outcome of applying a single InternalRaftRequest through
// applierV3.Apply: the response message (resp), the error reported by the
// applier (err), an optional physical-completion channel (physc) and the trace
// collected while applying (trace). Apply returns a nil *applyResult for
// requests it does not apply (cluster-internal requests, or when
// shouldApplyV3 is false).
type applyResult struct {
	resp proto.Message
	err  error
	// physc signals the physical effect of the request has completed in addition
	// to being logically reflected by the node. Currently only used for
	// Compaction requests.
	physc <-chan struct{}
	trace *traceutil.Trace
}
54
// applierV3Internal is the interface for processing internal V3 raft requests
// that update cluster metadata: the cluster version, a member's attributes and
// the downgrade info. Each method receives shouldApplyV3 together with the
// request; Apply forwards these requests here regardless of its value.
type applierV3Internal interface {
	ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3)
	ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3)
	DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
}
61
// applierV3 is the interface for processing V3 raft messages.
//
// Apply dispatches a whole InternalRaftRequest; every other method applies a
// single request type. Wrapping appliers (e.g. applierV3Capped and
// quotaApplierV3) embed another applierV3 and override only the methods they
// intercept, which is why Apply calls back through the server's current
// applier rather than through its own receiver.
type applierV3 interface {
	Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult

	// Put, Range and DeleteRange operate inside txn when it is non-nil. The
	// applierV3backend implementation opens and ends its own transaction when
	// txn is nil.
	Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
	Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
	DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
	// Compaction additionally returns the channel stored in applyResult.physc.
	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

	LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

	Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)

	Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)

	AuthEnable() (*pb.AuthEnableResponse, error)
	AuthDisable() (*pb.AuthDisableResponse, error)
	AuthStatus() (*pb.AuthStatusResponse, error)

	UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
99
// checkReqFunc validates a single txn RequestOp against a read view. Txn passes
// these checkers to checkRequests before applying any operation.
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error

// applierV3backend is the base applier: it executes requests directly against
// the server's KV store, lessor, alarm store, auth store and cluster.
type applierV3backend struct {
	s *EtcdServer

	// checkPut and checkRange validate Put and Range ops inside a Txn. They are
	// set by newApplierV3Backend; newApplierV3Internal leaves them nil.
	checkPut   checkReqFunc
	checkRange checkReqFunc
}
108
109func (s *EtcdServer) newApplierV3Backend() applierV3 {
110	base := &applierV3backend{s: s}
111	base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
112		return base.checkRequestPut(rv, req)
113	}
114	base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
115		return base.checkRequestRange(rv, req)
116	}
117	return base
118}
119
120func (s *EtcdServer) newApplierV3Internal() applierV3Internal {
121	base := &applierV3backend{s: s}
122	return base
123}
124
125func (s *EtcdServer) newApplierV3() applierV3 {
126	return newAuthApplierV3(
127		s.AuthStore(),
128		newQuotaApplierV3(s, s.newApplierV3Backend()),
129		s.lessor,
130	)
131}
132
133func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
134	op := "unknown"
135	ar := &applyResult{}
136	defer func(start time.Time) {
137		success := ar.err == nil || ar.err == mvcc.ErrCompacted
138		applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
139		warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
140		if !success {
141			warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
142		}
143	}(time.Now())
144
145	switch {
146	case r.ClusterVersionSet != nil: // Implemented in 3.5.x
147		op = "ClusterVersionSet"
148		a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
149		return nil
150	case r.ClusterMemberAttrSet != nil:
151		op = "ClusterMemberAttrSet" // Implemented in 3.5.x
152		a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
153		return nil
154	case r.DowngradeInfoSet != nil:
155		op = "DowngradeInfoSet" // Implemented in 3.5.x
156		a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
157		return nil
158	}
159
160	if !shouldApplyV3 {
161		return nil
162	}
163
164	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
165	switch {
166	case r.Range != nil:
167		op = "Range"
168		ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
169	case r.Put != nil:
170		op = "Put"
171		ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
172	case r.DeleteRange != nil:
173		op = "DeleteRange"
174		ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
175	case r.Txn != nil:
176		op = "Txn"
177		ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
178	case r.Compaction != nil:
179		op = "Compaction"
180		ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
181	case r.LeaseGrant != nil:
182		op = "LeaseGrant"
183		ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
184	case r.LeaseRevoke != nil:
185		op = "LeaseRevoke"
186		ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
187	case r.LeaseCheckpoint != nil:
188		op = "LeaseCheckpoint"
189		ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
190	case r.Alarm != nil:
191		op = "Alarm"
192		ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
193	case r.Authenticate != nil:
194		op = "Authenticate"
195		ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
196	case r.AuthEnable != nil:
197		op = "AuthEnable"
198		ar.resp, ar.err = a.s.applyV3.AuthEnable()
199	case r.AuthDisable != nil:
200		op = "AuthDisable"
201		ar.resp, ar.err = a.s.applyV3.AuthDisable()
202	case r.AuthStatus != nil:
203		ar.resp, ar.err = a.s.applyV3.AuthStatus()
204	case r.AuthUserAdd != nil:
205		op = "AuthUserAdd"
206		ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
207	case r.AuthUserDelete != nil:
208		op = "AuthUserDelete"
209		ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
210	case r.AuthUserChangePassword != nil:
211		op = "AuthUserChangePassword"
212		ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
213	case r.AuthUserGrantRole != nil:
214		op = "AuthUserGrantRole"
215		ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
216	case r.AuthUserGet != nil:
217		op = "AuthUserGet"
218		ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
219	case r.AuthUserRevokeRole != nil:
220		op = "AuthUserRevokeRole"
221		ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
222	case r.AuthRoleAdd != nil:
223		op = "AuthRoleAdd"
224		ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
225	case r.AuthRoleGrantPermission != nil:
226		op = "AuthRoleGrantPermission"
227		ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
228	case r.AuthRoleGet != nil:
229		op = "AuthRoleGet"
230		ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
231	case r.AuthRoleRevokePermission != nil:
232		op = "AuthRoleRevokePermission"
233		ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
234	case r.AuthRoleDelete != nil:
235		op = "AuthRoleDelete"
236		ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
237	case r.AuthUserList != nil:
238		op = "AuthUserList"
239		ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
240	case r.AuthRoleList != nil:
241		op = "AuthRoleList"
242		ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
243	default:
244		a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
245	}
246	return ar
247}
248
249func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
250	resp = &pb.PutResponse{}
251	resp.Header = &pb.ResponseHeader{}
252	trace = traceutil.Get(ctx)
253	// create put tracing if the trace in context is empty
254	if trace.IsEmpty() {
255		trace = traceutil.New("put",
256			a.s.Logger(),
257			traceutil.Field{Key: "key", Value: string(p.Key)},
258			traceutil.Field{Key: "req_size", Value: p.Size()},
259		)
260	}
261	val, leaseID := p.Value, lease.LeaseID(p.Lease)
262	if txn == nil {
263		if leaseID != lease.NoLease {
264			if l := a.s.lessor.Lookup(leaseID); l == nil {
265				return nil, nil, lease.ErrLeaseNotFound
266			}
267		}
268		txn = a.s.KV().Write(trace)
269		defer txn.End()
270	}
271
272	var rr *mvcc.RangeResult
273	if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
274		trace.StepWithFunction(func() {
275			rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
276		}, "get previous kv pair")
277
278		if err != nil {
279			return nil, nil, err
280		}
281	}
282	if p.IgnoreValue || p.IgnoreLease {
283		if rr == nil || len(rr.KVs) == 0 {
284			// ignore_{lease,value} flag expects previous key-value pair
285			return nil, nil, ErrKeyNotFound
286		}
287	}
288	if p.IgnoreValue {
289		val = rr.KVs[0].Value
290	}
291	if p.IgnoreLease {
292		leaseID = lease.LeaseID(rr.KVs[0].Lease)
293	}
294	if p.PrevKv {
295		if rr != nil && len(rr.KVs) != 0 {
296			resp.PrevKv = &rr.KVs[0]
297		}
298	}
299
300	resp.Header.Revision = txn.Put(p.Key, val, leaseID)
301	trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
302	return resp, trace, nil
303}
304
305func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
306	resp := &pb.DeleteRangeResponse{}
307	resp.Header = &pb.ResponseHeader{}
308	end := mkGteRange(dr.RangeEnd)
309
310	if txn == nil {
311		txn = a.s.kv.Write(traceutil.TODO())
312		defer txn.End()
313	}
314
315	if dr.PrevKv {
316		rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
317		if err != nil {
318			return nil, err
319		}
320		if rr != nil {
321			resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
322			for i := range rr.KVs {
323				resp.PrevKvs[i] = &rr.KVs[i]
324			}
325		}
326	}
327
328	resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
329	return resp, nil
330}
331
332func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
333	trace := traceutil.Get(ctx)
334
335	resp := &pb.RangeResponse{}
336	resp.Header = &pb.ResponseHeader{}
337
338	if txn == nil {
339		txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
340		defer txn.End()
341	}
342
343	limit := r.Limit
344	if r.SortOrder != pb.RangeRequest_NONE ||
345		r.MinModRevision != 0 || r.MaxModRevision != 0 ||
346		r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
347		// fetch everything; sort and truncate afterwards
348		limit = 0
349	}
350	if limit > 0 {
351		// fetch one extra for 'more' flag
352		limit = limit + 1
353	}
354
355	ro := mvcc.RangeOptions{
356		Limit: limit,
357		Rev:   r.Revision,
358		Count: r.CountOnly,
359	}
360
361	rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
362	if err != nil {
363		return nil, err
364	}
365
366	if r.MaxModRevision != 0 {
367		f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
368		pruneKVs(rr, f)
369	}
370	if r.MinModRevision != 0 {
371		f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
372		pruneKVs(rr, f)
373	}
374	if r.MaxCreateRevision != 0 {
375		f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
376		pruneKVs(rr, f)
377	}
378	if r.MinCreateRevision != 0 {
379		f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
380		pruneKVs(rr, f)
381	}
382
383	sortOrder := r.SortOrder
384	if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
385		// Since current mvcc.Range implementation returns results
386		// sorted by keys in lexiographically ascending order,
387		// sort ASCEND by default only when target is not 'KEY'
388		sortOrder = pb.RangeRequest_ASCEND
389	}
390	if sortOrder != pb.RangeRequest_NONE {
391		var sorter sort.Interface
392		switch {
393		case r.SortTarget == pb.RangeRequest_KEY:
394			sorter = &kvSortByKey{&kvSort{rr.KVs}}
395		case r.SortTarget == pb.RangeRequest_VERSION:
396			sorter = &kvSortByVersion{&kvSort{rr.KVs}}
397		case r.SortTarget == pb.RangeRequest_CREATE:
398			sorter = &kvSortByCreate{&kvSort{rr.KVs}}
399		case r.SortTarget == pb.RangeRequest_MOD:
400			sorter = &kvSortByMod{&kvSort{rr.KVs}}
401		case r.SortTarget == pb.RangeRequest_VALUE:
402			sorter = &kvSortByValue{&kvSort{rr.KVs}}
403		}
404		switch {
405		case sortOrder == pb.RangeRequest_ASCEND:
406			sort.Sort(sorter)
407		case sortOrder == pb.RangeRequest_DESCEND:
408			sort.Sort(sort.Reverse(sorter))
409		}
410	}
411
412	if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
413		rr.KVs = rr.KVs[:r.Limit]
414		resp.More = true
415	}
416	trace.Step("filter and sort the key-value pairs")
417	resp.Header.Revision = rr.Rev
418	resp.Count = int64(rr.Count)
419	resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
420	for i := range rr.KVs {
421		if r.KeysOnly {
422			rr.KVs[i].Value = nil
423		}
424		resp.Kvs[i] = &rr.KVs[i]
425	}
426	trace.Step("assemble the response")
427	return resp, nil
428}
429
// Txn applies a TxnRequest.
//
// The compares are first evaluated against a read-only view to choose the
// Success/Failure path (including nested txns), and the chosen ops are
// validated with a.checkRange and, for write txns, a.checkPut. Validation
// errors end the read view and are returned without applying anything. For a
// write txn the read view is then replaced by a write txn before the ops run.
// The response revision is txn.Rev(), plus one if the txn recorded changes.
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
	trace := traceutil.Get(ctx)
	if trace.IsEmpty() {
		// No trace in the context: start one and store it so nested calls
		// (applyTxn -> Range/Put) pick it up via traceutil.Get.
		trace = traceutil.New("transaction", a.s.Logger())
		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
	}
	isWrite := !isTxnReadonly(rt)

	// For write txns with ExperimentalTxnModeWriteWithSharedBuffer enabled, the
	// initial read view uses SharedBufReadTxMode to avoid the extra overhead of
	// copying the read buffer; all other txns use ConcurrentReadTxMode.
	var txn mvcc.TxnWrite
	if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer {
		txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
	} else {
		txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
	}

	// txnPath holds one bool per (possibly nested) txn in depth-first order:
	// whether its compares succeeded.
	var txnPath []bool
	trace.StepWithFunction(
		func() {
			txnPath = compareToPath(txn, rt)
		},
		"compare",
	)

	if isWrite {
		trace.AddField(traceutil.Field{Key: "read_only", Value: false})
		if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
			txn.End()
			return nil, nil, err
		}
	}
	if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
		txn.End()
		return nil, nil, err
	}
	trace.Step("check requests")
	txnResp, _ := newTxnResp(rt, txnPath)

	// When executing mutable txn ops, etcd must hold the txn lock so
	// readers do not see any intermediate results. Since writes are
	// serialized on the raft loop, the revision in the read view will
	// be the revision of the write txn.
	if isWrite {
		txn.End()
		txn = a.s.KV().Write(trace)
	}
	a.applyTxn(ctx, txn, rt, txnPath, txnResp)
	// If the txn recorded changes, report one past txn.Rev(); presumably the
	// revision those changes are committed at (confirm against mvcc).
	rev := txn.Rev()
	if len(txn.Changes()) != 0 {
		rev++
	}
	txn.End()

	txnResp.Header.Revision = rev
	trace.AddField(
		traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
		traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
	)
	return txnResp, trace, nil
}
491
// newTxnResp allocates a txn response for a txn request given a path.
//
// txnPath[0] selects rt.Success or rt.Failure; the remaining entries describe
// the nested txns of the chosen branch in depth-first order (as produced by
// compareToPath). One empty ResponseOp of the matching kind is allocated per
// op. An op with an empty Request union leaves its slot nil. txnCount is the
// number of nested txns consumed from txnPath (not counting rt itself).
func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
	reqs := rt.Success
	if !txnPath[0] {
		reqs = rt.Failure
	}
	resps := make([]*pb.ResponseOp, len(reqs))
	txnResp = &pb.TxnResponse{
		Responses: resps,
		Succeeded: txnPath[0],
		Header:    &pb.ResponseHeader{},
	}
	for i, req := range reqs {
		switch tv := req.Request.(type) {
		case *pb.RequestOp_RequestRange:
			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
		case *pb.RequestOp_RequestPut:
			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
		case *pb.RequestOp_RequestDeleteRange:
			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
		case *pb.RequestOp_RequestTxn:
			// The nested txn's own entry is txnPath[1]; after it and its
			// txns descendants, shift txnPath so that txnPath[1] is the
			// next nested txn's entry. reqs was already chosen, so
			// overwriting txnPath[0] here is harmless.
			resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
			txnPath = txnPath[1+txns:]
			txnCount += txns + 1
		default:
		}
	}
	return txnResp, txnCount
}
522
523func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
524	txnPath := make([]bool, 1)
525	ops := rt.Success
526	if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
527		ops = rt.Failure
528	}
529	for _, op := range ops {
530		tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
531		if !ok || tv.RequestTxn == nil {
532			continue
533		}
534		txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
535	}
536	return txnPath
537}
538
539func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
540	for _, c := range cmps {
541		if !applyCompare(rv, c) {
542			return false
543		}
544	}
545	return true
546}
547
548// applyCompare applies the compare request.
549// If the comparison succeeds, it returns true. Otherwise, returns false.
550func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
551	// TODO: possible optimizations
552	// * chunk reads for large ranges to conserve memory
553	// * rewrite rules for common patterns:
554	//	ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
555	// * caching
556	rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
557	if err != nil {
558		return false
559	}
560	if len(rr.KVs) == 0 {
561		if c.Target == pb.Compare_VALUE {
562			// Always fail if comparing a value on a key/keys that doesn't exist;
563			// nil == empty string in grpc; no way to represent missing value
564			return false
565		}
566		return compareKV(c, mvccpb.KeyValue{})
567	}
568	for _, kv := range rr.KVs {
569		if !compareKV(c, kv) {
570			return false
571		}
572	}
573	return true
574}
575
576func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
577	var result int
578	rev := int64(0)
579	switch c.Target {
580	case pb.Compare_VALUE:
581		v := []byte{}
582		if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
583			v = tv.Value
584		}
585		result = bytes.Compare(ckv.Value, v)
586	case pb.Compare_CREATE:
587		if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
588			rev = tv.CreateRevision
589		}
590		result = compareInt64(ckv.CreateRevision, rev)
591	case pb.Compare_MOD:
592		if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
593			rev = tv.ModRevision
594		}
595		result = compareInt64(ckv.ModRevision, rev)
596	case pb.Compare_VERSION:
597		if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
598			rev = tv.Version
599		}
600		result = compareInt64(ckv.Version, rev)
601	case pb.Compare_LEASE:
602		if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
603			rev = tv.Lease
604		}
605		result = compareInt64(ckv.Lease, rev)
606	}
607	switch c.Result {
608	case pb.Compare_EQUAL:
609		return result == 0
610	case pb.Compare_NOT_EQUAL:
611		return result != 0
612	case pb.Compare_GREATER:
613		return result > 0
614	case pb.Compare_LESS:
615		return result < 0
616	}
617	return true
618}
619
// applyTxn executes the ops of the branch of rt selected by txnPath[0] inside
// txn, filling the response slots pre-allocated by newTxnResp in tresp.
//
// Ops are applied through the backend's own Range/Put/DeleteRange, not through
// a.s.applyV3, so wrapping appliers are not consulted per op. Any op error
// panics; presumably errors cannot occur here because Txn ran checkRequests
// first. Nested txns recurse with the tail of txnPath. The return value is the
// number of nested txns consumed from txnPath (not counting rt itself).
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
	trace := traceutil.Get(ctx)
	reqs := rt.Success
	if !txnPath[0] {
		reqs = rt.Failure
	}

	lg := a.s.Logger()
	for i, req := range reqs {
		// NOTE(review): newTxnResp leaves tresp.Responses[i] nil for an empty
		// Request union, so this dereference would panic before the
		// "empty union" default case below is reached. Presumably such ops
		// are rejected earlier by request validation; confirm.
		respi := tresp.Responses[i].Response
		switch tv := req.Request.(type) {
		case *pb.RequestOp_RequestRange:
			trace.StartSubTrace(
				traceutil.Field{Key: "req_type", Value: "range"},
				traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
				traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
			resp, err := a.Range(ctx, txn, tv.RequestRange)
			if err != nil {
				lg.Panic("unexpected error during txn", zap.Error(err))
			}
			respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
			trace.StopSubTrace()
		case *pb.RequestOp_RequestPut:
			trace.StartSubTrace(
				traceutil.Field{Key: "req_type", Value: "put"},
				traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
				traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
			resp, _, err := a.Put(ctx, txn, tv.RequestPut)
			if err != nil {
				lg.Panic("unexpected error during txn", zap.Error(err))
			}
			respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
			trace.StopSubTrace()
		case *pb.RequestOp_RequestDeleteRange:
			resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
			if err != nil {
				lg.Panic("unexpected error during txn", zap.Error(err))
			}
			respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
		case *pb.RequestOp_RequestTxn:
			// Recurse with the nested txn's entry at txnPath[1], then skip
			// it and its descendants so txnPath[1] is the next nested txn.
			resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
			applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
			txns += applyTxns + 1
			txnPath = txnPath[applyTxns+1:]
		default:
			// empty union
		}
	}
	return txns
}
670
671func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
672	resp := &pb.CompactionResponse{}
673	resp.Header = &pb.ResponseHeader{}
674	trace := traceutil.New("compact",
675		a.s.Logger(),
676		traceutil.Field{Key: "revision", Value: compaction.Revision},
677	)
678
679	ch, err := a.s.KV().Compact(trace, compaction.Revision)
680	if err != nil {
681		return nil, ch, nil, err
682	}
683	// get the current revision. which key to get is not important.
684	rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{})
685	resp.Header.Revision = rr.Rev
686	return resp, ch, trace, err
687}
688
689func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
690	l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
691	resp := &pb.LeaseGrantResponse{}
692	if err == nil {
693		resp.ID = int64(l.ID)
694		resp.TTL = l.TTL()
695		resp.Header = newHeader(a.s)
696	}
697	return resp, err
698}
699
700func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
701	err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
702	return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
703}
704
705func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
706	for _, c := range lc.Checkpoints {
707		err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
708		if err != nil {
709			return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err
710		}
711	}
712	return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil
713}
714
// Alarm gets, activates or deactivates an alarm in the server's alarm store.
//
// The first activation of CORRUPT or NOSPACE (no alarm of that type
// registered before, exactly one after) replaces a.s.applyV3 with a restricting
// applier wrapping this backend applier. Activating any other non-NONE type
// for the first time panics. When the last alarm of type NOSPACE or CORRUPT is
// deactivated, a.s.applyV3 is rebuilt with a.s.newApplierV3(). An unknown
// action returns (nil, nil).
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
	resp := &pb.AlarmResponse{}
	// Snapshot the count before mutating so first activation / last
	// deactivation can be detected below.
	oldCount := len(a.s.alarmStore.Get(ar.Alarm))

	lg := a.s.Logger()
	switch ar.Action {
	case pb.AlarmRequest_GET:
		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
	case pb.AlarmRequest_ACTIVATE:
		if ar.Alarm == pb.AlarmType_NONE {
			break
		}
		m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
		if m == nil {
			break
		}
		resp.Alarms = append(resp.Alarms, m)
		activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
		if !activated {
			break
		}

		lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
		switch m.Alarm {
		case pb.AlarmType_CORRUPT:
			a.s.applyV3 = newApplierV3Corrupt(a)
		case pb.AlarmType_NOSPACE:
			a.s.applyV3 = newApplierV3Capped(a)
		default:
			lg.Panic("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
		}
	case pb.AlarmRequest_DEACTIVATE:
		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
		if m == nil {
			break
		}
		resp.Alarms = append(resp.Alarms, m)
		deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
		if !deactivated {
			break
		}

		switch m.Alarm {
		case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
			// TODO: check kv hash before deactivating CORRUPT?
			lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
			a.s.applyV3 = a.s.newApplierV3()
		default:
			lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
		}
	default:
		return nil, nil
	}
	return resp, nil
}
770
771type applierV3Capped struct {
772	applierV3
773	q backendQuota
774}
775
776// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
777// with Puts so that the number of keys in the store is capped.
778func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
779
780func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
781	return nil, nil, ErrNoSpace
782}
783
784func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
785	if a.q.Cost(r) > 0 {
786		return nil, nil, ErrNoSpace
787	}
788	return a.applierV3.Txn(ctx, r)
789}
790
791func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
792	return nil, ErrNoSpace
793}
794
795func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
796	err := a.s.AuthStore().AuthEnable()
797	if err != nil {
798		return nil, err
799	}
800	return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
801}
802
803func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
804	a.s.AuthStore().AuthDisable()
805	return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
806}
807
808func (a *applierV3backend) AuthStatus() (*pb.AuthStatusResponse, error) {
809	enabled := a.s.AuthStore().IsAuthEnabled()
810	authRevision := a.s.AuthStore().Revision()
811	return &pb.AuthStatusResponse{Header: newHeader(a.s), Enabled: enabled, AuthRevision: authRevision}, nil
812}
813
814func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
815	ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken)
816	resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
817	if resp != nil {
818		resp.Header = newHeader(a.s)
819	}
820	return resp, err
821}
822
823func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
824	resp, err := a.s.AuthStore().UserAdd(r)
825	if resp != nil {
826		resp.Header = newHeader(a.s)
827	}
828	return resp, err
829}
830
831func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
832	resp, err := a.s.AuthStore().UserDelete(r)
833	if resp != nil {
834		resp.Header = newHeader(a.s)
835	}
836	return resp, err
837}
838
839func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
840	resp, err := a.s.AuthStore().UserChangePassword(r)
841	if resp != nil {
842		resp.Header = newHeader(a.s)
843	}
844	return resp, err
845}
846
847func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
848	resp, err := a.s.AuthStore().UserGrantRole(r)
849	if resp != nil {
850		resp.Header = newHeader(a.s)
851	}
852	return resp, err
853}
854
855func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
856	resp, err := a.s.AuthStore().UserGet(r)
857	if resp != nil {
858		resp.Header = newHeader(a.s)
859	}
860	return resp, err
861}
862
863func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
864	resp, err := a.s.AuthStore().UserRevokeRole(r)
865	if resp != nil {
866		resp.Header = newHeader(a.s)
867	}
868	return resp, err
869}
870
871func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
872	resp, err := a.s.AuthStore().RoleAdd(r)
873	if resp != nil {
874		resp.Header = newHeader(a.s)
875	}
876	return resp, err
877}
878
879func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
880	resp, err := a.s.AuthStore().RoleGrantPermission(r)
881	if resp != nil {
882		resp.Header = newHeader(a.s)
883	}
884	return resp, err
885}
886
887func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
888	resp, err := a.s.AuthStore().RoleGet(r)
889	if resp != nil {
890		resp.Header = newHeader(a.s)
891	}
892	return resp, err
893}
894
895func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
896	resp, err := a.s.AuthStore().RoleRevokePermission(r)
897	if resp != nil {
898		resp.Header = newHeader(a.s)
899	}
900	return resp, err
901}
902
903func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
904	resp, err := a.s.AuthStore().RoleDelete(r)
905	if resp != nil {
906		resp.Header = newHeader(a.s)
907	}
908	return resp, err
909}
910
911func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
912	resp, err := a.s.AuthStore().UserList(r)
913	if resp != nil {
914		resp.Header = newHeader(a.s)
915	}
916	return resp, err
917}
918
919func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
920	resp, err := a.s.AuthStore().RoleList(r)
921	if resp != nil {
922		resp.Header = newHeader(a.s)
923	}
924	return resp, err
925}
926
927func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
928	a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
929}
930
931func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
932	a.s.cluster.UpdateAttributes(
933		types.ID(r.Member_ID),
934		membership.Attributes{
935			Name:       r.MemberAttributes.Name,
936			ClientURLs: r.MemberAttributes.ClientUrls,
937		},
938		shouldApplyV3,
939	)
940}
941
942func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
943	d := membership.DowngradeInfo{Enabled: false}
944	if r.Enabled {
945		d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
946	}
947	a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3)
948}
949
950type quotaApplierV3 struct {
951	applierV3
952	q Quota
953}
954
955func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
956	return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
957}
958
959func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
960	ok := a.q.Available(p)
961	resp, trace, err := a.applierV3.Put(ctx, txn, p)
962	if err == nil && !ok {
963		err = ErrNoSpace
964	}
965	return resp, trace, err
966}
967
968func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
969	ok := a.q.Available(rt)
970	resp, trace, err := a.applierV3.Txn(ctx, rt)
971	if err == nil && !ok {
972		err = ErrNoSpace
973	}
974	return resp, trace, err
975}
976
977func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
978	ok := a.q.Available(lc)
979	resp, err := a.applierV3.LeaseGrant(lc)
980	if err == nil && !ok {
981		err = ErrNoSpace
982	}
983	return resp, err
984}
985
986type kvSort struct{ kvs []mvccpb.KeyValue }
987
988func (s *kvSort) Swap(i, j int) {
989	t := s.kvs[i]
990	s.kvs[i] = s.kvs[j]
991	s.kvs[j] = t
992}
993func (s *kvSort) Len() int { return len(s.kvs) }
994
995type kvSortByKey struct{ *kvSort }
996
997func (s *kvSortByKey) Less(i, j int) bool {
998	return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
999}
1000
1001type kvSortByVersion struct{ *kvSort }
1002
1003func (s *kvSortByVersion) Less(i, j int) bool {
1004	return (s.kvs[i].Version - s.kvs[j].Version) < 0
1005}
1006
1007type kvSortByCreate struct{ *kvSort }
1008
1009func (s *kvSortByCreate) Less(i, j int) bool {
1010	return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
1011}
1012
1013type kvSortByMod struct{ *kvSort }
1014
1015func (s *kvSortByMod) Less(i, j int) bool {
1016	return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
1017}
1018
1019type kvSortByValue struct{ *kvSort }
1020
1021func (s *kvSortByValue) Less(i, j int) bool {
1022	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
1023}
1024
// checkRequests runs f against every non-txn operation in the branch of rt
// selected by txnPath, recursing into nested txns.
//
// txnPath holds one branch decision per txn: txnPath[0] picks rt's Success
// (true) or Failure (false) ops, and the entries that follow belong to the
// nested txns in the order they are visited, each nested txn's entry being
// immediately followed by those of its own descendants.
//
// It returns the number of nested txns visited beneath rt (rt itself is not
// counted), which callers use to advance past that subtree in txnPath. On
// the first error from f it returns 0 and that error.
func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
	txnCount := 0
	reqs := rt.Success
	if !txnPath[0] {
		reqs = rt.Failure
	}
	for _, req := range reqs {
		if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
			// The nested txn's own decision is the next entry after the current head.
			txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
			if err != nil {
				return 0, err
			}
			// Count the nested txn plus its descendants, then drop their
			// entries so txnPath[1] lines up with the next sibling txn.
			txnCount += txns + 1
			txnPath = txnPath[txns+1:]
			continue
		}
		if err := f(rv, req); err != nil {
			return 0, err
		}
	}
	return txnCount, nil
}
1047
1048func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
1049	tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
1050	if !ok || tv.RequestPut == nil {
1051		return nil
1052	}
1053	req := tv.RequestPut
1054	if req.IgnoreValue || req.IgnoreLease {
1055		// expects previous key-value, error if not exist
1056		rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
1057		if err != nil {
1058			return err
1059		}
1060		if rr == nil || len(rr.KVs) == 0 {
1061			return ErrKeyNotFound
1062		}
1063	}
1064	if lease.LeaseID(req.Lease) != lease.NoLease {
1065		if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
1066			return lease.ErrLeaseNotFound
1067		}
1068	}
1069	return nil
1070}
1071
1072func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
1073	tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
1074	if !ok || tv.RequestRange == nil {
1075		return nil
1076	}
1077	req := tv.RequestRange
1078	switch {
1079	case req.Revision == 0:
1080		return nil
1081	case req.Revision > rv.Rev():
1082		return mvcc.ErrFutureRev
1083	case req.Revision < rv.FirstRev():
1084		return mvcc.ErrCompacted
1085	}
1086	return nil
1087}
1088
// compareInt64 returns -1 if a < b, 1 if a > b, and 0 if they are equal.
func compareInt64(a, b int64) int {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
}
1099
// mkGteRange normalizes a range end that encodes a ">= key" range. Such a
// range is sent as the single byte '\0', because grpc delivers empty byte
// strings as nil. For that encoding mkGteRange returns a non-nil empty slice
// ([]byte{}), keeping it distinct from nil (no range end). Any other input is
// returned unchanged.
func mkGteRange(rangeEnd []byte) []byte {
	isGte := len(rangeEnd) == 1 && rangeEnd[0] == 0
	if !isGte {
		return rangeEnd
	}
	return []byte{}
}
1110
1111func noSideEffect(r *pb.InternalRaftRequest) bool {
1112	return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
1113}
1114
1115func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
1116	f := func(ops []*pb.RequestOp) []*pb.RequestOp {
1117		j := 0
1118		for i := 0; i < len(ops); i++ {
1119			if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
1120				continue
1121			}
1122			ops[j] = ops[i]
1123			j++
1124		}
1125
1126		return ops[:j]
1127	}
1128
1129	txn.Success = f(txn.Success)
1130	txn.Failure = f(txn.Failure)
1131}
1132
1133func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
1134	j := 0
1135	for i := range rr.KVs {
1136		rr.KVs[j] = rr.KVs[i]
1137		if !isPrunable(&rr.KVs[i]) {
1138			j++
1139		}
1140	}
1141	rr.KVs = rr.KVs[:j]
1142}
1143
1144func newHeader(s *EtcdServer) *pb.ResponseHeader {
1145	return &pb.ResponseHeader{
1146		ClusterId: uint64(s.Cluster().ID()),
1147		MemberId:  uint64(s.ID()),
1148		Revision:  s.KV().Rev(),
1149		RaftTerm:  s.Term(),
1150	}
1151}
1152