1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"bytes"
19	"context"
20	"encoding/binary"
21	"time"
22
23	"go.etcd.io/etcd/auth"
24	"go.etcd.io/etcd/etcdserver/api/membership"
25	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26	"go.etcd.io/etcd/lease"
27	"go.etcd.io/etcd/lease/leasehttp"
28	"go.etcd.io/etcd/mvcc"
29	"go.etcd.io/etcd/pkg/traceutil"
30	"go.etcd.io/etcd/raft"
31
32	"github.com/gogo/protobuf/proto"
33	"go.uber.org/zap"
34)
35
const (
	// In the healthy case, there might be a small gap (10s of entries) between
	// the applied index and committed index.
	// However, if the committed entries are very heavy to apply, the gap might grow.
	// We should stop accepting new proposals once the gap grows to a certain point:
	// processInternalRaftRequestOnce rejects new proposals with ErrTooManyRequests
	// when the committed index exceeds the applied index by more than this value.
	maxGapBetweenApplyAndCommitIndex = 5000
	// traceThreshold is the duration passed to trace.LogIfLong by the request
	// handlers in this file; traces that take longer than this are logged.
	traceThreshold                   = 100 * time.Millisecond
)
44
// RaftKV is the key-value request API of the server. EtcdServer provides these
// methods in this file: Put, DeleteRange, Compact and non-read-only Txn
// requests are proposed through raft, while Range and read-only Txn requests
// are served from local state (after a linearizable read barrier unless the
// request is serializable).
type RaftKV interface {
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
52
// Lessor is the lease request API of the server: granting, revoking, renewing,
// inspecting and listing leases.
type Lessor interface {
	// LeaseGrant sends LeaseGrant request to raft and apply it after committed.
	LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
	// is returned.
	LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)

	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)

	// LeaseLeases lists all leases.
	LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
}
69
// Authenticator is the authentication API of the server: turning auth on and
// off, authenticating users, and managing users, roles and role permissions.
type Authenticator interface {
	// Enabling/disabling authentication.
	AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
	AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
	// Authenticate checks a user's credentials and issues a token.
	Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
	// User management.
	UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	// Role management.
	RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	// Listing.
	UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
88
89func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
90	trace := traceutil.New("range",
91		s.getLogger(),
92		traceutil.Field{Key: "range_begin", Value: string(r.Key)},
93		traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
94	)
95	ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
96
97	var resp *pb.RangeResponse
98	var err error
99	defer func(start time.Time) {
100		warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
101		if resp != nil {
102			trace.AddField(
103				traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
104				traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
105			)
106		}
107		trace.LogIfLong(traceThreshold)
108	}(time.Now())
109
110	if !r.Serializable {
111		err = s.linearizableReadNotify(ctx)
112		trace.Step("agreement among raft nodes before linearized reading")
113		if err != nil {
114			return nil, err
115		}
116	}
117	chk := func(ai *auth.AuthInfo) error {
118		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
119	}
120
121	get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
122	if serr := s.doSerialize(ctx, chk, get); serr != nil {
123		err = serr
124		return nil, err
125	}
126	return resp, err
127}
128
129func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
130	ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
131	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
132	if err != nil {
133		return nil, err
134	}
135	return resp.(*pb.PutResponse), nil
136}
137
138func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
139	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
140	if err != nil {
141		return nil, err
142	}
143	return resp.(*pb.DeleteRangeResponse), nil
144}
145
146func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
147	if isTxnReadonly(r) {
148		if !isTxnSerializable(r) {
149			err := s.linearizableReadNotify(ctx)
150			if err != nil {
151				return nil, err
152			}
153		}
154		var resp *pb.TxnResponse
155		var err error
156		chk := func(ai *auth.AuthInfo) error {
157			return checkTxnAuth(s.authStore, ai, r)
158		}
159
160		defer func(start time.Time) {
161			warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
162		}(time.Now())
163
164		get := func() { resp, err = s.applyV3Base.Txn(r) }
165		if serr := s.doSerialize(ctx, chk, get); serr != nil {
166			return nil, serr
167		}
168		return resp, err
169	}
170
171	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
172	if err != nil {
173		return nil, err
174	}
175	return resp.(*pb.TxnResponse), nil
176}
177
178func isTxnSerializable(r *pb.TxnRequest) bool {
179	for _, u := range r.Success {
180		if r := u.GetRequestRange(); r == nil || !r.Serializable {
181			return false
182		}
183	}
184	for _, u := range r.Failure {
185		if r := u.GetRequestRange(); r == nil || !r.Serializable {
186			return false
187		}
188	}
189	return true
190}
191
192func isTxnReadonly(r *pb.TxnRequest) bool {
193	for _, u := range r.Success {
194		if r := u.GetRequestRange(); r == nil {
195			return false
196		}
197	}
198	for _, u := range r.Failure {
199		if r := u.GetRequestRange(); r == nil {
200			return false
201		}
202	}
203	return true
204}
205
206func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
207	startTime := time.Now()
208	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
209	trace := traceutil.TODO()
210	if result != nil && result.trace != nil {
211		trace = result.trace
212		defer func() {
213			trace.LogIfLong(traceThreshold)
214		}()
215		applyStart := result.trace.GetStartTime()
216		result.trace.SetStartTime(startTime)
217		trace.InsertStep(0, applyStart, "process raft request")
218	}
219	if r.Physical && result != nil && result.physc != nil {
220		<-result.physc
221		// The compaction is done deleting keys; the hash is now settled
222		// but the data is not necessarily committed. If there's a crash,
223		// the hash may revert to a hash prior to compaction completing
224		// if the compaction resumes. Force the finished compaction to
225		// commit so it won't resume following a crash.
226		s.be.ForceCommit()
227		trace.Step("physically apply compaction")
228	}
229	if err != nil {
230		return nil, err
231	}
232	if result.err != nil {
233		return nil, result.err
234	}
235	resp := result.resp.(*pb.CompactionResponse)
236	if resp == nil {
237		resp = &pb.CompactionResponse{}
238	}
239	if resp.Header == nil {
240		resp.Header = &pb.ResponseHeader{}
241	}
242	resp.Header.Revision = s.kv.Rev()
243	trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
244	return resp, nil
245}
246
247func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
248	// no id given? choose one
249	for r.ID == int64(lease.NoLease) {
250		// only use positive int64 id's
251		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
252	}
253	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
254	if err != nil {
255		return nil, err
256	}
257	return resp.(*pb.LeaseGrantResponse), nil
258}
259
260func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
261	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
262	if err != nil {
263		return nil, err
264	}
265	return resp.(*pb.LeaseRevokeResponse), nil
266}
267
268func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
269	ttl, err := s.lessor.Renew(id)
270	if err == nil { // already requested to primary lessor(leader)
271		return ttl, nil
272	}
273	if err != lease.ErrNotPrimary {
274		return -1, err
275	}
276
277	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
278	defer cancel()
279
280	// renewals don't go through raft; forward to leader manually
281	for cctx.Err() == nil && err != nil {
282		leader, lerr := s.waitLeader(cctx)
283		if lerr != nil {
284			return -1, lerr
285		}
286		for _, url := range leader.PeerURLs {
287			lurl := url + leasehttp.LeasePrefix
288			ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
289			if err == nil || err == lease.ErrLeaseNotFound {
290				return ttl, err
291			}
292		}
293	}
294
295	if cctx.Err() == context.DeadlineExceeded {
296		return -1, ErrTimeout
297	}
298	return -1, ErrCanceled
299}
300
301func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
302	if s.Leader() == s.ID() {
303		// primary; timetolive directly from leader
304		le := s.lessor.Lookup(lease.LeaseID(r.ID))
305		if le == nil {
306			return nil, lease.ErrLeaseNotFound
307		}
308		// TODO: fill out ResponseHeader
309		resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
310		if r.Keys {
311			ks := le.Keys()
312			kbs := make([][]byte, len(ks))
313			for i := range ks {
314				kbs[i] = []byte(ks[i])
315			}
316			resp.Keys = kbs
317		}
318		return resp, nil
319	}
320
321	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
322	defer cancel()
323
324	// forward to leader
325	for cctx.Err() == nil {
326		leader, err := s.waitLeader(cctx)
327		if err != nil {
328			return nil, err
329		}
330		for _, url := range leader.PeerURLs {
331			lurl := url + leasehttp.LeaseInternalPrefix
332			resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
333			if err == nil {
334				return resp.LeaseTimeToLiveResponse, nil
335			}
336			if err == lease.ErrLeaseNotFound {
337				return nil, err
338			}
339		}
340	}
341
342	if cctx.Err() == context.DeadlineExceeded {
343		return nil, ErrTimeout
344	}
345	return nil, ErrCanceled
346}
347
348func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
349	ls := s.lessor.Leases()
350	lss := make([]*pb.LeaseStatus, len(ls))
351	for i := range ls {
352		lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
353	}
354	return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
355}
356
357func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
358	leader := s.cluster.Member(s.Leader())
359	for leader == nil {
360		// wait an election
361		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
362		select {
363		case <-time.After(dur):
364			leader = s.cluster.Member(s.Leader())
365		case <-s.stopping:
366			return nil, ErrStopped
367		case <-ctx.Done():
368			return nil, ErrNoLeader
369		}
370	}
371	if leader == nil || len(leader.PeerURLs) == 0 {
372		return nil, ErrNoLeader
373	}
374	return leader, nil
375}
376
377func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
378	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
379	if err != nil {
380		return nil, err
381	}
382	return resp.(*pb.AlarmResponse), nil
383}
384
385func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
386	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
387	if err != nil {
388		return nil, err
389	}
390	return resp.(*pb.AuthEnableResponse), nil
391}
392
393func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
394	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
395	if err != nil {
396		return nil, err
397	}
398	return resp.(*pb.AuthDisableResponse), nil
399}
400
401func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
402	if err := s.linearizableReadNotify(ctx); err != nil {
403		return nil, err
404	}
405
406	lg := s.getLogger()
407
408	var resp proto.Message
409	for {
410		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
411		if err != nil {
412			if err != auth.ErrAuthNotEnabled {
413				if lg != nil {
414					lg.Warn(
415						"invalid authentication was requested",
416						zap.String("user", r.Name),
417						zap.Error(err),
418					)
419				} else {
420					plog.Errorf("invalid authentication request to user %s was issued", r.Name)
421				}
422			}
423			return nil, err
424		}
425
426		st, err := s.AuthStore().GenTokenPrefix()
427		if err != nil {
428			return nil, err
429		}
430
431		// internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it.
432		// In addition, it will let a WAL entry not record password as a plain text.
433		internalReq := &pb.InternalAuthenticateRequest{
434			Name:        r.Name,
435			SimpleToken: st,
436		}
437
438		resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
439		if err != nil {
440			return nil, err
441		}
442		if checkedRevision == s.AuthStore().Revision() {
443			break
444		}
445
446		if lg != nil {
447			lg.Info("revision when password checked became stale; retrying")
448		} else {
449			plog.Infof("revision when password checked is obsolete, retrying")
450		}
451	}
452
453	return resp.(*pb.AuthenticateResponse), nil
454}
455
456func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
457	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
458	if err != nil {
459		return nil, err
460	}
461	return resp.(*pb.AuthUserAddResponse), nil
462}
463
464func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
465	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
466	if err != nil {
467		return nil, err
468	}
469	return resp.(*pb.AuthUserDeleteResponse), nil
470}
471
472func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
473	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
474	if err != nil {
475		return nil, err
476	}
477	return resp.(*pb.AuthUserChangePasswordResponse), nil
478}
479
480func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
481	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
482	if err != nil {
483		return nil, err
484	}
485	return resp.(*pb.AuthUserGrantRoleResponse), nil
486}
487
488func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
489	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
490	if err != nil {
491		return nil, err
492	}
493	return resp.(*pb.AuthUserGetResponse), nil
494}
495
496func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
497	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
498	if err != nil {
499		return nil, err
500	}
501	return resp.(*pb.AuthUserListResponse), nil
502}
503
504func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
505	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
506	if err != nil {
507		return nil, err
508	}
509	return resp.(*pb.AuthUserRevokeRoleResponse), nil
510}
511
512func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
513	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
514	if err != nil {
515		return nil, err
516	}
517	return resp.(*pb.AuthRoleAddResponse), nil
518}
519
520func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
521	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
522	if err != nil {
523		return nil, err
524	}
525	return resp.(*pb.AuthRoleGrantPermissionResponse), nil
526}
527
528func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
529	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
530	if err != nil {
531		return nil, err
532	}
533	return resp.(*pb.AuthRoleGetResponse), nil
534}
535
536func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
537	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
538	if err != nil {
539		return nil, err
540	}
541	return resp.(*pb.AuthRoleListResponse), nil
542}
543
544func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
545	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
546	if err != nil {
547		return nil, err
548	}
549	return resp.(*pb.AuthRoleRevokePermissionResponse), nil
550}
551
552func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
553	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
554	if err != nil {
555		return nil, err
556	}
557	return resp.(*pb.AuthRoleDeleteResponse), nil
558}
559
560func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
561	result, err := s.processInternalRaftRequestOnce(ctx, r)
562	if err != nil {
563		return nil, err
564	}
565	if result.err != nil {
566		return nil, result.err
567	}
568	if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil {
569		applyStart := result.trace.GetStartTime()
570		// The trace object is created in apply. Here reset the start time to trace
571		// the raft request time by the difference between the request start time
572		// and apply start time
573		result.trace.SetStartTime(startTime)
574		result.trace.InsertStep(0, applyStart, "process raft request")
575		result.trace.LogIfLong(traceThreshold)
576	}
577	return result.resp, nil
578}
579
580func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
581	return s.raftRequestOnce(ctx, r)
582}
583
584// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
585func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
586	trace := traceutil.Get(ctx)
587	ai, err := s.AuthInfoFromCtx(ctx)
588	if err != nil {
589		return err
590	}
591	if ai == nil {
592		// chk expects non-nil AuthInfo; use empty credentials
593		ai = &auth.AuthInfo{}
594	}
595	if err = chk(ai); err != nil {
596		return err
597	}
598	trace.Step("get authentication metadata")
599	// fetch response for serialized request
600	get()
601	// check for stale token revision in case the auth store was updated while
602	// the request has been handled.
603	if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
604		return auth.ErrAuthOldRevision
605	}
606	return nil
607}
608
// processInternalRaftRequestOnce proposes r through raft exactly once and waits
// for its result to be delivered through the wait registry (s.w).
//
// Before proposing it:
//   - rejects the request with ErrTooManyRequests when the committed index is
//     more than maxGapBetweenApplyAndCommitIndex entries ahead of the applied
//     index;
//   - stamps r with a fresh header ID and, when ctx carries auth info, the
//     caller's username and auth revision;
//   - rejects the request with ErrRequestTooLarge when its marshalled size
//     exceeds s.Cfg.MaxRequestBytes.
//
// The proposal and the wait are bounded by s.Cfg.ReqTimeout(). On success it
// returns the *applyResult received on the wait channel with a nil error.
// Callers in this file still check applyResult.err separately. Otherwise it
// returns a nil result with the proposal error, the context error (via
// parseProposeCtxErr), or ErrStopped.
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	// Back-pressure: refuse new proposals while apply lags too far behind commit.
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}

	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}

	authInfo, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return nil, err
	}
	if authInfo != nil {
		r.Header.Username = authInfo.Username
		r.Header.AuthRevision = authInfo.Revision
	}

	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}

	if len(data) > int(s.Cfg.MaxRequestBytes) {
		return nil, ErrRequestTooLarge
	}

	// Wait on r.ID when the request already carries one; otherwise use the
	// header ID generated above.
	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	// Registered before Propose, presumably so that a fast apply cannot deliver
	// the result before a waiter exists.
	ch := s.w.Register(id)

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	start := time.Now()
	err = s.r.Propose(cctx, data)
	if err != nil {
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, err
	}
	proposalsPending.Inc()
	defer proposalsPending.Dec()

	select {
	case x := <-ch:
		return x.(*applyResult), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		// NOTE(review): unlike the other failure paths, this branch neither
		// triggers the wait entry nor increments proposalsFailed.
		return nil, ErrStopped
	}
}
668
669// Watchable returns a watchable interface attached to the etcdserver.
670func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
671
// linearizableReadLoop serves linearizable reads in rounds. All readers that
// picked up the same s.readNotifier (see linearizableReadNotify) are released
// together. Each round:
//  1. waits for a signal on s.readwaitc, restarting if the leader changes
//     first and returning if s.stopping is closed;
//  2. installs a fresh notifier for future readers and keeps the old one (nr)
//     for the readers of this round;
//  3. calls ReadIndex tagged with an 8-byte big-endian request ID from
//     s.reqIDGen;
//  4. waits for the raft.ReadState carrying that tag, skipping states with
//     other tags. It fails the round with ErrLeaderChanged or ErrTimeout when
//     needed;
//  5. waits until the applied index reaches the read index, then notifies nr
//     with a nil error.
//
// The loop returns when s.stopping is closed or ReadIndex returns
// raft.ErrStopped.
func (s *EtcdServer) linearizableReadLoop() {
	var rs raft.ReadState

	for {
		// Unique tag for this round's ReadIndex request, used to match the
		// response on readStateC.
		ctxToSend := make([]byte, 8)
		id1 := s.reqIDGen.Next()
		binary.BigEndian.PutUint64(ctxToSend, id1)
		leaderChangedNotifier := s.leaderChangedNotify()
		select {
		case <-leaderChangedNotifier:
			continue
		case <-s.readwaitc:
		case <-s.stopping:
			return
		}

		nextnr := newNotifier()

		// Swap notifiers: readers arriving from now on wait for the next round,
		// while nr collects the result of this round.
		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		lg := s.getLogger()
		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
			cancel()
			if err == raft.ErrStopped {
				return
			}
			if lg != nil {
				lg.Warn("failed to get read index from Raft", zap.Error(err))
			} else {
				plog.Errorf("failed to get read index from raft: %v", err)
			}
			readIndexFailed.Inc()
			nr.notify(err)
			continue
		}
		cancel()

		var (
			timeout bool
			done    bool
		)
		for !timeout && !done {
			// NOTE(review): time.After is re-armed on every iteration, so each
			// stale read state received restarts the ReqTimeout window.
			select {
			case rs = <-s.r.readStateC:
				done = bytes.Equal(rs.RequestCtx, ctxToSend)
				if !done {
					// a previous request might time out. now we should ignore the response of it and
					// continue waiting for the response of the current requests.
					id2 := uint64(0)
					if len(rs.RequestCtx) == 8 {
						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
					}
					if lg != nil {
						lg.Warn(
							"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
							zap.Uint64("sent-request-id", id1),
							zap.Uint64("received-request-id", id2),
						)
					} else {
						plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
					}
					slowReadIndex.Inc()
				}
			case <-leaderChangedNotifier:
				timeout = true
				readIndexFailed.Inc()
				// return a retryable error.
				nr.notify(ErrLeaderChanged)
			case <-time.After(s.Cfg.ReqTimeout()):
				if lg != nil {
					lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
				} else {
					plog.Warningf("timed out waiting for read index response (local node might have slow network)")
				}
				nr.notify(ErrTimeout)
				timeout = true
				slowReadIndex.Inc()
			case <-s.stopping:
				return
			}
		}
		// The round already failed and nr was notified with the error.
		if !done {
			continue
		}

		// Wait until the local state machine has applied up to the read index.
		if ai := s.getAppliedIndex(); ai < rs.Index {
			select {
			case <-s.applyWait.Wait(rs.Index):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before rs.Index
		nr.notify(nil)
	}
}
772
773func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
774	s.readMu.RLock()
775	nc := s.readNotifier
776	s.readMu.RUnlock()
777
778	// signal linearizable loop for current notify if it hasn't been already
779	select {
780	case s.readwaitc <- struct{}{}:
781	default:
782	}
783
784	// wait for read state notification
785	select {
786	case <-nc.c:
787		return nc.err
788	case <-ctx.Done():
789		return ctx.Err()
790	case <-s.done:
791		return ErrStopped
792	}
793}
794
795func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
796	authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
797	if authInfo != nil || err != nil {
798		return authInfo, err
799	}
800	if !s.Cfg.ClientCertAuthEnabled {
801		return nil, nil
802	}
803	authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
804	return authInfo, nil
805
806}
807