1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "bytes" 19 "context" 20 "encoding/binary" 21 "time" 22 23 "github.com/coreos/etcd/auth" 24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 25 "github.com/coreos/etcd/etcdserver/membership" 26 "github.com/coreos/etcd/lease" 27 "github.com/coreos/etcd/lease/leasehttp" 28 "github.com/coreos/etcd/mvcc" 29 "github.com/coreos/etcd/raft" 30 31 "github.com/gogo/protobuf/proto" 32) 33 34const ( 35 // In the health case, there might be a small gap (10s of entries) between 36 // the applied index and committed index. 37 // However, if the committed entries are very heavy to apply, the gap might grow. 38 // We should stop accepting new proposals if the gap growing to a certain point. 39 maxGapBetweenApplyAndCommitIndex = 5000 40) 41 42type RaftKV interface { 43 Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) 44 Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) 45 DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) 46 Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) 47 Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) 48} 49 50type Lessor interface { 51 // LeaseGrant sends LeaseGrant request to raft and apply it after committed. 52 LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) 53 // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. 54 LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) 55 56 // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error 57 // is returned. 58 LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) 59 60 // LeaseTimeToLive retrieves lease information. 61 LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) 62 63 // LeaseLeases lists all leases. 64 LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) 65} 66 67type Authenticator interface { 68 AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) 69 AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) 70 Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) 71 UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) 72 UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) 73 UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) 74 UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) 75 UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) 76 UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) 77 RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) 78 RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) 79 RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) 80 RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) 81 RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) 82 UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) 83 RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) 84} 85 86func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { 87 var resp *pb.RangeResponse 88 var err error 89 defer func(start time.Time) { 90 warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err) 91 }(time.Now()) 92 93 if !r.Serializable { 94 err = s.linearizableReadNotify(ctx) 95 if err != nil { 96 return nil, err 97 } 98 } 99 chk := func(ai *auth.AuthInfo) error { 100 return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) 101 } 102 103 get := func() { resp, err = s.applyV3Base.Range(nil, r) } 104 if serr := s.doSerialize(ctx, chk, get); serr != nil { 105 err = serr 106 return nil, err 107 } 108 return resp, err 109} 110 111func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { 112 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) 113 if err != nil { 114 return nil, err 115 } 116 return resp.(*pb.PutResponse), nil 117} 118 119func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 120 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r}) 121 if err != nil { 122 return nil, err 123 } 124 return resp.(*pb.DeleteRangeResponse), nil 125} 126 127func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { 128 if isTxnReadonly(r) { 129 if !isTxnSerializable(r) { 130 err := s.linearizableReadNotify(ctx) 131 if err != nil { 132 return nil, err 133 } 134 } 135 var resp *pb.TxnResponse 136 var err error 137 chk := func(ai *auth.AuthInfo) error { 138 return checkTxnAuth(s.authStore, ai, r) 139 } 140 141 defer func(start time.Time) { 142 warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err) 143 }(time.Now()) 144 145 get := func() { resp, err = s.applyV3Base.Txn(r) } 146 if serr := s.doSerialize(ctx, chk, get); serr != nil { 147 return nil, serr 148 } 149 return resp, err 150 } 151 152 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r}) 153 if err != nil { 154 return nil, err 155 } 156 return resp.(*pb.TxnResponse), nil 157} 158 159func isTxnSerializable(r *pb.TxnRequest) bool { 160 for _, u := range r.Success { 161 if r := u.GetRequestRange(); r == nil || !r.Serializable { 162 return false 163 } 164 } 165 for _, u := range r.Failure { 166 if r := u.GetRequestRange(); r == nil || !r.Serializable { 167 return false 168 } 169 } 170 return true 171} 172 173func isTxnReadonly(r *pb.TxnRequest) bool { 174 for _, u := range r.Success { 175 if r := u.GetRequestRange(); r == nil { 176 return false 177 } 178 } 179 for _, u := range r.Failure { 180 if r := u.GetRequestRange(); r == nil { 181 return false 182 } 183 } 184 return true 185} 186 187func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { 188 result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) 189 if r.Physical && result != nil && result.physc != nil { 190 <-result.physc 191 // The compaction is done deleting keys; the hash is now settled 192 // but the data is not necessarily committed. If there's a crash, 193 // the hash may revert to a hash prior to compaction completing 194 // if the compaction resumes. Force the finished compaction to 195 // commit so it won't resume following a crash. 196 s.be.ForceCommit() 197 } 198 if err != nil { 199 return nil, err 200 } 201 if result.err != nil { 202 return nil, result.err 203 } 204 resp := result.resp.(*pb.CompactionResponse) 205 if resp == nil { 206 resp = &pb.CompactionResponse{} 207 } 208 if resp.Header == nil { 209 resp.Header = &pb.ResponseHeader{} 210 } 211 resp.Header.Revision = s.kv.Rev() 212 return resp, nil 213} 214 215func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 216 // no id given? choose one 217 for r.ID == int64(lease.NoLease) { 218 // only use positive int64 id's 219 r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) 220 } 221 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r}) 222 if err != nil { 223 return nil, err 224 } 225 return resp.(*pb.LeaseGrantResponse), nil 226} 227 228func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { 229 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) 230 if err != nil { 231 return nil, err 232 } 233 return resp.(*pb.LeaseRevokeResponse), nil 234} 235 236func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { 237 ttl, err := s.lessor.Renew(id) 238 if err == nil { // already requested to primary lessor(leader) 239 return ttl, nil 240 } 241 if err != lease.ErrNotPrimary { 242 return -1, err 243 } 244 245 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) 246 defer cancel() 247 248 // renewals don't go through raft; forward to leader manually 249 for cctx.Err() == nil && err != nil { 250 leader, lerr := s.waitLeader(cctx) 251 if lerr != nil { 252 return -1, lerr 253 } 254 for _, url := range leader.PeerURLs { 255 lurl := url + leasehttp.LeasePrefix 256 ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) 257 if err == nil || err == lease.ErrLeaseNotFound { 258 return ttl, err 259 } 260 } 261 } 262 return -1, ErrTimeout 263} 264 265func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { 266 if s.Leader() == s.ID() { 267 // primary; timetolive directly from leader 268 le := s.lessor.Lookup(lease.LeaseID(r.ID)) 269 if le == nil { 270 return nil, lease.ErrLeaseNotFound 271 } 272 // TODO: fill out ResponseHeader 273 resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()} 274 if r.Keys { 275 ks := le.Keys() 276 kbs := make([][]byte, len(ks)) 277 for i := range ks { 278 kbs[i] = []byte(ks[i]) 279 } 280 resp.Keys = kbs 281 } 282 return resp, nil 283 } 284 285 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) 286 defer cancel() 287 288 // forward to leader 289 for cctx.Err() == nil { 290 leader, err := s.waitLeader(cctx) 291 if err != nil { 292 return nil, err 293 } 294 for _, url := range leader.PeerURLs { 295 lurl := url + leasehttp.LeaseInternalPrefix 296 resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) 297 if err == nil { 298 return resp.LeaseTimeToLiveResponse, nil 299 } 300 if err == lease.ErrLeaseNotFound { 301 return nil, err 302 } 303 } 304 } 305 return nil, ErrTimeout 306} 307 308func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { 309 ls := s.lessor.Leases() 310 lss := make([]*pb.LeaseStatus, len(ls)) 311 for i := range ls { 312 lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)} 313 } 314 return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil 315} 316 317func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { 318 leader := s.cluster.Member(s.Leader()) 319 for leader == nil { 320 // wait an election 321 dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond 322 select { 323 case <-time.After(dur): 324 leader = s.cluster.Member(s.Leader()) 325 case <-s.stopping: 326 return nil, ErrStopped 327 case <-ctx.Done(): 328 return nil, ErrNoLeader 329 } 330 } 331 if leader == nil || len(leader.PeerURLs) == 0 { 332 return nil, ErrNoLeader 333 } 334 return leader, nil 335} 336 337func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { 338 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r}) 339 if err != nil { 340 return nil, err 341 } 342 return resp.(*pb.AlarmResponse), nil 343} 344 345func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { 346 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r}) 347 if err != nil { 348 return nil, err 349 } 350 return resp.(*pb.AuthEnableResponse), nil 351} 352 353func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { 354 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r}) 355 if err != nil { 356 return nil, err 357 } 358 return resp.(*pb.AuthDisableResponse), nil 359} 360 361func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { 362 if err := s.linearizableReadNotify(ctx); err != nil { 363 return nil, err 364 } 365 366 var resp proto.Message 367 for { 368 checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password) 369 if err != nil { 370 if err != auth.ErrAuthNotEnabled { 371 plog.Errorf("invalid authentication request to user %s was issued", r.Name) 372 } 373 return nil, err 374 } 375 376 st, err := s.AuthStore().GenTokenPrefix() 377 if err != nil { 378 return nil, err 379 } 380 381 internalReq := &pb.InternalAuthenticateRequest{ 382 Name: r.Name, 383 Password: r.Password, 384 SimpleToken: st, 385 } 386 387 resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq}) 388 if err != nil { 389 return nil, err 390 } 391 if checkedRevision == s.AuthStore().Revision() { 392 break 393 } 394 plog.Infof("revision when password checked is obsolete, retrying") 395 } 396 397 return resp.(*pb.AuthenticateResponse), nil 398} 399 400func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { 401 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r}) 402 if err != nil { 403 return nil, err 404 } 405 return resp.(*pb.AuthUserAddResponse), nil 406} 407 408func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { 409 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r}) 410 if err != nil { 411 return nil, err 412 } 413 return resp.(*pb.AuthUserDeleteResponse), nil 414} 415 416func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { 417 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r}) 418 if err != nil { 419 return nil, err 420 } 421 return resp.(*pb.AuthUserChangePasswordResponse), nil 422} 423 424func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { 425 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r}) 426 if err != nil { 427 return nil, err 428 } 429 return resp.(*pb.AuthUserGrantRoleResponse), nil 430} 431 432func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { 433 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r}) 434 if err != nil { 435 return nil, err 436 } 437 return resp.(*pb.AuthUserGetResponse), nil 438} 439 440func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { 441 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r}) 442 if err != nil { 443 return nil, err 444 } 445 return resp.(*pb.AuthUserListResponse), nil 446} 447 448func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { 449 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r}) 450 if err != nil { 451 return nil, err 452 } 453 return resp.(*pb.AuthUserRevokeRoleResponse), nil 454} 455 456func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { 457 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r}) 458 if err != nil { 459 return nil, err 460 } 461 return resp.(*pb.AuthRoleAddResponse), nil 462} 463 464func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { 465 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r}) 466 if err != nil { 467 return nil, err 468 } 469 return resp.(*pb.AuthRoleGrantPermissionResponse), nil 470} 471 472func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { 473 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r}) 474 if err != nil { 475 return nil, err 476 } 477 return resp.(*pb.AuthRoleGetResponse), nil 478} 479 480func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { 481 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r}) 482 if err != nil { 483 return nil, err 484 } 485 return resp.(*pb.AuthRoleListResponse), nil 486} 487 488func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { 489 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r}) 490 if err != nil { 491 return nil, err 492 } 493 return resp.(*pb.AuthRoleRevokePermissionResponse), nil 494} 495 496func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { 497 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r}) 498 if err != nil { 499 return nil, err 500 } 501 return resp.(*pb.AuthRoleDeleteResponse), nil 502} 503 504func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { 505 result, err := s.processInternalRaftRequestOnce(ctx, r) 506 if err != nil { 507 return nil, err 508 } 509 if result.err != nil { 510 return nil, result.err 511 } 512 return result.resp, nil 513} 514 515func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { 516 return s.raftRequestOnce(ctx, r) 517} 518 519// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. 520func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { 521 ai, err := s.AuthInfoFromCtx(ctx) 522 if err != nil { 523 return err 524 } 525 if ai == nil { 526 // chk expects non-nil AuthInfo; use empty credentials 527 ai = &auth.AuthInfo{} 528 } 529 if err = chk(ai); err != nil { 530 return err 531 } 532 // fetch response for serialized request 533 get() 534 // check for stale token revision in case the auth store was updated while 535 // the request has been handled. 536 if ai.Revision != 0 && ai.Revision != s.authStore.Revision() { 537 return auth.ErrAuthOldRevision 538 } 539 return nil 540} 541 542func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { 543 ai := s.getAppliedIndex() 544 ci := s.getCommittedIndex() 545 if ci > ai+maxGapBetweenApplyAndCommitIndex { 546 return nil, ErrTooManyRequests 547 } 548 549 r.Header = &pb.RequestHeader{ 550 ID: s.reqIDGen.Next(), 551 } 552 553 authInfo, err := s.AuthInfoFromCtx(ctx) 554 if err != nil { 555 return nil, err 556 } 557 if authInfo != nil { 558 r.Header.Username = authInfo.Username 559 r.Header.AuthRevision = authInfo.Revision 560 } 561 562 data, err := r.Marshal() 563 if err != nil { 564 return nil, err 565 } 566 567 if len(data) > int(s.Cfg.MaxRequestBytes) { 568 return nil, ErrRequestTooLarge 569 } 570 571 id := r.ID 572 if id == 0 { 573 id = r.Header.ID 574 } 575 ch := s.w.Register(id) 576 577 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) 578 defer cancel() 579 580 start := time.Now() 581 s.r.Propose(cctx, data) 582 proposalsPending.Inc() 583 defer proposalsPending.Dec() 584 585 select { 586 case x := <-ch: 587 return x.(*applyResult), nil 588 case <-cctx.Done(): 589 proposalsFailed.Inc() 590 s.w.Trigger(id, nil) // GC wait 591 return nil, s.parseProposeCtxErr(cctx.Err(), start) 592 case <-s.done: 593 return nil, ErrStopped 594 } 595} 596 597// Watchable returns a watchable interface attached to the etcdserver. 598func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } 599 600func (s *EtcdServer) linearizableReadLoop() { 601 var rs raft.ReadState 602 603 for { 604 ctxToSend := make([]byte, 8) 605 id1 := s.reqIDGen.Next() 606 binary.BigEndian.PutUint64(ctxToSend, id1) 607 608 leaderChangedNotifier := s.leaderChangedNotify() 609 select { 610 case <-leaderChangedNotifier: 611 continue 612 case <-s.readwaitc: 613 case <-s.stopping: 614 return 615 } 616 617 nextnr := newNotifier() 618 619 s.readMu.Lock() 620 nr := s.readNotifier 621 s.readNotifier = nextnr 622 s.readMu.Unlock() 623 624 cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) 625 if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { 626 cancel() 627 if err == raft.ErrStopped { 628 return 629 } 630 plog.Errorf("failed to get read index from raft: %v", err) 631 readIndexFailed.Inc() 632 nr.notify(err) 633 continue 634 } 635 cancel() 636 637 var ( 638 timeout bool 639 done bool 640 ) 641 for !timeout && !done { 642 select { 643 case rs = <-s.r.readStateC: 644 done = bytes.Equal(rs.RequestCtx, ctxToSend) 645 if !done { 646 // a previous request might time out. now we should ignore the response of it and 647 // continue waiting for the response of the current requests. 648 id2 := uint64(0) 649 if len(rs.RequestCtx) == 8 { 650 id2 = binary.BigEndian.Uint64(rs.RequestCtx) 651 } 652 plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2) 653 slowReadIndex.Inc() 654 } 655 656 case <-leaderChangedNotifier: 657 timeout = true 658 readIndexFailed.Inc() 659 // return a retryable error. 660 nr.notify(ErrLeaderChanged) 661 662 case <-time.After(s.Cfg.ReqTimeout()): 663 plog.Warningf("timed out waiting for read index response (local node might have slow network)") 664 nr.notify(ErrTimeout) 665 timeout = true 666 slowReadIndex.Inc() 667 668 case <-s.stopping: 669 return 670 } 671 } 672 if !done { 673 continue 674 } 675 676 if ai := s.getAppliedIndex(); ai < rs.Index { 677 select { 678 case <-s.applyWait.Wait(rs.Index): 679 case <-s.stopping: 680 return 681 } 682 } 683 // unblock all l-reads requested at indices before rs.Index 684 nr.notify(nil) 685 } 686} 687 688func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { 689 s.readMu.RLock() 690 nc := s.readNotifier 691 s.readMu.RUnlock() 692 693 // signal linearizable loop for current notify if it hasn't been already 694 select { 695 case s.readwaitc <- struct{}{}: 696 default: 697 } 698 699 // wait for read state notification 700 select { 701 case <-nc.c: 702 return nc.err 703 case <-ctx.Done(): 704 return ctx.Err() 705 case <-s.done: 706 return ErrStopped 707 } 708} 709 710func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { 711 authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx) 712 if authInfo != nil || err != nil { 713 return authInfo, err 714 } 715 if !s.Cfg.ClientCertAuthEnabled { 716 return nil, nil 717 } 718 authInfo = s.AuthStore().AuthInfoFromTLS(ctx) 719 return authInfo, nil 720} 721