// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"sync"
	"time"

	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

type (
	// LeaseRevokeResponse is the client-side type for the protobuf message
	// LeaseRevokeResponse; it shares the protobuf message's underlying type.
	LeaseRevokeResponse pb.LeaseRevokeResponse
	// LeaseID is the integer identifier of a lease.
	LeaseID int64
)

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	// ID is the ID of the lease the response refers to.
	ID LeaseID
	// TTL is the lease TTL copied from the server's response.
	TTL int64
	// Error is the error string copied from the server's response.
	Error string
}

// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	// ID is the ID of the lease that was kept alive.
	ID LeaseID
	// TTL is the new TTL of the lease in seconds; a value <= 0 is treated by
	// the lease client as "lease expired or not found".
	TTL int64
}

// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type LeaseTimeToLiveResponse struct {
	*pb.ResponseHeader
	// ID is the ID of the lease the response refers to.
	ID LeaseID `json:"id"`

	// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1.
	TTL int64 `json:"ttl"`

	// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
	GrantedTTL int64 `json:"granted-ttl"`

	// Keys is the list of keys attached to this lease.
	Keys [][]byte `json:"keys"`
}

// LeaseStatus represents a lease status.
66type LeaseStatus struct { 67 ID LeaseID `json:"id"` 68 // TODO: TTL int64 69} 70 71// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse. 72type LeaseLeasesResponse struct { 73 *pb.ResponseHeader 74 Leases []LeaseStatus `json:"leases"` 75} 76 77const ( 78 // defaultTTL is the assumed lease TTL used for the first keepalive 79 // deadline before the actual TTL is known to the client. 80 defaultTTL = 5 * time.Second 81 // NoLease is a lease ID for the absence of a lease. 82 NoLease LeaseID = 0 83 84 // retryConnWait is how long to wait before retrying request due to an error 85 retryConnWait = 500 * time.Millisecond 86) 87 88// LeaseResponseChSize is the size of buffer to store unsent lease responses. 89// WARNING: DO NOT UPDATE. 90// Only for testing purposes. 91var LeaseResponseChSize = 16 92 93// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error. 94// 95// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected. 96type ErrKeepAliveHalted struct { 97 Reason error 98} 99 100func (e ErrKeepAliveHalted) Error() string { 101 s := "etcdclient: leases keep alive halted" 102 if e.Reason != nil { 103 s += ": " + e.Reason.Error() 104 } 105 return s 106} 107 108type Lease interface { 109 // Grant creates a new lease. 110 Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) 111 112 // Revoke revokes the given lease. 113 Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) 114 115 // TimeToLive retrieves the lease information of the given lease ID. 116 TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) 117 118 // Leases retrieves all leases. 119 Leases(ctx context.Context) (*LeaseLeasesResponse, error) 120 121 // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted 122 // to the channel are not consumed promptly the channel may become full. 
When full, the lease 123 // client will continue sending keep alive requests to the etcd server, but will drop responses 124 // until there is capacity on the channel to send more responses. 125 // 126 // If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or 127 // canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error 128 // containing the error reason. 129 // 130 // The returned "LeaseKeepAliveResponse" channel closes if underlying keep 131 // alive stream is interrupted in some way the client cannot handle itself; 132 // given context "ctx" is canceled or timed out. 133 // 134 // TODO(v4.0): post errors to last keep alive message before closing 135 // (see https://github.com/etcd-io/etcd/pull/7866) 136 KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) 137 138 // KeepAliveOnce renews the lease once. The response corresponds to the 139 // first message from calling KeepAlive. If the response has a recoverable 140 // error, KeepAliveOnce will retry the RPC with a new keep alive message. 141 // 142 // In most of the cases, Keepalive should be used instead of KeepAliveOnce. 143 KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) 144 145 // Close releases all resources Lease keeps for efficient communication 146 // with the etcd server. 
147 Close() error 148} 149 150type lessor struct { 151 mu sync.Mutex // guards all fields 152 153 // donec is closed and loopErr is set when recvKeepAliveLoop stops 154 donec chan struct{} 155 loopErr error 156 157 remote pb.LeaseClient 158 159 stream pb.Lease_LeaseKeepAliveClient 160 streamCancel context.CancelFunc 161 162 stopCtx context.Context 163 stopCancel context.CancelFunc 164 165 keepAlives map[LeaseID]*keepAlive 166 167 // firstKeepAliveTimeout is the timeout for the first keepalive request 168 // before the actual TTL is known to the lease client 169 firstKeepAliveTimeout time.Duration 170 171 // firstKeepAliveOnce ensures stream starts after first KeepAlive call. 172 firstKeepAliveOnce sync.Once 173 174 callOpts []grpc.CallOption 175 176 lg *zap.Logger 177} 178 179// keepAlive multiplexes a keepalive for a lease over multiple channels 180type keepAlive struct { 181 chs []chan<- *LeaseKeepAliveResponse 182 ctxs []context.Context 183 // deadline is the time the keep alive channels close if no response 184 deadline time.Time 185 // nextKeepAlive is when to send the next keep alive message 186 nextKeepAlive time.Time 187 // donec is closed on lease revoke, expiration, or cancel. 
188 donec chan struct{} 189} 190 191func NewLease(c *Client) Lease { 192 return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second) 193} 194 195func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease { 196 l := &lessor{ 197 donec: make(chan struct{}), 198 keepAlives: make(map[LeaseID]*keepAlive), 199 remote: remote, 200 firstKeepAliveTimeout: keepAliveTimeout, 201 lg: c.lg, 202 } 203 if l.firstKeepAliveTimeout == time.Second { 204 l.firstKeepAliveTimeout = defaultTTL 205 } 206 if c != nil { 207 l.callOpts = c.callOpts 208 } 209 reqLeaderCtx := WithRequireLeader(context.Background()) 210 l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx) 211 return l 212} 213 214func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { 215 r := &pb.LeaseGrantRequest{TTL: ttl} 216 resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...) 217 if err == nil { 218 gresp := &LeaseGrantResponse{ 219 ResponseHeader: resp.GetHeader(), 220 ID: LeaseID(resp.ID), 221 TTL: resp.TTL, 222 Error: resp.Error, 223 } 224 return gresp, nil 225 } 226 return nil, toErr(ctx, err) 227} 228 229func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { 230 r := &pb.LeaseRevokeRequest{ID: int64(id)} 231 resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...) 232 if err == nil { 233 return (*LeaseRevokeResponse)(resp), nil 234 } 235 return nil, toErr(ctx, err) 236} 237 238func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { 239 r := toLeaseTimeToLiveRequest(id, opts...) 240 resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...) 
241 if err == nil { 242 gresp := &LeaseTimeToLiveResponse{ 243 ResponseHeader: resp.GetHeader(), 244 ID: LeaseID(resp.ID), 245 TTL: resp.TTL, 246 GrantedTTL: resp.GrantedTTL, 247 Keys: resp.Keys, 248 } 249 return gresp, nil 250 } 251 return nil, toErr(ctx, err) 252} 253 254func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { 255 resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...) 256 if err == nil { 257 leases := make([]LeaseStatus, len(resp.Leases)) 258 for i := range resp.Leases { 259 leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} 260 } 261 return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil 262 } 263 return nil, toErr(ctx, err) 264} 265 266func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { 267 ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize) 268 269 l.mu.Lock() 270 // ensure that recvKeepAliveLoop is still running 271 select { 272 case <-l.donec: 273 err := l.loopErr 274 l.mu.Unlock() 275 close(ch) 276 return ch, ErrKeepAliveHalted{Reason: err} 277 default: 278 } 279 ka, ok := l.keepAlives[id] 280 if !ok { 281 // create fresh keep alive 282 ka = &keepAlive{ 283 chs: []chan<- *LeaseKeepAliveResponse{ch}, 284 ctxs: []context.Context{ctx}, 285 deadline: time.Now().Add(l.firstKeepAliveTimeout), 286 nextKeepAlive: time.Now(), 287 donec: make(chan struct{}), 288 } 289 l.keepAlives[id] = ka 290 } else { 291 // add channel and context to existing keep alive 292 ka.ctxs = append(ka.ctxs, ctx) 293 ka.chs = append(ka.chs, ch) 294 } 295 l.mu.Unlock() 296 297 go l.keepAliveCtxCloser(ctx, id, ka.donec) 298 l.firstKeepAliveOnce.Do(func() { 299 go l.recvKeepAliveLoop() 300 go l.deadlineLoop() 301 }) 302 303 return ch, nil 304} 305 306func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { 307 for { 308 resp, err := l.keepAliveOnce(ctx, id) 309 if err == nil { 310 if resp.TTL <= 0 { 
311 err = rpctypes.ErrLeaseNotFound 312 } 313 return resp, err 314 } 315 if isHaltErr(ctx, err) { 316 return nil, toErr(ctx, err) 317 } 318 } 319} 320 321func (l *lessor) Close() error { 322 l.stopCancel() 323 // close for synchronous teardown if stream goroutines never launched 324 l.firstKeepAliveOnce.Do(func() { close(l.donec) }) 325 <-l.donec 326 return nil 327} 328 329func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) { 330 select { 331 case <-donec: 332 return 333 case <-l.donec: 334 return 335 case <-ctx.Done(): 336 } 337 338 l.mu.Lock() 339 defer l.mu.Unlock() 340 341 ka, ok := l.keepAlives[id] 342 if !ok { 343 return 344 } 345 346 // close channel and remove context if still associated with keep alive 347 for i, c := range ka.ctxs { 348 if c == ctx { 349 close(ka.chs[i]) 350 ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...) 351 ka.chs = append(ka.chs[:i], ka.chs[i+1:]...) 352 break 353 } 354 } 355 // remove if no one more listeners 356 if len(ka.chs) == 0 { 357 delete(l.keepAlives, id) 358 } 359} 360 361// closeRequireLeader scans keepAlives for ctxs that have require leader 362// and closes the associated channels. 
363func (l *lessor) closeRequireLeader() { 364 l.mu.Lock() 365 defer l.mu.Unlock() 366 for _, ka := range l.keepAlives { 367 reqIdxs := 0 368 // find all required leader channels, close, mark as nil 369 for i, ctx := range ka.ctxs { 370 md, ok := metadata.FromOutgoingContext(ctx) 371 if !ok { 372 continue 373 } 374 ks := md[rpctypes.MetadataRequireLeaderKey] 375 if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader { 376 continue 377 } 378 close(ka.chs[i]) 379 ka.chs[i] = nil 380 reqIdxs++ 381 } 382 if reqIdxs == 0 { 383 continue 384 } 385 // remove all channels that required a leader from keepalive 386 newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs) 387 newCtxs := make([]context.Context, len(newChs)) 388 newIdx := 0 389 for i := range ka.chs { 390 if ka.chs[i] == nil { 391 continue 392 } 393 newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx] 394 newIdx++ 395 } 396 ka.chs, ka.ctxs = newChs, newCtxs 397 } 398} 399 400func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { 401 cctx, cancel := context.WithCancel(ctx) 402 defer cancel() 403 404 stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...) 
405 if err != nil { 406 return nil, toErr(ctx, err) 407 } 408 409 err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) 410 if err != nil { 411 return nil, toErr(ctx, err) 412 } 413 414 resp, rerr := stream.Recv() 415 if rerr != nil { 416 return nil, toErr(ctx, rerr) 417 } 418 419 karesp := &LeaseKeepAliveResponse{ 420 ResponseHeader: resp.GetHeader(), 421 ID: LeaseID(resp.ID), 422 TTL: resp.TTL, 423 } 424 return karesp, nil 425} 426 427func (l *lessor) recvKeepAliveLoop() (gerr error) { 428 defer func() { 429 l.mu.Lock() 430 close(l.donec) 431 l.loopErr = gerr 432 for _, ka := range l.keepAlives { 433 ka.close() 434 } 435 l.keepAlives = make(map[LeaseID]*keepAlive) 436 l.mu.Unlock() 437 }() 438 439 for { 440 stream, err := l.resetRecv() 441 if err != nil { 442 if canceledByCaller(l.stopCtx, err) { 443 return err 444 } 445 } else { 446 for { 447 resp, err := stream.Recv() 448 if err != nil { 449 if canceledByCaller(l.stopCtx, err) { 450 return err 451 } 452 453 if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader { 454 l.closeRequireLeader() 455 } 456 break 457 } 458 459 l.recvKeepAlive(resp) 460 } 461 } 462 463 select { 464 case <-time.After(retryConnWait): 465 case <-l.stopCtx.Done(): 466 return l.stopCtx.Err() 467 } 468 } 469} 470 471// resetRecv opens a new lease stream and starts sending keep alive requests. 472func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { 473 sctx, cancel := context.WithCancel(l.stopCtx) 474 stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...) 
475 if err != nil { 476 cancel() 477 return nil, err 478 } 479 480 l.mu.Lock() 481 defer l.mu.Unlock() 482 if l.stream != nil && l.streamCancel != nil { 483 l.streamCancel() 484 } 485 486 l.streamCancel = cancel 487 l.stream = stream 488 489 go l.sendKeepAliveLoop(stream) 490 return stream, nil 491} 492 493// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse 494func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { 495 karesp := &LeaseKeepAliveResponse{ 496 ResponseHeader: resp.GetHeader(), 497 ID: LeaseID(resp.ID), 498 TTL: resp.TTL, 499 } 500 501 l.mu.Lock() 502 defer l.mu.Unlock() 503 504 ka, ok := l.keepAlives[karesp.ID] 505 if !ok { 506 return 507 } 508 509 if karesp.TTL <= 0 { 510 // lease expired; close all keep alive channels 511 delete(l.keepAlives, karesp.ID) 512 ka.close() 513 return 514 } 515 516 // send update to all channels 517 nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0) 518 ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second) 519 for _, ch := range ka.chs { 520 select { 521 case ch <- karesp: 522 default: 523 if l.lg != nil { 524 l.lg.Warn("lease keepalive response queue is full; dropping response send", 525 zap.Int("queue-size", len(ch)), 526 zap.Int("queue-capacity", cap(ch)), 527 ) 528 } 529 } 530 // still advance in order to rate-limit keep-alive sends 531 ka.nextKeepAlive = nextKeepAlive 532 } 533} 534 535// deadlineLoop reaps any keep alive channels that have not received a response 536// within the lease TTL 537func (l *lessor) deadlineLoop() { 538 for { 539 select { 540 case <-time.After(time.Second): 541 case <-l.donec: 542 return 543 } 544 now := time.Now() 545 l.mu.Lock() 546 for id, ka := range l.keepAlives { 547 if ka.deadline.Before(now) { 548 // waited too long for response; lease may be expired 549 ka.close() 550 delete(l.keepAlives, id) 551 } 552 } 553 l.mu.Unlock() 554 } 555} 556 557// sendKeepAliveLoop sends keep alive requests for the lifetime of the 
given stream. 558func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { 559 for { 560 var tosend []LeaseID 561 562 now := time.Now() 563 l.mu.Lock() 564 for id, ka := range l.keepAlives { 565 if ka.nextKeepAlive.Before(now) { 566 tosend = append(tosend, id) 567 } 568 } 569 l.mu.Unlock() 570 571 for _, id := range tosend { 572 r := &pb.LeaseKeepAliveRequest{ID: int64(id)} 573 if err := stream.Send(r); err != nil { 574 // TODO do something with this error? 575 return 576 } 577 } 578 579 select { 580 case <-time.After(retryConnWait): 581 case <-stream.Context().Done(): 582 return 583 case <-l.donec: 584 return 585 case <-l.stopCtx.Done(): 586 return 587 } 588 } 589} 590 591func (ka *keepAlive) close() { 592 close(ka.donec) 593 for _, ch := range ka.chs { 594 close(ch) 595 } 596} 597