1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "fmt" 21 "sync" 22 "time" 23 24 "k8s.io/apimachinery/pkg/runtime" 25 "k8s.io/apimachinery/pkg/util/clock" 26 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 27 "k8s.io/apimachinery/pkg/util/wait" 28 "k8s.io/client-go/util/retry" 29 "k8s.io/utils/buffer" 30 31 "k8s.io/klog" 32) 33 34// SharedInformer provides eventually consistent linkage of its 35// clients to the authoritative state of a given collection of 36// objects. An object is identified by its API group, kind/resource, 37// namespace, and name. One SharedInfomer provides linkage to objects 38// of a particular API group and kind/resource. The linked object 39// collection of a SharedInformer may be further restricted to one 40// namespace and/or by label selector and/or field selector. 41// 42// The authoritative state of an object is what apiservers provide 43// access to, and an object goes through a strict sequence of states. 44// A state is either "absent" or present with a ResourceVersion and 45// other appropriate content. 46// 47// A SharedInformer maintains a local cache, exposed by Store(), of 48// the state of each relevant object. This cache is eventually 49// consistent with the authoritative state. This means that, unless 50// prevented by persistent communication problems, if ever a 51// particular object ID X is authoritatively associated with a state S 52// then for every SharedInformer I whose collection includes (X, S) 53// eventually either (1) I's cache associates X with S or a later 54// state of X, (2) I is stopped, or (3) the authoritative state 55// service for X terminates. To be formally complete, we say that the 56// absent state meets any restriction by label selector or field 57// selector. 58// 59// As a simple example, if a collection of objects is henceforeth 60// unchanging and a SharedInformer is created that links to that 61// collection then that SharedInformer's cache eventually holds an 62// exact copy of that collection (unless it is stopped too soon, the 63// authoritative state service ends, or communication problems between 64// the two persistently thwart achievement). 65// 66// As another simple example, if the local cache ever holds a 67// non-absent state for some object ID and the object is eventually 68// removed from the authoritative state then eventually the object is 69// removed from the local cache (unless the SharedInformer is stopped 70// too soon, the authoritative state service emnds, or communication 71// problems persistently thwart the desired result). 72// 73// The keys in Store() are of the form namespace/name for namespaced 74// objects, and are simply the name for non-namespaced objects. 75// 76// A client is identified here by a ResourceEventHandler. For every 77// update to the SharedInformer's local cache and for every client, 78// eventually either the SharedInformer is stopped or the client is 79// notified of the update. These notifications happen after the 80// corresponding cache update and, in the case of a 81// SharedIndexInformer, after the corresponding index updates. It is 82// possible that additional cache and index updates happen before such 83// a prescribed notification. For a given SharedInformer and client, 84// all notifications are delivered sequentially. For a given 85// SharedInformer, client, and object ID, the notifications are 86// delivered in order. 87// 88// A delete notification exposes the last locally known non-absent 89// state, except that its ResourceVersion is replaced with a 90// ResourceVersion in which the object is actually absent. 91type SharedInformer interface { 92 // AddEventHandler adds an event handler to the shared informer using the shared informer's resync 93 // period. Events to a single handler are delivered sequentially, but there is no coordination 94 // between different handlers. 95 AddEventHandler(handler ResourceEventHandler) 96 // AddEventHandlerWithResyncPeriod adds an event handler to the 97 // shared informer using the specified resync period. The resync 98 // operation consists of delivering to the handler a create 99 // notification for every object in the informer's local cache; it 100 // does not add any interactions with the authoritative storage. 101 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) 102 // GetStore returns the informer's local cache as a Store. 103 GetStore() Store 104 // GetController gives back a synthetic interface that "votes" to start the informer 105 GetController() Controller 106 // Run starts and runs the shared informer, returning after it stops. 107 // The informer will be stopped when stopCh is closed. 108 Run(stopCh <-chan struct{}) 109 // HasSynced returns true if the shared informer's store has been 110 // informed by at least one full LIST of the authoritative state 111 // of the informer's object collection. This is unrelated to "resync". 112 HasSynced() bool 113 // LastSyncResourceVersion is the resource version observed when last synced with the underlying 114 // store. The value returned is not synchronized with access to the underlying store and is not 115 // thread-safe. 116 LastSyncResourceVersion() string 117} 118 119type SharedIndexInformer interface { 120 SharedInformer 121 // AddIndexers add indexers to the informer before it starts. 122 AddIndexers(indexers Indexers) error 123 GetIndexer() Indexer 124} 125 126// NewSharedInformer creates a new instance for the listwatcher. 127func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { 128 return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{}) 129} 130 131// NewSharedIndexInformer creates a new instance for the listwatcher. 132func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { 133 realClock := &clock.RealClock{} 134 sharedIndexInformer := &sharedIndexInformer{ 135 processor: &sharedProcessor{clock: realClock}, 136 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), 137 listerWatcher: lw, 138 objectType: objType, 139 resyncCheckPeriod: defaultEventHandlerResyncPeriod, 140 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, 141 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), 142 clock: realClock, 143 } 144 return sharedIndexInformer 145} 146 147// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. 148type InformerSynced func() bool 149 150const ( 151 // syncedPollPeriod controls how often you look at the status of your sync funcs 152 syncedPollPeriod = 100 * time.Millisecond 153 154 // initialBufferSize is the initial number of event notifications that can be buffered. 155 initialBufferSize = 1024 156) 157 158// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false 159// if the controller should shutdown 160func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 161 err := wait.PollUntil(syncedPollPeriod, 162 func() (bool, error) { 163 for _, syncFunc := range cacheSyncs { 164 if !syncFunc() { 165 return false, nil 166 } 167 } 168 return true, nil 169 }, 170 stopCh) 171 if err != nil { 172 klog.V(2).Infof("stop requested") 173 return false 174 } 175 176 klog.V(4).Infof("caches populated") 177 return true 178} 179 180type sharedIndexInformer struct { 181 indexer Indexer 182 controller Controller 183 184 processor *sharedProcessor 185 cacheMutationDetector CacheMutationDetector 186 187 // This block is tracked to handle late initialization of the controller 188 listerWatcher ListerWatcher 189 objectType runtime.Object 190 191 // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call 192 // shouldResync to check if any of our listeners need a resync. 193 resyncCheckPeriod time.Duration 194 // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via 195 // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default 196 // value). 197 defaultEventHandlerResyncPeriod time.Duration 198 // clock allows for testability 199 clock clock.Clock 200 201 started, stopped bool 202 startedLock sync.Mutex 203 204 // blockDeltas gives a way to stop all event distribution so that a late event handler 205 // can safely join the shared informer. 206 blockDeltas sync.Mutex 207} 208 209// dummyController hides the fact that a SharedInformer is different from a dedicated one 210// where a caller can `Run`. The run method is disconnected in this case, because higher 211// level logic will decide when to start the SharedInformer and related controller. 212// Because returning information back is always asynchronous, the legacy callers shouldn't 213// notice any change in behavior. 214type dummyController struct { 215 informer *sharedIndexInformer 216} 217 218func (v *dummyController) Run(stopCh <-chan struct{}) { 219} 220 221func (v *dummyController) HasSynced() bool { 222 return v.informer.HasSynced() 223} 224 225func (c *dummyController) LastSyncResourceVersion() string { 226 return "" 227} 228 229type updateNotification struct { 230 oldObj interface{} 231 newObj interface{} 232} 233 234type addNotification struct { 235 newObj interface{} 236} 237 238type deleteNotification struct { 239 oldObj interface{} 240} 241 242func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 243 defer utilruntime.HandleCrash() 244 245 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) 246 247 cfg := &Config{ 248 Queue: fifo, 249 ListerWatcher: s.listerWatcher, 250 ObjectType: s.objectType, 251 FullResyncPeriod: s.resyncCheckPeriod, 252 RetryOnError: false, 253 ShouldResync: s.processor.shouldResync, 254 255 Process: s.HandleDeltas, 256 } 257 258 func() { 259 s.startedLock.Lock() 260 defer s.startedLock.Unlock() 261 262 s.controller = New(cfg) 263 s.controller.(*controller).clock = s.clock 264 s.started = true 265 }() 266 267 // Separate stop channel because Processor should be stopped strictly after controller 268 processorStopCh := make(chan struct{}) 269 var wg wait.Group 270 defer wg.Wait() // Wait for Processor to stop 271 defer close(processorStopCh) // Tell Processor to stop 272 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 273 wg.StartWithChannel(processorStopCh, s.processor.run) 274 275 defer func() { 276 s.startedLock.Lock() 277 defer s.startedLock.Unlock() 278 s.stopped = true // Don't want any new listeners 279 }() 280 s.controller.Run(stopCh) 281} 282 283func (s *sharedIndexInformer) HasSynced() bool { 284 s.startedLock.Lock() 285 defer s.startedLock.Unlock() 286 287 if s.controller == nil { 288 return false 289 } 290 return s.controller.HasSynced() 291} 292 293func (s *sharedIndexInformer) LastSyncResourceVersion() string { 294 s.startedLock.Lock() 295 defer s.startedLock.Unlock() 296 297 if s.controller == nil { 298 return "" 299 } 300 return s.controller.LastSyncResourceVersion() 301} 302 303func (s *sharedIndexInformer) GetStore() Store { 304 return s.indexer 305} 306 307func (s *sharedIndexInformer) GetIndexer() Indexer { 308 return s.indexer 309} 310 311func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { 312 s.startedLock.Lock() 313 defer s.startedLock.Unlock() 314 315 if s.started { 316 return fmt.Errorf("informer has already started") 317 } 318 319 return s.indexer.AddIndexers(indexers) 320} 321 322func (s *sharedIndexInformer) GetController() Controller { 323 return &dummyController{informer: s} 324} 325 326func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { 327 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) 328} 329 330func determineResyncPeriod(desired, check time.Duration) time.Duration { 331 if desired == 0 { 332 return desired 333 } 334 if check == 0 { 335 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) 336 return 0 337 } 338 if desired < check { 339 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) 340 return check 341 } 342 return desired 343} 344 345const minimumResyncPeriod = 1 * time.Second 346 347func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { 348 s.startedLock.Lock() 349 defer s.startedLock.Unlock() 350 351 if s.stopped { 352 klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) 353 return 354 } 355 356 if resyncPeriod > 0 { 357 if resyncPeriod < minimumResyncPeriod { 358 klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod) 359 resyncPeriod = minimumResyncPeriod 360 } 361 362 if resyncPeriod < s.resyncCheckPeriod { 363 if s.started { 364 klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) 365 resyncPeriod = s.resyncCheckPeriod 366 } else { 367 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update 368 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners 369 // accordingly 370 s.resyncCheckPeriod = resyncPeriod 371 s.processor.resyncCheckPeriodChanged(resyncPeriod) 372 } 373 } 374 } 375 376 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) 377 378 if !s.started { 379 s.processor.addListener(listener) 380 return 381 } 382 383 // in order to safely join, we have to 384 // 1. stop sending add/update/delete notifications 385 // 2. do a list against the store 386 // 3. send synthetic "Add" events to the new handler 387 // 4. unblock 388 s.blockDeltas.Lock() 389 defer s.blockDeltas.Unlock() 390 391 s.processor.addListener(listener) 392 for _, item := range s.indexer.List() { 393 listener.add(addNotification{newObj: item}) 394 } 395} 396 397func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 398 s.blockDeltas.Lock() 399 defer s.blockDeltas.Unlock() 400 401 // from oldest to newest 402 for _, d := range obj.(Deltas) { 403 switch d.Type { 404 case Sync, Added, Updated: 405 isSync := d.Type == Sync 406 s.cacheMutationDetector.AddObject(d.Object) 407 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { 408 if err := s.indexer.Update(d.Object); err != nil { 409 return err 410 } 411 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 412 } else { 413 if err := s.indexer.Add(d.Object); err != nil { 414 return err 415 } 416 s.processor.distribute(addNotification{newObj: d.Object}, isSync) 417 } 418 case Deleted: 419 if err := s.indexer.Delete(d.Object); err != nil { 420 return err 421 } 422 s.processor.distribute(deleteNotification{oldObj: d.Object}, false) 423 } 424 } 425 return nil 426} 427 428type sharedProcessor struct { 429 listenersStarted bool 430 listenersLock sync.RWMutex 431 listeners []*processorListener 432 syncingListeners []*processorListener 433 clock clock.Clock 434 wg wait.Group 435} 436 437func (p *sharedProcessor) addListener(listener *processorListener) { 438 p.listenersLock.Lock() 439 defer p.listenersLock.Unlock() 440 441 p.addListenerLocked(listener) 442 if p.listenersStarted { 443 p.wg.Start(listener.run) 444 p.wg.Start(listener.pop) 445 } 446} 447 448func (p *sharedProcessor) addListenerLocked(listener *processorListener) { 449 p.listeners = append(p.listeners, listener) 450 p.syncingListeners = append(p.syncingListeners, listener) 451} 452 453func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 454 p.listenersLock.RLock() 455 defer p.listenersLock.RUnlock() 456 457 if sync { 458 for _, listener := range p.syncingListeners { 459 listener.add(obj) 460 } 461 } else { 462 for _, listener := range p.listeners { 463 listener.add(obj) 464 } 465 } 466} 467 468func (p *sharedProcessor) run(stopCh <-chan struct{}) { 469 func() { 470 p.listenersLock.RLock() 471 defer p.listenersLock.RUnlock() 472 for _, listener := range p.listeners { 473 p.wg.Start(listener.run) 474 p.wg.Start(listener.pop) 475 } 476 p.listenersStarted = true 477 }() 478 <-stopCh 479 p.listenersLock.RLock() 480 defer p.listenersLock.RUnlock() 481 for _, listener := range p.listeners { 482 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 483 } 484 p.wg.Wait() // Wait for all .pop() and .run() to stop 485} 486 487// shouldResync queries every listener to determine if any of them need a resync, based on each 488// listener's resyncPeriod. 489func (p *sharedProcessor) shouldResync() bool { 490 p.listenersLock.Lock() 491 defer p.listenersLock.Unlock() 492 493 p.syncingListeners = []*processorListener{} 494 495 resyncNeeded := false 496 now := p.clock.Now() 497 for _, listener := range p.listeners { 498 // need to loop through all the listeners to see if they need to resync so we can prepare any 499 // listeners that are going to be resyncing. 500 if listener.shouldResync(now) { 501 resyncNeeded = true 502 p.syncingListeners = append(p.syncingListeners, listener) 503 listener.determineNextResync(now) 504 } 505 } 506 return resyncNeeded 507} 508 509func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { 510 p.listenersLock.RLock() 511 defer p.listenersLock.RUnlock() 512 513 for _, listener := range p.listeners { 514 resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) 515 listener.setResyncPeriod(resyncPeriod) 516 } 517} 518 519type processorListener struct { 520 nextCh chan interface{} 521 addCh chan interface{} 522 523 handler ResourceEventHandler 524 525 // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. 526 // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications 527 // added until we OOM. 528 // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but 529 // we should try to do something better. 530 pendingNotifications buffer.RingGrowing 531 532 // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer 533 requestedResyncPeriod time.Duration 534 // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This 535 // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the 536 // informer's overall resync check period. 537 resyncPeriod time.Duration 538 // nextResync is the earliest time the listener should get a full resync 539 nextResync time.Time 540 // resyncLock guards access to resyncPeriod and nextResync 541 resyncLock sync.Mutex 542} 543 544func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { 545 ret := &processorListener{ 546 nextCh: make(chan interface{}), 547 addCh: make(chan interface{}), 548 handler: handler, 549 pendingNotifications: *buffer.NewRingGrowing(bufferSize), 550 requestedResyncPeriod: requestedResyncPeriod, 551 resyncPeriod: resyncPeriod, 552 } 553 554 ret.determineNextResync(now) 555 556 return ret 557} 558 559func (p *processorListener) add(notification interface{}) { 560 p.addCh <- notification 561} 562 563func (p *processorListener) pop() { 564 defer utilruntime.HandleCrash() 565 defer close(p.nextCh) // Tell .run() to stop 566 567 var nextCh chan<- interface{} 568 var notification interface{} 569 for { 570 select { 571 case nextCh <- notification: 572 // Notification dispatched 573 var ok bool 574 notification, ok = p.pendingNotifications.ReadOne() 575 if !ok { // Nothing to pop 576 nextCh = nil // Disable this select case 577 } 578 case notificationToAdd, ok := <-p.addCh: 579 if !ok { 580 return 581 } 582 if notification == nil { // No notification to pop (and pendingNotifications is empty) 583 // Optimize the case - skip adding to pendingNotifications 584 notification = notificationToAdd 585 nextCh = p.nextCh 586 } else { // There is already a notification waiting to be dispatched 587 p.pendingNotifications.WriteOne(notificationToAdd) 588 } 589 } 590 } 591} 592 593func (p *processorListener) run() { 594 // this call blocks until the channel is closed. When a panic happens during the notification 595 // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) 596 // the next notification will be attempted. This is usually better than the alternative of never 597 // delivering again. 598 stopCh := make(chan struct{}) 599 wait.Until(func() { 600 // this gives us a few quick retries before a long pause and then a few more quick retries 601 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { 602 for next := range p.nextCh { 603 switch notification := next.(type) { 604 case updateNotification: 605 p.handler.OnUpdate(notification.oldObj, notification.newObj) 606 case addNotification: 607 p.handler.OnAdd(notification.newObj) 608 case deleteNotification: 609 p.handler.OnDelete(notification.oldObj) 610 default: 611 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) 612 } 613 } 614 // the only way to get here is if the p.nextCh is empty and closed 615 return true, nil 616 }) 617 618 // the only way to get here is if the p.nextCh is empty and closed 619 if err == nil { 620 close(stopCh) 621 } 622 }, 1*time.Minute, stopCh) 623} 624 625// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0, 626// this always returns false. 627func (p *processorListener) shouldResync(now time.Time) bool { 628 p.resyncLock.Lock() 629 defer p.resyncLock.Unlock() 630 631 if p.resyncPeriod == 0 { 632 return false 633 } 634 635 return now.After(p.nextResync) || now.Equal(p.nextResync) 636} 637 638func (p *processorListener) determineNextResync(now time.Time) { 639 p.resyncLock.Lock() 640 defer p.resyncLock.Unlock() 641 642 p.nextResync = now.Add(p.resyncPeriod) 643} 644 645func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) { 646 p.resyncLock.Lock() 647 defer p.resyncLock.Unlock() 648 649 p.resyncPeriod = resyncPeriod 650} 651