1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "bytes" 19 "context" 20 "encoding/binary" 21 "time" 22 23 "go.etcd.io/etcd/auth" 24 "go.etcd.io/etcd/etcdserver/api/membership" 25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 26 "go.etcd.io/etcd/lease" 27 "go.etcd.io/etcd/lease/leasehttp" 28 "go.etcd.io/etcd/mvcc" 29 "go.etcd.io/etcd/pkg/traceutil" 30 "go.etcd.io/etcd/raft" 31 32 "github.com/gogo/protobuf/proto" 33 "go.uber.org/zap" 34) 35 36const ( 37 // In the health case, there might be a small gap (10s of entries) between 38 // the applied index and committed index. 39 // However, if the committed entries are very heavy to apply, the gap might grow. 40 // We should stop accepting new proposals if the gap growing to a certain point. 41 maxGapBetweenApplyAndCommitIndex = 5000 42 traceThreshold = 100 * time.Millisecond 43) 44 45type RaftKV interface { 46 Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) 47 Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) 48 DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) 49 Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) 50 Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) 51} 52 53type Lessor interface { 54 // LeaseGrant sends LeaseGrant request to raft and apply it after committed. 55 LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) 56 // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. 57 LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) 58 59 // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error 60 // is returned. 61 LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) 62 63 // LeaseTimeToLive retrieves lease information. 64 LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) 65 66 // LeaseLeases lists all leases. 67 LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) 68} 69 70type Authenticator interface { 71 AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) 72 AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) 73 Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) 74 UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) 75 UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) 76 UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) 77 UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) 78 UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) 79 UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) 80 RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) 81 RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) 82 RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) 83 RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) 84 RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) 85 UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) 86 RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) 87} 88 89func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { 90 trace := traceutil.New("range", 91 s.getLogger(), 92 traceutil.Field{Key: "range_begin", Value: string(r.Key)}, 93 traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)}, 94 ) 95 ctx = context.WithValue(ctx, traceutil.TraceKey, trace) 96 97 var resp *pb.RangeResponse 98 var err error 99 defer func(start time.Time) { 100 warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err) 101 if resp != nil { 102 trace.AddField( 103 traceutil.Field{Key: "response_count", Value: len(resp.Kvs)}, 104 traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}, 105 ) 106 } 107 trace.LogIfLong(traceThreshold) 108 }(time.Now()) 109 110 if !r.Serializable { 111 err = s.linearizableReadNotify(ctx) 112 trace.Step("agreement among raft nodes before linearized reading") 113 if err != nil { 114 return nil, err 115 } 116 } 117 chk := func(ai *auth.AuthInfo) error { 118 return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) 119 } 120 121 get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) } 122 if serr := s.doSerialize(ctx, chk, get); serr != nil { 123 err = serr 124 return nil, err 125 } 126 return resp, err 127} 128 129func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { 130 ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) 131 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) 132 if err != nil { 133 return nil, err 134 } 135 return resp.(*pb.PutResponse), nil 136} 137 138func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { 139 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r}) 140 if err != nil { 141 return nil, err 142 } 143 return resp.(*pb.DeleteRangeResponse), nil 144} 145 146func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { 147 if isTxnReadonly(r) { 148 if !isTxnSerializable(r) { 149 err := s.linearizableReadNotify(ctx) 150 if err != nil { 151 return nil, err 152 } 153 } 154 var resp *pb.TxnResponse 155 var err error 156 chk := func(ai *auth.AuthInfo) error { 157 return checkTxnAuth(s.authStore, ai, r) 158 } 159 160 defer func(start time.Time) { 161 warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err) 162 }(time.Now()) 163 164 get := func() { resp, err = s.applyV3Base.Txn(r) } 165 if serr := s.doSerialize(ctx, chk, get); serr != nil { 166 return nil, serr 167 } 168 return resp, err 169 } 170 171 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r}) 172 if err != nil { 173 return nil, err 174 } 175 return resp.(*pb.TxnResponse), nil 176} 177 178func isTxnSerializable(r *pb.TxnRequest) bool { 179 for _, u := range r.Success { 180 if r := u.GetRequestRange(); r == nil || !r.Serializable { 181 return false 182 } 183 } 184 for _, u := range r.Failure { 185 if r := u.GetRequestRange(); r == nil || !r.Serializable { 186 return false 187 } 188 } 189 return true 190} 191 192func isTxnReadonly(r *pb.TxnRequest) bool { 193 for _, u := range r.Success { 194 if r := u.GetRequestRange(); r == nil { 195 return false 196 } 197 } 198 for _, u := range r.Failure { 199 if r := u.GetRequestRange(); r == nil { 200 return false 201 } 202 } 203 return true 204} 205 206func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { 207 startTime := time.Now() 208 result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) 209 trace := traceutil.TODO() 210 if result != nil && result.trace != nil { 211 trace = result.trace 212 defer func() { 213 trace.LogIfLong(traceThreshold) 214 }() 215 applyStart := result.trace.GetStartTime() 216 result.trace.SetStartTime(startTime) 217 trace.InsertStep(0, applyStart, "process raft request") 218 } 219 if r.Physical && result != nil && result.physc != nil { 220 <-result.physc 221 // The compaction is done deleting keys; the hash is now settled 222 // but the data is not necessarily committed. If there's a crash, 223 // the hash may revert to a hash prior to compaction completing 224 // if the compaction resumes. Force the finished compaction to 225 // commit so it won't resume following a crash. 226 s.be.ForceCommit() 227 trace.Step("physically apply compaction") 228 } 229 if err != nil { 230 return nil, err 231 } 232 if result.err != nil { 233 return nil, result.err 234 } 235 resp := result.resp.(*pb.CompactionResponse) 236 if resp == nil { 237 resp = &pb.CompactionResponse{} 238 } 239 if resp.Header == nil { 240 resp.Header = &pb.ResponseHeader{} 241 } 242 resp.Header.Revision = s.kv.Rev() 243 trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) 244 return resp, nil 245} 246 247func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { 248 // no id given? choose one 249 for r.ID == int64(lease.NoLease) { 250 // only use positive int64 id's 251 r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) 252 } 253 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r}) 254 if err != nil { 255 return nil, err 256 } 257 return resp.(*pb.LeaseGrantResponse), nil 258} 259 260func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { 261 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) 262 if err != nil { 263 return nil, err 264 } 265 return resp.(*pb.LeaseRevokeResponse), nil 266} 267 268func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) { 269 ttl, err := s.lessor.Renew(id) 270 if err == nil { // already requested to primary lessor(leader) 271 return ttl, nil 272 } 273 if err != lease.ErrNotPrimary { 274 return -1, err 275 } 276 277 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) 278 defer cancel() 279 280 // renewals don't go through raft; forward to leader manually 281 for cctx.Err() == nil && err != nil { 282 leader, lerr := s.waitLeader(cctx) 283 if lerr != nil { 284 return -1, lerr 285 } 286 for _, url := range leader.PeerURLs { 287 lurl := url + leasehttp.LeasePrefix 288 ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) 289 if err == nil || err == lease.ErrLeaseNotFound { 290 return ttl, err 291 } 292 } 293 } 294 295 if cctx.Err() == context.DeadlineExceeded { 296 return -1, ErrTimeout 297 } 298 return -1, ErrCanceled 299} 300 301func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { 302 if s.Leader() == s.ID() { 303 // primary; timetolive directly from leader 304 le := s.lessor.Lookup(lease.LeaseID(r.ID)) 305 if le == nil { 306 return nil, lease.ErrLeaseNotFound 307 } 308 // TODO: fill out ResponseHeader 309 resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()} 310 if r.Keys { 311 ks := le.Keys() 312 kbs := make([][]byte, len(ks)) 313 for i := range ks { 314 kbs[i] = []byte(ks[i]) 315 } 316 resp.Keys = kbs 317 } 318 return resp, nil 319 } 320 321 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) 322 defer cancel() 323 324 // forward to leader 325 for cctx.Err() == nil { 326 leader, err := s.waitLeader(cctx) 327 if err != nil { 328 return nil, err 329 } 330 for _, url := range leader.PeerURLs { 331 lurl := url + leasehttp.LeaseInternalPrefix 332 resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt) 333 if err == nil { 334 return resp.LeaseTimeToLiveResponse, nil 335 } 336 if err == lease.ErrLeaseNotFound { 337 return nil, err 338 } 339 } 340 } 341 342 if cctx.Err() == context.DeadlineExceeded { 343 return nil, ErrTimeout 344 } 345 return nil, ErrCanceled 346} 347 348func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { 349 ls := s.lessor.Leases() 350 lss := make([]*pb.LeaseStatus, len(ls)) 351 for i := range ls { 352 lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)} 353 } 354 return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil 355} 356 357func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { 358 leader := s.cluster.Member(s.Leader()) 359 for leader == nil { 360 // wait an election 361 dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond 362 select { 363 case <-time.After(dur): 364 leader = s.cluster.Member(s.Leader()) 365 case <-s.stopping: 366 return nil, ErrStopped 367 case <-ctx.Done(): 368 return nil, ErrNoLeader 369 } 370 } 371 if leader == nil || len(leader.PeerURLs) == 0 { 372 return nil, ErrNoLeader 373 } 374 return leader, nil 375} 376 377func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { 378 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r}) 379 if err != nil { 380 return nil, err 381 } 382 return resp.(*pb.AlarmResponse), nil 383} 384 385func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { 386 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r}) 387 if err != nil { 388 return nil, err 389 } 390 return resp.(*pb.AuthEnableResponse), nil 391} 392 393func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { 394 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r}) 395 if err != nil { 396 return nil, err 397 } 398 return resp.(*pb.AuthDisableResponse), nil 399} 400 401func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { 402 if err := s.linearizableReadNotify(ctx); err != nil { 403 return nil, err 404 } 405 406 lg := s.getLogger() 407 408 var resp proto.Message 409 for { 410 checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password) 411 if err != nil { 412 if err != auth.ErrAuthNotEnabled { 413 if lg != nil { 414 lg.Warn( 415 "invalid authentication was requested", 416 zap.String("user", r.Name), 417 zap.Error(err), 418 ) 419 } else { 420 plog.Errorf("invalid authentication request to user %s was issued", r.Name) 421 } 422 } 423 return nil, err 424 } 425 426 st, err := s.AuthStore().GenTokenPrefix() 427 if err != nil { 428 return nil, err 429 } 430 431 // internalReq doesn't need to have Password because the above s.AuthStore().CheckPassword() already did it. 432 // In addition, it will let a WAL entry not record password as a plain text. 433 internalReq := &pb.InternalAuthenticateRequest{ 434 Name: r.Name, 435 SimpleToken: st, 436 } 437 438 resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq}) 439 if err != nil { 440 return nil, err 441 } 442 if checkedRevision == s.AuthStore().Revision() { 443 break 444 } 445 446 if lg != nil { 447 lg.Info("revision when password checked became stale; retrying") 448 } else { 449 plog.Infof("revision when password checked is obsolete, retrying") 450 } 451 } 452 453 return resp.(*pb.AuthenticateResponse), nil 454} 455 456func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { 457 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r}) 458 if err != nil { 459 return nil, err 460 } 461 return resp.(*pb.AuthUserAddResponse), nil 462} 463 464func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { 465 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r}) 466 if err != nil { 467 return nil, err 468 } 469 return resp.(*pb.AuthUserDeleteResponse), nil 470} 471 472func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { 473 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r}) 474 if err != nil { 475 return nil, err 476 } 477 return resp.(*pb.AuthUserChangePasswordResponse), nil 478} 479 480func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { 481 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r}) 482 if err != nil { 483 return nil, err 484 } 485 return resp.(*pb.AuthUserGrantRoleResponse), nil 486} 487 488func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { 489 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r}) 490 if err != nil { 491 return nil, err 492 } 493 return resp.(*pb.AuthUserGetResponse), nil 494} 495 496func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { 497 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r}) 498 if err != nil { 499 return nil, err 500 } 501 return resp.(*pb.AuthUserListResponse), nil 502} 503 504func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) { 505 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r}) 506 if err != nil { 507 return nil, err 508 } 509 return resp.(*pb.AuthUserRevokeRoleResponse), nil 510} 511 512func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { 513 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r}) 514 if err != nil { 515 return nil, err 516 } 517 return resp.(*pb.AuthRoleAddResponse), nil 518} 519 520func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) { 521 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r}) 522 if err != nil { 523 return nil, err 524 } 525 return resp.(*pb.AuthRoleGrantPermissionResponse), nil 526} 527 528func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { 529 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r}) 530 if err != nil { 531 return nil, err 532 } 533 return resp.(*pb.AuthRoleGetResponse), nil 534} 535 536func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { 537 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r}) 538 if err != nil { 539 return nil, err 540 } 541 return resp.(*pb.AuthRoleListResponse), nil 542} 543 544func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { 545 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r}) 546 if err != nil { 547 return nil, err 548 } 549 return resp.(*pb.AuthRoleRevokePermissionResponse), nil 550} 551 552func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) { 553 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r}) 554 if err != nil { 555 return nil, err 556 } 557 return resp.(*pb.AuthRoleDeleteResponse), nil 558} 559 560func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { 561 result, err := s.processInternalRaftRequestOnce(ctx, r) 562 if err != nil { 563 return nil, err 564 } 565 if result.err != nil { 566 return nil, result.err 567 } 568 if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil { 569 applyStart := result.trace.GetStartTime() 570 // The trace object is created in apply. Here reset the start time to trace 571 // the raft request time by the difference between the request start time 572 // and apply start time 573 result.trace.SetStartTime(startTime) 574 result.trace.InsertStep(0, applyStart, "process raft request") 575 result.trace.LogIfLong(traceThreshold) 576 } 577 return result.resp, nil 578} 579 580func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { 581 return s.raftRequestOnce(ctx, r) 582} 583 584// doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. 585func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { 586 trace := traceutil.Get(ctx) 587 ai, err := s.AuthInfoFromCtx(ctx) 588 if err != nil { 589 return err 590 } 591 if ai == nil { 592 // chk expects non-nil AuthInfo; use empty credentials 593 ai = &auth.AuthInfo{} 594 } 595 if err = chk(ai); err != nil { 596 return err 597 } 598 trace.Step("get authentication metadata") 599 // fetch response for serialized request 600 get() 601 // check for stale token revision in case the auth store was updated while 602 // the request has been handled. 603 if ai.Revision != 0 && ai.Revision != s.authStore.Revision() { 604 return auth.ErrAuthOldRevision 605 } 606 return nil 607} 608 609func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { 610 ai := s.getAppliedIndex() 611 ci := s.getCommittedIndex() 612 if ci > ai+maxGapBetweenApplyAndCommitIndex { 613 return nil, ErrTooManyRequests 614 } 615 616 r.Header = &pb.RequestHeader{ 617 ID: s.reqIDGen.Next(), 618 } 619 620 authInfo, err := s.AuthInfoFromCtx(ctx) 621 if err != nil { 622 return nil, err 623 } 624 if authInfo != nil { 625 r.Header.Username = authInfo.Username 626 r.Header.AuthRevision = authInfo.Revision 627 } 628 629 data, err := r.Marshal() 630 if err != nil { 631 return nil, err 632 } 633 634 if len(data) > int(s.Cfg.MaxRequestBytes) { 635 return nil, ErrRequestTooLarge 636 } 637 638 id := r.ID 639 if id == 0 { 640 id = r.Header.ID 641 } 642 ch := s.w.Register(id) 643 644 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout()) 645 defer cancel() 646 647 start := time.Now() 648 err = s.r.Propose(cctx, data) 649 if err != nil { 650 proposalsFailed.Inc() 651 s.w.Trigger(id, nil) // GC wait 652 return nil, err 653 } 654 proposalsPending.Inc() 655 defer proposalsPending.Dec() 656 657 select { 658 case x := <-ch: 659 return x.(*applyResult), nil 660 case <-cctx.Done(): 661 proposalsFailed.Inc() 662 s.w.Trigger(id, nil) // GC wait 663 return nil, s.parseProposeCtxErr(cctx.Err(), start) 664 case <-s.done: 665 return nil, ErrStopped 666 } 667} 668 669// Watchable returns a watchable interface attached to the etcdserver. 670func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } 671 672func (s *EtcdServer) linearizableReadLoop() { 673 var rs raft.ReadState 674 675 for { 676 ctxToSend := make([]byte, 8) 677 id1 := s.reqIDGen.Next() 678 binary.BigEndian.PutUint64(ctxToSend, id1) 679 leaderChangedNotifier := s.leaderChangedNotify() 680 select { 681 case <-leaderChangedNotifier: 682 continue 683 case <-s.readwaitc: 684 case <-s.stopping: 685 return 686 } 687 688 nextnr := newNotifier() 689 690 s.readMu.Lock() 691 nr := s.readNotifier 692 s.readNotifier = nextnr 693 s.readMu.Unlock() 694 695 lg := s.getLogger() 696 cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) 697 if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { 698 cancel() 699 if err == raft.ErrStopped { 700 return 701 } 702 if lg != nil { 703 lg.Warn("failed to get read index from Raft", zap.Error(err)) 704 } else { 705 plog.Errorf("failed to get read index from raft: %v", err) 706 } 707 readIndexFailed.Inc() 708 nr.notify(err) 709 continue 710 } 711 cancel() 712 713 var ( 714 timeout bool 715 done bool 716 ) 717 for !timeout && !done { 718 select { 719 case rs = <-s.r.readStateC: 720 done = bytes.Equal(rs.RequestCtx, ctxToSend) 721 if !done { 722 // a previous request might time out. now we should ignore the response of it and 723 // continue waiting for the response of the current requests. 724 id2 := uint64(0) 725 if len(rs.RequestCtx) == 8 { 726 id2 = binary.BigEndian.Uint64(rs.RequestCtx) 727 } 728 if lg != nil { 729 lg.Warn( 730 "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", 731 zap.Uint64("sent-request-id", id1), 732 zap.Uint64("received-request-id", id2), 733 ) 734 } else { 735 plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2) 736 } 737 slowReadIndex.Inc() 738 } 739 case <-leaderChangedNotifier: 740 timeout = true 741 readIndexFailed.Inc() 742 // return a retryable error. 743 nr.notify(ErrLeaderChanged) 744 case <-time.After(s.Cfg.ReqTimeout()): 745 if lg != nil { 746 lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout())) 747 } else { 748 plog.Warningf("timed out waiting for read index response (local node might have slow network)") 749 } 750 nr.notify(ErrTimeout) 751 timeout = true 752 slowReadIndex.Inc() 753 case <-s.stopping: 754 return 755 } 756 } 757 if !done { 758 continue 759 } 760 761 if ai := s.getAppliedIndex(); ai < rs.Index { 762 select { 763 case <-s.applyWait.Wait(rs.Index): 764 case <-s.stopping: 765 return 766 } 767 } 768 // unblock all l-reads requested at indices before rs.Index 769 nr.notify(nil) 770 } 771} 772 773func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { 774 s.readMu.RLock() 775 nc := s.readNotifier 776 s.readMu.RUnlock() 777 778 // signal linearizable loop for current notify if it hasn't been already 779 select { 780 case s.readwaitc <- struct{}{}: 781 default: 782 } 783 784 // wait for read state notification 785 select { 786 case <-nc.c: 787 return nc.err 788 case <-ctx.Done(): 789 return ctx.Err() 790 case <-s.done: 791 return ErrStopped 792 } 793} 794 795func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { 796 authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx) 797 if authInfo != nil || err != nil { 798 return authInfo, err 799 } 800 if !s.Cfg.ClientCertAuthEnabled { 801 return nil, nil 802 } 803 authInfo = s.AuthStore().AuthInfoFromTLS(ctx) 804 return authInfo, nil 805 806} 807