1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"bytes"
19	"context"
20	"fmt"
21	"sort"
22	"strconv"
23	"time"
24
25	"github.com/coreos/go-semver/semver"
26	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
27	"go.etcd.io/etcd/api/v3/membershippb"
28	"go.etcd.io/etcd/api/v3/mvccpb"
29	"go.etcd.io/etcd/pkg/v3/traceutil"
30	"go.etcd.io/etcd/pkg/v3/types"
31	"go.etcd.io/etcd/server/v3/auth"
32	"go.etcd.io/etcd/server/v3/etcdserver/api"
33	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
34	"go.etcd.io/etcd/server/v3/lease"
35	"go.etcd.io/etcd/server/v3/mvcc"
36
37	"github.com/gogo/protobuf/proto"
38	"go.uber.org/zap"
39)
40
const (
	// v3Version is the version label attached to the apply-duration metric
	// recorded by applierV3backend.Apply.
	v3Version = "v3"
)
44
// applyResult carries everything produced by applying one internal raft
// request.
type applyResult struct {
	// resp is the response message returned by the handler, if any.
	resp proto.Message
	// err is the error returned by the handler, if any.
	err error
	// physc signals the physical effect of the request has completed in addition
	// to being logically reflected by the node. Currently only used for
	// Compaction requests.
	physc <-chan struct{}
	// trace is the trace returned by handlers that produce one (Put, Txn and
	// Compaction in this file).
	trace *traceutil.Trace
}
54
// applierV3Internal is the interface for processing internal V3 raft request.
// None of its methods return a value, so callers receive neither a response
// nor an error from them.
type applierV3Internal interface {
	ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
	ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
	DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest)
}
61
// applierV3 is the interface for processing V3 raft messages.
//
// Apply is the entry point that dispatches an internal raft request; the
// remaining methods handle one request type each. Implementations in this
// file wrap one another (quota and capped appliers embed an applierV3) so a
// wrapper can intercept individual calls.
type applierV3 interface {
	Apply(r *pb.InternalRaftRequest) *applyResult

	// Key-value operations. Put, Range and DeleteRange accept an optional
	// transaction; the backend implementation opens its own when it is nil.
	Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
	Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
	DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

	// Lease operations.
	LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

	// Alarm operations.
	Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)

	// Authentication and authorization operations.
	Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)

	AuthEnable() (*pb.AuthEnableResponse, error)
	AuthDisable() (*pb.AuthDisableResponse, error)
	AuthStatus() (*pb.AuthStatusResponse, error)

	UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
99
// checkReqFunc validates a single transaction operation against a read view
// and returns a non-nil error when the operation must be rejected.
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
101
// applierV3backend is the applier that operates directly on the server's
// stores (KV, lessor, alarm store, auth store and cluster).
type applierV3backend struct {
	// s is the server whose state the applier reads and mutates.
	s *EtcdServer

	// checkPut and checkRange are the per-operation validators handed to
	// checkRequests by Txn. They are populated by newApplierV3Backend only.
	checkPut   checkReqFunc
	checkRange checkReqFunc
}
108
109func (s *EtcdServer) newApplierV3Backend() applierV3 {
110	base := &applierV3backend{s: s}
111	base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
112		return base.checkRequestPut(rv, req)
113	}
114	base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
115		return base.checkRequestRange(rv, req)
116	}
117	return base
118}
119
120func (s *EtcdServer) newApplierV3Internal() applierV3Internal {
121	base := &applierV3backend{s: s}
122	return base
123}
124
125func (s *EtcdServer) newApplierV3() applierV3 {
126	return newAuthApplierV3(
127		s.AuthStore(),
128		newQuotaApplierV3(s, s.newApplierV3Backend()),
129		s.lessor,
130	)
131}
132
133func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
134	op := "unknown"
135	ar := &applyResult{}
136	defer func(start time.Time) {
137		success := ar.err == nil || ar.err == mvcc.ErrCompacted
138		applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
139		warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
140		if !success {
141			warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
142		}
143	}(time.Now())
144
145	// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
146	switch {
147	case r.Range != nil:
148		op = "Range"
149		ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
150	case r.Put != nil:
151		op = "Put"
152		ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
153	case r.DeleteRange != nil:
154		op = "DeleteRange"
155		ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
156	case r.Txn != nil:
157		op = "Txn"
158		ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
159	case r.Compaction != nil:
160		op = "Compaction"
161		ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
162	case r.LeaseGrant != nil:
163		op = "LeaseGrant"
164		ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
165	case r.LeaseRevoke != nil:
166		op = "LeaseRevoke"
167		ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
168	case r.LeaseCheckpoint != nil:
169		op = "LeaseCheckpoint"
170		ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
171	case r.Alarm != nil:
172		op = "Alarm"
173		ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
174	case r.Authenticate != nil:
175		op = "Authenticate"
176		ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
177	case r.AuthEnable != nil:
178		op = "AuthEnable"
179		ar.resp, ar.err = a.s.applyV3.AuthEnable()
180	case r.AuthDisable != nil:
181		op = "AuthDisable"
182		ar.resp, ar.err = a.s.applyV3.AuthDisable()
183	case r.AuthStatus != nil:
184		ar.resp, ar.err = a.s.applyV3.AuthStatus()
185	case r.AuthUserAdd != nil:
186		op = "AuthUserAdd"
187		ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
188	case r.AuthUserDelete != nil:
189		op = "AuthUserDelete"
190		ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
191	case r.AuthUserChangePassword != nil:
192		op = "AuthUserChangePassword"
193		ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
194	case r.AuthUserGrantRole != nil:
195		op = "AuthUserGrantRole"
196		ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
197	case r.AuthUserGet != nil:
198		op = "AuthUserGet"
199		ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
200	case r.AuthUserRevokeRole != nil:
201		op = "AuthUserRevokeRole"
202		ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
203	case r.AuthRoleAdd != nil:
204		op = "AuthRoleAdd"
205		ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
206	case r.AuthRoleGrantPermission != nil:
207		op = "AuthRoleGrantPermission"
208		ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
209	case r.AuthRoleGet != nil:
210		op = "AuthRoleGet"
211		ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
212	case r.AuthRoleRevokePermission != nil:
213		op = "AuthRoleRevokePermission"
214		ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
215	case r.AuthRoleDelete != nil:
216		op = "AuthRoleDelete"
217		ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
218	case r.AuthUserList != nil:
219		op = "AuthUserList"
220		ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
221	case r.AuthRoleList != nil:
222		op = "AuthRoleList"
223		ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
224	case r.ClusterVersionSet != nil:
225		op = "ClusterVersionSet"
226		a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
227	case r.ClusterMemberAttrSet != nil:
228		op = "ClusterMemberAttrSet"
229		a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
230	case r.DowngradeInfoSet != nil:
231		op = "DowngradeInfoSet"
232		a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet)
233	default:
234		panic("not implemented")
235	}
236	return ar
237}
238
239func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
240	resp = &pb.PutResponse{}
241	resp.Header = &pb.ResponseHeader{}
242	trace = traceutil.Get(ctx)
243	// create put tracing if the trace in context is empty
244	if trace.IsEmpty() {
245		trace = traceutil.New("put",
246			a.s.getLogger(),
247			traceutil.Field{Key: "key", Value: string(p.Key)},
248			traceutil.Field{Key: "req_size", Value: proto.Size(p)},
249		)
250	}
251	val, leaseID := p.Value, lease.LeaseID(p.Lease)
252	if txn == nil {
253		if leaseID != lease.NoLease {
254			if l := a.s.lessor.Lookup(leaseID); l == nil {
255				return nil, nil, lease.ErrLeaseNotFound
256			}
257		}
258		txn = a.s.KV().Write(trace)
259		defer txn.End()
260	}
261
262	var rr *mvcc.RangeResult
263	if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
264		trace.StepWithFunction(func() {
265			rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
266		}, "get previous kv pair")
267
268		if err != nil {
269			return nil, nil, err
270		}
271	}
272	if p.IgnoreValue || p.IgnoreLease {
273		if rr == nil || len(rr.KVs) == 0 {
274			// ignore_{lease,value} flag expects previous key-value pair
275			return nil, nil, ErrKeyNotFound
276		}
277	}
278	if p.IgnoreValue {
279		val = rr.KVs[0].Value
280	}
281	if p.IgnoreLease {
282		leaseID = lease.LeaseID(rr.KVs[0].Lease)
283	}
284	if p.PrevKv {
285		if rr != nil && len(rr.KVs) != 0 {
286			resp.PrevKv = &rr.KVs[0]
287		}
288	}
289
290	resp.Header.Revision = txn.Put(p.Key, val, leaseID)
291	trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
292	return resp, trace, nil
293}
294
295func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
296	resp := &pb.DeleteRangeResponse{}
297	resp.Header = &pb.ResponseHeader{}
298	end := mkGteRange(dr.RangeEnd)
299
300	if txn == nil {
301		txn = a.s.kv.Write(traceutil.TODO())
302		defer txn.End()
303	}
304
305	if dr.PrevKv {
306		rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
307		if err != nil {
308			return nil, err
309		}
310		if rr != nil {
311			resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
312			for i := range rr.KVs {
313				resp.PrevKvs[i] = &rr.KVs[i]
314			}
315		}
316	}
317
318	resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
319	return resp, nil
320}
321
322func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
323	trace := traceutil.Get(ctx)
324
325	resp := &pb.RangeResponse{}
326	resp.Header = &pb.ResponseHeader{}
327
328	if txn == nil {
329		txn = a.s.kv.Read(trace)
330		defer txn.End()
331	}
332
333	limit := r.Limit
334	if r.SortOrder != pb.RangeRequest_NONE ||
335		r.MinModRevision != 0 || r.MaxModRevision != 0 ||
336		r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
337		// fetch everything; sort and truncate afterwards
338		limit = 0
339	}
340	if limit > 0 {
341		// fetch one extra for 'more' flag
342		limit = limit + 1
343	}
344
345	ro := mvcc.RangeOptions{
346		Limit: limit,
347		Rev:   r.Revision,
348		Count: r.CountOnly,
349	}
350
351	rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
352	if err != nil {
353		return nil, err
354	}
355
356	if r.MaxModRevision != 0 {
357		f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
358		pruneKVs(rr, f)
359	}
360	if r.MinModRevision != 0 {
361		f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
362		pruneKVs(rr, f)
363	}
364	if r.MaxCreateRevision != 0 {
365		f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
366		pruneKVs(rr, f)
367	}
368	if r.MinCreateRevision != 0 {
369		f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
370		pruneKVs(rr, f)
371	}
372
373	sortOrder := r.SortOrder
374	if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
375		// Since current mvcc.Range implementation returns results
376		// sorted by keys in lexiographically ascending order,
377		// sort ASCEND by default only when target is not 'KEY'
378		sortOrder = pb.RangeRequest_ASCEND
379	}
380	if sortOrder != pb.RangeRequest_NONE {
381		var sorter sort.Interface
382		switch {
383		case r.SortTarget == pb.RangeRequest_KEY:
384			sorter = &kvSortByKey{&kvSort{rr.KVs}}
385		case r.SortTarget == pb.RangeRequest_VERSION:
386			sorter = &kvSortByVersion{&kvSort{rr.KVs}}
387		case r.SortTarget == pb.RangeRequest_CREATE:
388			sorter = &kvSortByCreate{&kvSort{rr.KVs}}
389		case r.SortTarget == pb.RangeRequest_MOD:
390			sorter = &kvSortByMod{&kvSort{rr.KVs}}
391		case r.SortTarget == pb.RangeRequest_VALUE:
392			sorter = &kvSortByValue{&kvSort{rr.KVs}}
393		}
394		switch {
395		case sortOrder == pb.RangeRequest_ASCEND:
396			sort.Sort(sorter)
397		case sortOrder == pb.RangeRequest_DESCEND:
398			sort.Sort(sort.Reverse(sorter))
399		}
400	}
401
402	if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
403		rr.KVs = rr.KVs[:r.Limit]
404		resp.More = true
405	}
406	trace.Step("filter and sort the key-value pairs")
407	resp.Header.Revision = rr.Rev
408	resp.Count = int64(rr.Count)
409	resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
410	for i := range rr.KVs {
411		if r.KeysOnly {
412			rr.KVs[i].Value = nil
413		}
414		resp.Kvs[i] = &rr.KVs[i]
415	}
416	trace.Step("assemble the response")
417	return resp, nil
418}
419
420func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
421	trace := traceutil.Get(ctx)
422	if trace.IsEmpty() {
423		trace = traceutil.New("transaction", a.s.getLogger())
424		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
425	}
426	isWrite := !isTxnReadonly(rt)
427	txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))
428
429	var txnPath []bool
430	trace.StepWithFunction(
431		func() {
432			txnPath = compareToPath(txn, rt)
433		},
434		"compare",
435	)
436
437	if isWrite {
438		trace.AddField(traceutil.Field{Key: "read_only", Value: false})
439		if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
440			txn.End()
441			return nil, nil, err
442		}
443	}
444	if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
445		txn.End()
446		return nil, nil, err
447	}
448	trace.Step("check requests")
449	txnResp, _ := newTxnResp(rt, txnPath)
450
451	// When executing mutable txn ops, etcd must hold the txn lock so
452	// readers do not see any intermediate results. Since writes are
453	// serialized on the raft loop, the revision in the read view will
454	// be the revision of the write txn.
455	if isWrite {
456		txn.End()
457		txn = a.s.KV().Write(trace)
458	}
459	a.applyTxn(ctx, txn, rt, txnPath, txnResp)
460	rev := txn.Rev()
461	if len(txn.Changes()) != 0 {
462		rev++
463	}
464	txn.End()
465
466	txnResp.Header.Revision = rev
467	trace.AddField(
468		traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
469		traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
470	)
471	return txnResp, trace, nil
472}
473
474// newTxnResp allocates a txn response for a txn request given a path.
475func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
476	reqs := rt.Success
477	if !txnPath[0] {
478		reqs = rt.Failure
479	}
480	resps := make([]*pb.ResponseOp, len(reqs))
481	txnResp = &pb.TxnResponse{
482		Responses: resps,
483		Succeeded: txnPath[0],
484		Header:    &pb.ResponseHeader{},
485	}
486	for i, req := range reqs {
487		switch tv := req.Request.(type) {
488		case *pb.RequestOp_RequestRange:
489			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
490		case *pb.RequestOp_RequestPut:
491			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
492		case *pb.RequestOp_RequestDeleteRange:
493			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
494		case *pb.RequestOp_RequestTxn:
495			resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
496			resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
497			txnPath = txnPath[1+txns:]
498			txnCount += txns + 1
499		default:
500		}
501	}
502	return txnResp, txnCount
503}
504
505func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
506	txnPath := make([]bool, 1)
507	ops := rt.Success
508	if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
509		ops = rt.Failure
510	}
511	for _, op := range ops {
512		tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
513		if !ok || tv.RequestTxn == nil {
514			continue
515		}
516		txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
517	}
518	return txnPath
519}
520
521func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
522	for _, c := range cmps {
523		if !applyCompare(rv, c) {
524			return false
525		}
526	}
527	return true
528}
529
530// applyCompare applies the compare request.
531// If the comparison succeeds, it returns true. Otherwise, returns false.
532func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
533	// TODO: possible optimizations
534	// * chunk reads for large ranges to conserve memory
535	// * rewrite rules for common patterns:
536	//	ex. "[a, b) createrev > 0" => "limit 1 /\ kvs > 0"
537	// * caching
538	rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
539	if err != nil {
540		return false
541	}
542	if len(rr.KVs) == 0 {
543		if c.Target == pb.Compare_VALUE {
544			// Always fail if comparing a value on a key/keys that doesn't exist;
545			// nil == empty string in grpc; no way to represent missing value
546			return false
547		}
548		return compareKV(c, mvccpb.KeyValue{})
549	}
550	for _, kv := range rr.KVs {
551		if !compareKV(c, kv) {
552			return false
553		}
554	}
555	return true
556}
557
558func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
559	var result int
560	rev := int64(0)
561	switch c.Target {
562	case pb.Compare_VALUE:
563		v := []byte{}
564		if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
565			v = tv.Value
566		}
567		result = bytes.Compare(ckv.Value, v)
568	case pb.Compare_CREATE:
569		if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
570			rev = tv.CreateRevision
571		}
572		result = compareInt64(ckv.CreateRevision, rev)
573	case pb.Compare_MOD:
574		if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
575			rev = tv.ModRevision
576		}
577		result = compareInt64(ckv.ModRevision, rev)
578	case pb.Compare_VERSION:
579		if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
580			rev = tv.Version
581		}
582		result = compareInt64(ckv.Version, rev)
583	case pb.Compare_LEASE:
584		if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
585			rev = tv.Lease
586		}
587		result = compareInt64(ckv.Lease, rev)
588	}
589	switch c.Result {
590	case pb.Compare_EQUAL:
591		return result == 0
592	case pb.Compare_NOT_EQUAL:
593		return result != 0
594	case pb.Compare_GREATER:
595		return result > 0
596	case pb.Compare_LESS:
597		return result < 0
598	}
599	return true
600}
601
602func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
603	trace := traceutil.Get(ctx)
604	reqs := rt.Success
605	if !txnPath[0] {
606		reqs = rt.Failure
607	}
608
609	lg := a.s.getLogger()
610	for i, req := range reqs {
611		respi := tresp.Responses[i].Response
612		switch tv := req.Request.(type) {
613		case *pb.RequestOp_RequestRange:
614			trace.StartSubTrace(
615				traceutil.Field{Key: "req_type", Value: "range"},
616				traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
617				traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
618			resp, err := a.Range(ctx, txn, tv.RequestRange)
619			if err != nil {
620				lg.Panic("unexpected error during txn", zap.Error(err))
621			}
622			respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
623			trace.StopSubTrace()
624		case *pb.RequestOp_RequestPut:
625			trace.StartSubTrace(
626				traceutil.Field{Key: "req_type", Value: "put"},
627				traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
628				traceutil.Field{Key: "req_size", Value: proto.Size(tv.RequestPut)})
629			resp, _, err := a.Put(ctx, txn, tv.RequestPut)
630			if err != nil {
631				lg.Panic("unexpected error during txn", zap.Error(err))
632			}
633			respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
634			trace.StopSubTrace()
635		case *pb.RequestOp_RequestDeleteRange:
636			resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
637			if err != nil {
638				lg.Panic("unexpected error during txn", zap.Error(err))
639			}
640			respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
641		case *pb.RequestOp_RequestTxn:
642			resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
643			applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
644			txns += applyTxns + 1
645			txnPath = txnPath[applyTxns+1:]
646		default:
647			// empty union
648		}
649	}
650	return txns
651}
652
653func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
654	resp := &pb.CompactionResponse{}
655	resp.Header = &pb.ResponseHeader{}
656	trace := traceutil.New("compact",
657		a.s.getLogger(),
658		traceutil.Field{Key: "revision", Value: compaction.Revision},
659	)
660
661	ch, err := a.s.KV().Compact(trace, compaction.Revision)
662	if err != nil {
663		return nil, ch, nil, err
664	}
665	// get the current revision. which key to get is not important.
666	rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{})
667	resp.Header.Revision = rr.Rev
668	return resp, ch, trace, err
669}
670
671func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
672	l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
673	resp := &pb.LeaseGrantResponse{}
674	if err == nil {
675		resp.ID = int64(l.ID)
676		resp.TTL = l.TTL()
677		resp.Header = newHeader(a.s)
678	}
679	return resp, err
680}
681
682func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
683	err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
684	return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
685}
686
687func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
688	for _, c := range lc.Checkpoints {
689		err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
690		if err != nil {
691			return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err
692		}
693	}
694	return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil
695}
696
697func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
698	resp := &pb.AlarmResponse{}
699	oldCount := len(a.s.alarmStore.Get(ar.Alarm))
700
701	lg := a.s.getLogger()
702	switch ar.Action {
703	case pb.AlarmRequest_GET:
704		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
705	case pb.AlarmRequest_ACTIVATE:
706		m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
707		if m == nil {
708			break
709		}
710		resp.Alarms = append(resp.Alarms, m)
711		activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
712		if !activated {
713			break
714		}
715
716		lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
717		switch m.Alarm {
718		case pb.AlarmType_CORRUPT:
719			a.s.applyV3 = newApplierV3Corrupt(a)
720		case pb.AlarmType_NOSPACE:
721			a.s.applyV3 = newApplierV3Capped(a)
722		default:
723			lg.Warn("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
724		}
725	case pb.AlarmRequest_DEACTIVATE:
726		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
727		if m == nil {
728			break
729		}
730		resp.Alarms = append(resp.Alarms, m)
731		deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
732		if !deactivated {
733			break
734		}
735
736		switch m.Alarm {
737		case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
738			// TODO: check kv hash before deactivating CORRUPT?
739			lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
740			a.s.applyV3 = a.s.newApplierV3()
741		default:
742			lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
743		}
744	default:
745		return nil, nil
746	}
747	return resp, nil
748}
749
// applierV3Capped wraps an applierV3 and overrides Put, Txn and LeaseGrant to
// refuse requests with ErrNoSpace; every other method is inherited from the
// embedded applier.
type applierV3Capped struct {
	applierV3
	// q is used by Txn to compute the cost of a request. newApplierV3Capped
	// leaves it at its zero value.
	q backendQuota
}
754
755// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
756// with Puts so that the number of keys in the store is capped.
757func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
758
759func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
760	return nil, nil, ErrNoSpace
761}
762
763func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
764	if a.q.Cost(r) > 0 {
765		return nil, nil, ErrNoSpace
766	}
767	return a.applierV3.Txn(ctx, r)
768}
769
770func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
771	return nil, ErrNoSpace
772}
773
774func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
775	err := a.s.AuthStore().AuthEnable()
776	if err != nil {
777		return nil, err
778	}
779	return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
780}
781
782func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
783	a.s.AuthStore().AuthDisable()
784	return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
785}
786
787func (a *applierV3backend) AuthStatus() (*pb.AuthStatusResponse, error) {
788	enabled := a.s.AuthStore().IsAuthEnabled()
789	authRevision := a.s.AuthStore().Revision()
790	return &pb.AuthStatusResponse{Header: newHeader(a.s), Enabled: enabled, AuthRevision: authRevision}, nil
791}
792
793func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
794	ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken)
795	resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
796	if resp != nil {
797		resp.Header = newHeader(a.s)
798	}
799	return resp, err
800}
801
802func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
803	resp, err := a.s.AuthStore().UserAdd(r)
804	if resp != nil {
805		resp.Header = newHeader(a.s)
806	}
807	return resp, err
808}
809
810func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
811	resp, err := a.s.AuthStore().UserDelete(r)
812	if resp != nil {
813		resp.Header = newHeader(a.s)
814	}
815	return resp, err
816}
817
818func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
819	resp, err := a.s.AuthStore().UserChangePassword(r)
820	if resp != nil {
821		resp.Header = newHeader(a.s)
822	}
823	return resp, err
824}
825
826func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
827	resp, err := a.s.AuthStore().UserGrantRole(r)
828	if resp != nil {
829		resp.Header = newHeader(a.s)
830	}
831	return resp, err
832}
833
834func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
835	resp, err := a.s.AuthStore().UserGet(r)
836	if resp != nil {
837		resp.Header = newHeader(a.s)
838	}
839	return resp, err
840}
841
842func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
843	resp, err := a.s.AuthStore().UserRevokeRole(r)
844	if resp != nil {
845		resp.Header = newHeader(a.s)
846	}
847	return resp, err
848}
849
850func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
851	resp, err := a.s.AuthStore().RoleAdd(r)
852	if resp != nil {
853		resp.Header = newHeader(a.s)
854	}
855	return resp, err
856}
857
858func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
859	resp, err := a.s.AuthStore().RoleGrantPermission(r)
860	if resp != nil {
861		resp.Header = newHeader(a.s)
862	}
863	return resp, err
864}
865
866func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
867	resp, err := a.s.AuthStore().RoleGet(r)
868	if resp != nil {
869		resp.Header = newHeader(a.s)
870	}
871	return resp, err
872}
873
874func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
875	resp, err := a.s.AuthStore().RoleRevokePermission(r)
876	if resp != nil {
877		resp.Header = newHeader(a.s)
878	}
879	return resp, err
880}
881
882func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
883	resp, err := a.s.AuthStore().RoleDelete(r)
884	if resp != nil {
885		resp.Header = newHeader(a.s)
886	}
887	return resp, err
888}
889
890func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
891	resp, err := a.s.AuthStore().UserList(r)
892	if resp != nil {
893		resp.Header = newHeader(a.s)
894	}
895	return resp, err
896}
897
898func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
899	resp, err := a.s.AuthStore().RoleList(r)
900	if resp != nil {
901		resp.Header = newHeader(a.s)
902	}
903	return resp, err
904}
905
906func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) {
907	a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability)
908}
909
910func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) {
911	a.s.cluster.UpdateAttributes(
912		types.ID(r.Member_ID),
913		membership.Attributes{
914			Name:       r.MemberAttributes.Name,
915			ClientURLs: r.MemberAttributes.ClientUrls,
916		},
917	)
918}
919
920func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest) {
921	d := membership.DowngradeInfo{Enabled: false}
922	if r.Enabled {
923		d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
924	}
925	a.s.cluster.SetDowngradeInfo(&d)
926}
927
928type quotaApplierV3 struct {
929	applierV3
930	q Quota
931}
932
933func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
934	return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
935}
936
937func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
938	ok := a.q.Available(p)
939	resp, trace, err := a.applierV3.Put(ctx, txn, p)
940	if err == nil && !ok {
941		err = ErrNoSpace
942	}
943	return resp, trace, err
944}
945
946func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
947	ok := a.q.Available(rt)
948	resp, trace, err := a.applierV3.Txn(ctx, rt)
949	if err == nil && !ok {
950		err = ErrNoSpace
951	}
952	return resp, trace, err
953}
954
955func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
956	ok := a.q.Available(lc)
957	resp, err := a.applierV3.LeaseGrant(lc)
958	if err == nil && !ok {
959		err = ErrNoSpace
960	}
961	return resp, err
962}
963
964type kvSort struct{ kvs []mvccpb.KeyValue }
965
966func (s *kvSort) Swap(i, j int) {
967	t := s.kvs[i]
968	s.kvs[i] = s.kvs[j]
969	s.kvs[j] = t
970}
971func (s *kvSort) Len() int { return len(s.kvs) }
972
973type kvSortByKey struct{ *kvSort }
974
975func (s *kvSortByKey) Less(i, j int) bool {
976	return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
977}
978
979type kvSortByVersion struct{ *kvSort }
980
981func (s *kvSortByVersion) Less(i, j int) bool {
982	return (s.kvs[i].Version - s.kvs[j].Version) < 0
983}
984
985type kvSortByCreate struct{ *kvSort }
986
987func (s *kvSortByCreate) Less(i, j int) bool {
988	return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
989}
990
991type kvSortByMod struct{ *kvSort }
992
993func (s *kvSortByMod) Less(i, j int) bool {
994	return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
995}
996
997type kvSortByValue struct{ *kvSort }
998
999func (s *kvSortByValue) Less(i, j int) bool {
1000	return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
1001}
1002
1003func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
1004	txnCount := 0
1005	reqs := rt.Success
1006	if !txnPath[0] {
1007		reqs = rt.Failure
1008	}
1009	for _, req := range reqs {
1010		if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
1011			txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
1012			if err != nil {
1013				return 0, err
1014			}
1015			txnCount += txns + 1
1016			txnPath = txnPath[txns+1:]
1017			continue
1018		}
1019		if err := f(rv, req); err != nil {
1020			return 0, err
1021		}
1022	}
1023	return txnCount, nil
1024}
1025
1026func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
1027	tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
1028	if !ok || tv.RequestPut == nil {
1029		return nil
1030	}
1031	req := tv.RequestPut
1032	if req.IgnoreValue || req.IgnoreLease {
1033		// expects previous key-value, error if not exist
1034		rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
1035		if err != nil {
1036			return err
1037		}
1038		if rr == nil || len(rr.KVs) == 0 {
1039			return ErrKeyNotFound
1040		}
1041	}
1042	if lease.LeaseID(req.Lease) != lease.NoLease {
1043		if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
1044			return lease.ErrLeaseNotFound
1045		}
1046	}
1047	return nil
1048}
1049
1050func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
1051	tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
1052	if !ok || tv.RequestRange == nil {
1053		return nil
1054	}
1055	req := tv.RequestRange
1056	switch {
1057	case req.Revision == 0:
1058		return nil
1059	case req.Revision > rv.Rev():
1060		return mvcc.ErrFutureRev
1061	case req.Revision < rv.FirstRev():
1062		return mvcc.ErrCompacted
1063	}
1064	return nil
1065}
1066
// compareInt64 performs a three-way comparison of a and b, returning -1 when
// a is smaller, 1 when a is larger, and 0 when they are equal.
func compareInt64(a, b int64) int {
	if a < b {
		return -1
	}
	if a > b {
		return 1
	}
	return 0
}
1077
// mkGteRange normalizes a range end that encodes a ">=" range. Because grpc
// delivers empty byte strings as nil, ">=" is encoded as the single byte
// '\0' in the range end. When rangeEnd has exactly that form, an empty but
// non-nil []byte{} is returned so the empty byte string can be told apart
// from nil (no byte string at all); any other input is returned untouched.
func mkGteRange(rangeEnd []byte) []byte {
	isGte := len(rangeEnd) == 1 && rangeEnd[0] == 0
	if !isGte {
		return rangeEnd
	}
	return []byte{}
}
1088
1089func noSideEffect(r *pb.InternalRaftRequest) bool {
1090	return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
1091}
1092
1093func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
1094	f := func(ops []*pb.RequestOp) []*pb.RequestOp {
1095		j := 0
1096		for i := 0; i < len(ops); i++ {
1097			if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
1098				continue
1099			}
1100			ops[j] = ops[i]
1101			j++
1102		}
1103
1104		return ops[:j]
1105	}
1106
1107	txn.Success = f(txn.Success)
1108	txn.Failure = f(txn.Failure)
1109}
1110
1111func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
1112	j := 0
1113	for i := range rr.KVs {
1114		rr.KVs[j] = rr.KVs[i]
1115		if !isPrunable(&rr.KVs[i]) {
1116			j++
1117		}
1118	}
1119	rr.KVs = rr.KVs[:j]
1120}
1121
1122func newHeader(s *EtcdServer) *pb.ResponseHeader {
1123	return &pb.ResponseHeader{
1124		ClusterId: uint64(s.Cluster().ID()),
1125		MemberId:  uint64(s.ID()),
1126		Revision:  s.KV().Rev(),
1127		RaftTerm:  s.Term(),
1128	}
1129}
1130