1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"bytes"
19	"context"
20	"encoding/binary"
21	"time"
22
23	"github.com/coreos/etcd/auth"
24	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25	"github.com/coreos/etcd/etcdserver/membership"
26	"github.com/coreos/etcd/lease"
27	"github.com/coreos/etcd/lease/leasehttp"
28	"github.com/coreos/etcd/mvcc"
29	"github.com/coreos/etcd/raft"
30
31	"github.com/gogo/protobuf/proto"
32)
33
const (
	// In the healthy case, there might be a small gap (10s of entries) between
	// the applied index and the committed index.
	// However, if the committed entries are very heavy to apply, the gap might grow.
	// We should stop accepting new proposals once the gap grows past this point.
	maxGapBetweenApplyAndCommitIndex = 5000
)
41
// RaftKV is the set of v3 key-value operations. *EtcdServer provides methods
// with these signatures in this file.
type RaftKV interface {
	// Range handles a range (read) request.
	Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
	// Put handles a put request.
	Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
	// DeleteRange handles a delete-range request.
	DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	// Txn handles a transaction request.
	Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
	// Compact handles a compaction request.
	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}
49
// Lessor is the set of v3 lease operations. *EtcdServer provides methods with
// these signatures in this file.
type Lessor interface {
	// LeaseGrant sends a LeaseGrant request to raft and applies it after it is committed.
	LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	// LeaseRevoke sends a LeaseRevoke request to raft and applies it after it is committed.
	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	// LeaseRenew renews the lease with the given ID. It returns either the
	// renewed TTL or an error.
	LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)

	// LeaseTimeToLive retrieves lease information.
	LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)

	// LeaseLeases lists all leases.
	LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
}
66
// Authenticator is the set of v3 auth operations: enabling/disabling auth,
// authenticating, and user/role management. *EtcdServer provides methods with
// these signatures in this file.
type Authenticator interface {
	AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
	AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
	Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
	UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
85
86func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
87	var resp *pb.RangeResponse
88	var err error
89	defer func(start time.Time) {
90		warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
91	}(time.Now())
92
93	if !r.Serializable {
94		err = s.linearizableReadNotify(ctx)
95		if err != nil {
96			return nil, err
97		}
98	}
99	chk := func(ai *auth.AuthInfo) error {
100		return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
101	}
102
103	get := func() { resp, err = s.applyV3Base.Range(nil, r) }
104	if serr := s.doSerialize(ctx, chk, get); serr != nil {
105		err = serr
106		return nil, err
107	}
108	return resp, err
109}
110
111func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
112	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
113	if err != nil {
114		return nil, err
115	}
116	return resp.(*pb.PutResponse), nil
117}
118
119func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
120	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
121	if err != nil {
122		return nil, err
123	}
124	return resp.(*pb.DeleteRangeResponse), nil
125}
126
127func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
128	if isTxnReadonly(r) {
129		if !isTxnSerializable(r) {
130			err := s.linearizableReadNotify(ctx)
131			if err != nil {
132				return nil, err
133			}
134		}
135		var resp *pb.TxnResponse
136		var err error
137		chk := func(ai *auth.AuthInfo) error {
138			return checkTxnAuth(s.authStore, ai, r)
139		}
140
141		defer func(start time.Time) {
142			warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
143		}(time.Now())
144
145		get := func() { resp, err = s.applyV3Base.Txn(r) }
146		if serr := s.doSerialize(ctx, chk, get); serr != nil {
147			return nil, serr
148		}
149		return resp, err
150	}
151
152	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
153	if err != nil {
154		return nil, err
155	}
156	return resp.(*pb.TxnResponse), nil
157}
158
159func isTxnSerializable(r *pb.TxnRequest) bool {
160	for _, u := range r.Success {
161		if r := u.GetRequestRange(); r == nil || !r.Serializable {
162			return false
163		}
164	}
165	for _, u := range r.Failure {
166		if r := u.GetRequestRange(); r == nil || !r.Serializable {
167			return false
168		}
169	}
170	return true
171}
172
173func isTxnReadonly(r *pb.TxnRequest) bool {
174	for _, u := range r.Success {
175		if r := u.GetRequestRange(); r == nil {
176			return false
177		}
178	}
179	for _, u := range r.Failure {
180		if r := u.GetRequestRange(); r == nil {
181			return false
182		}
183	}
184	return true
185}
186
187func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
188	result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
189	if r.Physical && result != nil && result.physc != nil {
190		<-result.physc
191		// The compaction is done deleting keys; the hash is now settled
192		// but the data is not necessarily committed. If there's a crash,
193		// the hash may revert to a hash prior to compaction completing
194		// if the compaction resumes. Force the finished compaction to
195		// commit so it won't resume following a crash.
196		s.be.ForceCommit()
197	}
198	if err != nil {
199		return nil, err
200	}
201	if result.err != nil {
202		return nil, result.err
203	}
204	resp := result.resp.(*pb.CompactionResponse)
205	if resp == nil {
206		resp = &pb.CompactionResponse{}
207	}
208	if resp.Header == nil {
209		resp.Header = &pb.ResponseHeader{}
210	}
211	resp.Header.Revision = s.kv.Rev()
212	return resp, nil
213}
214
215func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
216	// no id given? choose one
217	for r.ID == int64(lease.NoLease) {
218		// only use positive int64 id's
219		r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
220	}
221	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
222	if err != nil {
223		return nil, err
224	}
225	return resp.(*pb.LeaseGrantResponse), nil
226}
227
228func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
229	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
230	if err != nil {
231		return nil, err
232	}
233	return resp.(*pb.LeaseRevokeResponse), nil
234}
235
236func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
237	ttl, err := s.lessor.Renew(id)
238	if err == nil { // already requested to primary lessor(leader)
239		return ttl, nil
240	}
241	if err != lease.ErrNotPrimary {
242		return -1, err
243	}
244
245	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
246	defer cancel()
247
248	// renewals don't go through raft; forward to leader manually
249	for cctx.Err() == nil && err != nil {
250		leader, lerr := s.waitLeader(cctx)
251		if lerr != nil {
252			return -1, lerr
253		}
254		for _, url := range leader.PeerURLs {
255			lurl := url + leasehttp.LeasePrefix
256			ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
257			if err == nil || err == lease.ErrLeaseNotFound {
258				return ttl, err
259			}
260		}
261	}
262	return -1, ErrTimeout
263}
264
265func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
266	if s.Leader() == s.ID() {
267		// primary; timetolive directly from leader
268		le := s.lessor.Lookup(lease.LeaseID(r.ID))
269		if le == nil {
270			return nil, lease.ErrLeaseNotFound
271		}
272		// TODO: fill out ResponseHeader
273		resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
274		if r.Keys {
275			ks := le.Keys()
276			kbs := make([][]byte, len(ks))
277			for i := range ks {
278				kbs[i] = []byte(ks[i])
279			}
280			resp.Keys = kbs
281		}
282		return resp, nil
283	}
284
285	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
286	defer cancel()
287
288	// forward to leader
289	for cctx.Err() == nil {
290		leader, err := s.waitLeader(cctx)
291		if err != nil {
292			return nil, err
293		}
294		for _, url := range leader.PeerURLs {
295			lurl := url + leasehttp.LeaseInternalPrefix
296			resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
297			if err == nil {
298				return resp.LeaseTimeToLiveResponse, nil
299			}
300			if err == lease.ErrLeaseNotFound {
301				return nil, err
302			}
303		}
304	}
305	return nil, ErrTimeout
306}
307
308func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
309	ls := s.lessor.Leases()
310	lss := make([]*pb.LeaseStatus, len(ls))
311	for i := range ls {
312		lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
313	}
314	return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
315}
316
317func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
318	leader := s.cluster.Member(s.Leader())
319	for leader == nil {
320		// wait an election
321		dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
322		select {
323		case <-time.After(dur):
324			leader = s.cluster.Member(s.Leader())
325		case <-s.stopping:
326			return nil, ErrStopped
327		case <-ctx.Done():
328			return nil, ErrNoLeader
329		}
330	}
331	if leader == nil || len(leader.PeerURLs) == 0 {
332		return nil, ErrNoLeader
333	}
334	return leader, nil
335}
336
337func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
338	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
339	if err != nil {
340		return nil, err
341	}
342	return resp.(*pb.AlarmResponse), nil
343}
344
345func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
346	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
347	if err != nil {
348		return nil, err
349	}
350	return resp.(*pb.AuthEnableResponse), nil
351}
352
353func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
354	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
355	if err != nil {
356		return nil, err
357	}
358	return resp.(*pb.AuthDisableResponse), nil
359}
360
361func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
362	if err := s.linearizableReadNotify(ctx); err != nil {
363		return nil, err
364	}
365
366	var resp proto.Message
367	for {
368		checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
369		if err != nil {
370			if err != auth.ErrAuthNotEnabled {
371				plog.Errorf("invalid authentication request to user %s was issued", r.Name)
372			}
373			return nil, err
374		}
375
376		st, err := s.AuthStore().GenTokenPrefix()
377		if err != nil {
378			return nil, err
379		}
380
381		internalReq := &pb.InternalAuthenticateRequest{
382			Name:        r.Name,
383			Password:    r.Password,
384			SimpleToken: st,
385		}
386
387		resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
388		if err != nil {
389			return nil, err
390		}
391		if checkedRevision == s.AuthStore().Revision() {
392			break
393		}
394		plog.Infof("revision when password checked is obsolete, retrying")
395	}
396
397	return resp.(*pb.AuthenticateResponse), nil
398}
399
400func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
401	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
402	if err != nil {
403		return nil, err
404	}
405	return resp.(*pb.AuthUserAddResponse), nil
406}
407
408func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
409	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
410	if err != nil {
411		return nil, err
412	}
413	return resp.(*pb.AuthUserDeleteResponse), nil
414}
415
416func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
417	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
418	if err != nil {
419		return nil, err
420	}
421	return resp.(*pb.AuthUserChangePasswordResponse), nil
422}
423
424func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
425	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
426	if err != nil {
427		return nil, err
428	}
429	return resp.(*pb.AuthUserGrantRoleResponse), nil
430}
431
432func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
433	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
434	if err != nil {
435		return nil, err
436	}
437	return resp.(*pb.AuthUserGetResponse), nil
438}
439
440func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
441	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
442	if err != nil {
443		return nil, err
444	}
445	return resp.(*pb.AuthUserListResponse), nil
446}
447
448func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
449	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
450	if err != nil {
451		return nil, err
452	}
453	return resp.(*pb.AuthUserRevokeRoleResponse), nil
454}
455
456func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
457	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
458	if err != nil {
459		return nil, err
460	}
461	return resp.(*pb.AuthRoleAddResponse), nil
462}
463
464func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
465	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
466	if err != nil {
467		return nil, err
468	}
469	return resp.(*pb.AuthRoleGrantPermissionResponse), nil
470}
471
472func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
473	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
474	if err != nil {
475		return nil, err
476	}
477	return resp.(*pb.AuthRoleGetResponse), nil
478}
479
480func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
481	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
482	if err != nil {
483		return nil, err
484	}
485	return resp.(*pb.AuthRoleListResponse), nil
486}
487
488func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
489	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
490	if err != nil {
491		return nil, err
492	}
493	return resp.(*pb.AuthRoleRevokePermissionResponse), nil
494}
495
496func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
497	resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
498	if err != nil {
499		return nil, err
500	}
501	return resp.(*pb.AuthRoleDeleteResponse), nil
502}
503
504func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
505	result, err := s.processInternalRaftRequestOnce(ctx, r)
506	if err != nil {
507		return nil, err
508	}
509	if result.err != nil {
510		return nil, result.err
511	}
512	return result.resp, nil
513}
514
515func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
516	return s.raftRequestOnce(ctx, r)
517}
518
519// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure.
520func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
521	ai, err := s.AuthInfoFromCtx(ctx)
522	if err != nil {
523		return err
524	}
525	if ai == nil {
526		// chk expects non-nil AuthInfo; use empty credentials
527		ai = &auth.AuthInfo{}
528	}
529	if err = chk(ai); err != nil {
530		return err
531	}
532	// fetch response for serialized request
533	get()
534	// check for stale token revision in case the auth store was updated while
535	// the request has been handled.
536	if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
537		return auth.ErrAuthOldRevision
538	}
539	return nil
540}
541
542func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
543	ai := s.getAppliedIndex()
544	ci := s.getCommittedIndex()
545	if ci > ai+maxGapBetweenApplyAndCommitIndex {
546		return nil, ErrTooManyRequests
547	}
548
549	r.Header = &pb.RequestHeader{
550		ID: s.reqIDGen.Next(),
551	}
552
553	authInfo, err := s.AuthInfoFromCtx(ctx)
554	if err != nil {
555		return nil, err
556	}
557	if authInfo != nil {
558		r.Header.Username = authInfo.Username
559		r.Header.AuthRevision = authInfo.Revision
560	}
561
562	data, err := r.Marshal()
563	if err != nil {
564		return nil, err
565	}
566
567	if len(data) > int(s.Cfg.MaxRequestBytes) {
568		return nil, ErrRequestTooLarge
569	}
570
571	id := r.ID
572	if id == 0 {
573		id = r.Header.ID
574	}
575	ch := s.w.Register(id)
576
577	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
578	defer cancel()
579
580	start := time.Now()
581	s.r.Propose(cctx, data)
582	proposalsPending.Inc()
583	defer proposalsPending.Dec()
584
585	select {
586	case x := <-ch:
587		return x.(*applyResult), nil
588	case <-cctx.Done():
589		proposalsFailed.Inc()
590		s.w.Trigger(id, nil) // GC wait
591		return nil, s.parseProposeCtxErr(cctx.Err(), start)
592	case <-s.done:
593		return nil, ErrStopped
594	}
595}
596
597// Watchable returns a watchable interface attached to the etcdserver.
598func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
599
// linearizableReadLoop serves linearizable read requests in batches until the
// server is stopping or raft reports that it has stopped.
//
// Each iteration waits for a signal on s.readwaitc, swaps in a fresh read
// notifier, asks raft for a read index tagged with a unique 8-byte request ID,
// waits for the matching read state, waits until the applied index has reached
// the read index, and then notifies the readers that were waiting on the
// swapped-out notifier. Any failure along the way is delivered to those
// readers through the same notifier.
func (s *EtcdServer) linearizableReadLoop() {
	var rs raft.ReadState

	for {
		// Tag this round's ReadIndex request with a fresh ID, encoded as 8
		// big-endian bytes, so its response can be told apart from stale ones.
		ctxToSend := make([]byte, 8)
		id1 := s.reqIDGen.Next()
		binary.BigEndian.PutUint64(ctxToSend, id1)

		leaderChangedNotifier := s.leaderChangedNotify()
		select {
		case <-leaderChangedNotifier:
			// leader changed while idle; start over with a fresh notifier channel
			continue
		case <-s.readwaitc:
			// a reader signalled that it is waiting (see linearizableReadNotify)
		case <-s.stopping:
			return
		}

		nextnr := newNotifier()

		// Take the notifier current readers are waiting on and install a new
		// one for readers arriving from now on.
		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
			cancel()
			if err == raft.ErrStopped {
				return
			}
			plog.Errorf("failed to get read index from raft: %v", err)
			readIndexFailed.Inc()
			// fail the readers waiting on this round
			nr.notify(err)
			continue
		}
		cancel()

		var (
			timeout bool
			done    bool
		)
		// Wait for the read state matching ctxToSend; give up on leader
		// change or after ReqTimeout without any event.
		for !timeout && !done {
			select {
			case rs = <-s.r.readStateC:
				done = bytes.Equal(rs.RequestCtx, ctxToSend)
				if !done {
					// a previous request might have timed out. ignore its response and
					// continue waiting for the response to the current request.
					id2 := uint64(0)
					if len(rs.RequestCtx) == 8 {
						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
					}
					plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
					slowReadIndex.Inc()
				}

			case <-leaderChangedNotifier:
				timeout = true
				readIndexFailed.Inc()
				// return a retryable error.
				nr.notify(ErrLeaderChanged)

			// NOTE: a new timer is created on every pass of this loop, so the
			// timeout restarts whenever a stale read state is received.
			case <-time.After(s.Cfg.ReqTimeout()):
				plog.Warningf("timed out waiting for read index response (local node might have slow network)")
				nr.notify(ErrTimeout)
				timeout = true
				slowReadIndex.Inc()

			case <-s.stopping:
				return
			}
		}
		if !done {
			// readers were already notified of the failure above
			continue
		}

		// Make sure everything up to the read index has been applied locally
		// before letting the readers proceed.
		if ai := s.getAppliedIndex(); ai < rs.Index {
			select {
			case <-s.applyWait.Wait(rs.Index):
			case <-s.stopping:
				return
			}
		}
		// unblock all l-reads requested at indices before rs.Index
		nr.notify(nil)
	}
}
687
688func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
689	s.readMu.RLock()
690	nc := s.readNotifier
691	s.readMu.RUnlock()
692
693	// signal linearizable loop for current notify if it hasn't been already
694	select {
695	case s.readwaitc <- struct{}{}:
696	default:
697	}
698
699	// wait for read state notification
700	select {
701	case <-nc.c:
702		return nc.err
703	case <-ctx.Done():
704		return ctx.Err()
705	case <-s.done:
706		return ErrStopped
707	}
708}
709
710func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
711	authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
712	if authInfo != nil || err != nil {
713		return authInfo, err
714	}
715	if !s.Cfg.ClientCertAuthEnabled {
716		return nil, nil
717	}
718	authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
719	return authInfo, nil
720}
721