1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "fmt" 21 "sync" 22 "time" 23 24 "k8s.io/apimachinery/pkg/runtime" 25 "k8s.io/apimachinery/pkg/util/clock" 26 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 27 "k8s.io/apimachinery/pkg/util/wait" 28 "k8s.io/client-go/util/retry" 29 "k8s.io/utils/buffer" 30 31 "k8s.io/klog" 32) 33 34// SharedInformer provides eventually consistent linkage of its 35// clients to the authoritative state of a given collection of 36// objects. An object is identified by its API group, kind/resource, 37// namespace, and name; the `ObjectMeta.UID` is not part of an 38// object's ID as far as this contract is concerned. One 39// SharedInformer provides linkage to objects of a particular API 40// group and kind/resource. The linked object collection of a 41// SharedInformer may be further restricted to one namespace and/or by 42// label selector and/or field selector. 43// 44// The authoritative state of an object is what apiservers provide 45// access to, and an object goes through a strict sequence of states. 46// An object state is either "absent" or present with a 47// ResourceVersion and other appropriate content. 48// 49// A SharedInformer gets object states from apiservers using a 50// sequence of LIST and WATCH operations. Through this sequence the 51// apiservers provide a sequence of "collection states" to the 52// informer, where each collection state defines the state of every 53// object of the collection. No promise --- beyond what is implied by 54// other remarks here --- is made about how one informer's sequence of 55// collection states relates to a different informer's sequence of 56// collection states. 57// 58// A SharedInformer maintains a local cache, exposed by GetStore() and 59// by GetIndexer() in the case of an indexed informer, of the state of 60// each relevant object. This cache is eventually consistent with the 61// authoritative state. This means that, unless prevented by 62// persistent communication problems, if ever a particular object ID X 63// is authoritatively associated with a state S then for every 64// SharedInformer I whose collection includes (X, S) eventually either 65// (1) I's cache associates X with S or a later state of X, (2) I is 66// stopped, or (3) the authoritative state service for X terminates. 67// To be formally complete, we say that the absent state meets any 68// restriction by label selector or field selector. 69// 70// The local cache starts out empty, and gets populated and updated 71// during `Run()`. 72// 73// As a simple example, if a collection of objects is henceforeth 74// unchanging, a SharedInformer is created that links to that 75// collection, and that SharedInformer is `Run()` then that 76// SharedInformer's cache eventually holds an exact copy of that 77// collection (unless it is stopped too soon, the authoritative state 78// service ends, or communication problems between the two 79// persistently thwart achievement). 80// 81// As another simple example, if the local cache ever holds a 82// non-absent state for some object ID and the object is eventually 83// removed from the authoritative state then eventually the object is 84// removed from the local cache (unless the SharedInformer is stopped 85// too soon, the authoritative state service ends, or communication 86// problems persistently thwart the desired result). 87// 88// The keys in the Store are of the form namespace/name for namespaced 89// objects, and are simply the name for non-namespaced objects. 90// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for 91// a given object, and `SplitMetaNamespaceKey(key)` to split a key 92// into its constituent parts. 93// 94// A client is identified here by a ResourceEventHandler. For every 95// update to the SharedInformer's local cache and for every client 96// added before `Run()`, eventually either the SharedInformer is 97// stopped or the client is notified of the update. A client added 98// after `Run()` starts gets a startup batch of notifications of 99// additions of the object existing in the cache at the time that 100// client was added; also, for every update to the SharedInformer's 101// local cache after that client was added, eventually either the 102// SharedInformer is stopped or that client is notified of that 103// update. Client notifications happen after the corresponding cache 104// update and, in the case of a SharedIndexInformer, after the 105// corresponding index updates. It is possible that additional cache 106// and index updates happen before such a prescribed notification. 107// For a given SharedInformer and client, the notifications are 108// delivered sequentially. For a given SharedInformer, client, and 109// object ID, the notifications are delivered in order. 110// 111// A client must process each notification promptly; a SharedInformer 112// is not engineered to deal well with a large backlog of 113// notifications to deliver. Lengthy processing should be passed off 114// to something else, for example through a 115// `client-go/util/workqueue`. 116// 117// Each query to an informer's local cache --- whether a single-object 118// lookup, a list operation, or a use of one of its indices --- is 119// answered entirely from one of the collection states received by 120// that informer. 121// 122// A delete notification exposes the last locally known non-absent 123// state, except that its ResourceVersion is replaced with a 124// ResourceVersion in which the object is actually absent. 125type SharedInformer interface { 126 // AddEventHandler adds an event handler to the shared informer using the shared informer's resync 127 // period. Events to a single handler are delivered sequentially, but there is no coordination 128 // between different handlers. 129 AddEventHandler(handler ResourceEventHandler) 130 // AddEventHandlerWithResyncPeriod adds an event handler to the 131 // shared informer using the specified resync period. The resync 132 // operation consists of delivering to the handler a create 133 // notification for every object in the informer's local cache; it 134 // does not add any interactions with the authoritative storage. 135 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) 136 // GetStore returns the informer's local cache as a Store. 137 GetStore() Store 138 // GetController gives back a synthetic interface that "votes" to start the informer 139 GetController() Controller 140 // Run starts and runs the shared informer, returning after it stops. 141 // The informer will be stopped when stopCh is closed. 142 Run(stopCh <-chan struct{}) 143 // HasSynced returns true if the shared informer's store has been 144 // informed by at least one full LIST of the authoritative state 145 // of the informer's object collection. This is unrelated to "resync". 146 HasSynced() bool 147 // LastSyncResourceVersion is the resource version observed when last synced with the underlying 148 // store. The value returned is not synchronized with access to the underlying store and is not 149 // thread-safe. 150 LastSyncResourceVersion() string 151} 152 153// SharedIndexInformer provides add and get Indexers ability based on SharedInformer. 154type SharedIndexInformer interface { 155 SharedInformer 156 // AddIndexers add indexers to the informer before it starts. 157 AddIndexers(indexers Indexers) error 158 GetIndexer() Indexer 159} 160 161// NewSharedInformer creates a new instance for the listwatcher. 162func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { 163 return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{}) 164} 165 166// NewSharedIndexInformer creates a new instance for the listwatcher. 167func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { 168 realClock := &clock.RealClock{} 169 sharedIndexInformer := &sharedIndexInformer{ 170 processor: &sharedProcessor{clock: realClock}, 171 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), 172 listerWatcher: lw, 173 objectType: objType, 174 resyncCheckPeriod: defaultEventHandlerResyncPeriod, 175 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, 176 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), 177 clock: realClock, 178 } 179 return sharedIndexInformer 180} 181 182// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. 183type InformerSynced func() bool 184 185const ( 186 // syncedPollPeriod controls how often you look at the status of your sync funcs 187 syncedPollPeriod = 100 * time.Millisecond 188 189 // initialBufferSize is the initial number of event notifications that can be buffered. 190 initialBufferSize = 1024 191) 192 193// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages 194// indicating that the caller identified by name is waiting for syncs, followed by 195// either a successful or failed sync. 196func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 197 klog.Infof("Waiting for caches to sync for %s", controllerName) 198 199 if !WaitForCacheSync(stopCh, cacheSyncs...) { 200 utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) 201 return false 202 } 203 204 klog.Infof("Caches are synced for %s ", controllerName) 205 return true 206} 207 208// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false 209// if the controller should shutdown 210// callers should prefer WaitForNamedCacheSync() 211func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 212 err := wait.PollImmediateUntil(syncedPollPeriod, 213 func() (bool, error) { 214 for _, syncFunc := range cacheSyncs { 215 if !syncFunc() { 216 return false, nil 217 } 218 } 219 return true, nil 220 }, 221 stopCh) 222 if err != nil { 223 klog.V(2).Infof("stop requested") 224 return false 225 } 226 227 klog.V(4).Infof("caches populated") 228 return true 229} 230 231type sharedIndexInformer struct { 232 indexer Indexer 233 controller Controller 234 235 processor *sharedProcessor 236 cacheMutationDetector MutationDetector 237 238 // This block is tracked to handle late initialization of the controller 239 listerWatcher ListerWatcher 240 objectType runtime.Object 241 242 // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call 243 // shouldResync to check if any of our listeners need a resync. 244 resyncCheckPeriod time.Duration 245 // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via 246 // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default 247 // value). 248 defaultEventHandlerResyncPeriod time.Duration 249 // clock allows for testability 250 clock clock.Clock 251 252 started, stopped bool 253 startedLock sync.Mutex 254 255 // blockDeltas gives a way to stop all event distribution so that a late event handler 256 // can safely join the shared informer. 257 blockDeltas sync.Mutex 258} 259 260// dummyController hides the fact that a SharedInformer is different from a dedicated one 261// where a caller can `Run`. The run method is disconnected in this case, because higher 262// level logic will decide when to start the SharedInformer and related controller. 263// Because returning information back is always asynchronous, the legacy callers shouldn't 264// notice any change in behavior. 265type dummyController struct { 266 informer *sharedIndexInformer 267} 268 269func (v *dummyController) Run(stopCh <-chan struct{}) { 270} 271 272func (v *dummyController) HasSynced() bool { 273 return v.informer.HasSynced() 274} 275 276func (v *dummyController) LastSyncResourceVersion() string { 277 return "" 278} 279 280type updateNotification struct { 281 oldObj interface{} 282 newObj interface{} 283} 284 285type addNotification struct { 286 newObj interface{} 287} 288 289type deleteNotification struct { 290 oldObj interface{} 291} 292 293func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 294 defer utilruntime.HandleCrash() 295 296 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) 297 298 cfg := &Config{ 299 Queue: fifo, 300 ListerWatcher: s.listerWatcher, 301 ObjectType: s.objectType, 302 FullResyncPeriod: s.resyncCheckPeriod, 303 RetryOnError: false, 304 ShouldResync: s.processor.shouldResync, 305 306 Process: s.HandleDeltas, 307 } 308 309 func() { 310 s.startedLock.Lock() 311 defer s.startedLock.Unlock() 312 313 s.controller = New(cfg) 314 s.controller.(*controller).clock = s.clock 315 s.started = true 316 }() 317 318 // Separate stop channel because Processor should be stopped strictly after controller 319 processorStopCh := make(chan struct{}) 320 var wg wait.Group 321 defer wg.Wait() // Wait for Processor to stop 322 defer close(processorStopCh) // Tell Processor to stop 323 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 324 wg.StartWithChannel(processorStopCh, s.processor.run) 325 326 defer func() { 327 s.startedLock.Lock() 328 defer s.startedLock.Unlock() 329 s.stopped = true // Don't want any new listeners 330 }() 331 s.controller.Run(stopCh) 332} 333 334func (s *sharedIndexInformer) HasSynced() bool { 335 s.startedLock.Lock() 336 defer s.startedLock.Unlock() 337 338 if s.controller == nil { 339 return false 340 } 341 return s.controller.HasSynced() 342} 343 344func (s *sharedIndexInformer) LastSyncResourceVersion() string { 345 s.startedLock.Lock() 346 defer s.startedLock.Unlock() 347 348 if s.controller == nil { 349 return "" 350 } 351 return s.controller.LastSyncResourceVersion() 352} 353 354func (s *sharedIndexInformer) GetStore() Store { 355 return s.indexer 356} 357 358func (s *sharedIndexInformer) GetIndexer() Indexer { 359 return s.indexer 360} 361 362func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { 363 s.startedLock.Lock() 364 defer s.startedLock.Unlock() 365 366 if s.started { 367 return fmt.Errorf("informer has already started") 368 } 369 370 return s.indexer.AddIndexers(indexers) 371} 372 373func (s *sharedIndexInformer) GetController() Controller { 374 return &dummyController{informer: s} 375} 376 377func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { 378 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) 379} 380 381func determineResyncPeriod(desired, check time.Duration) time.Duration { 382 if desired == 0 { 383 return desired 384 } 385 if check == 0 { 386 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) 387 return 0 388 } 389 if desired < check { 390 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) 391 return check 392 } 393 return desired 394} 395 396const minimumResyncPeriod = 1 * time.Second 397 398func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { 399 s.startedLock.Lock() 400 defer s.startedLock.Unlock() 401 402 if s.stopped { 403 klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler) 404 return 405 } 406 407 if resyncPeriod > 0 { 408 if resyncPeriod < minimumResyncPeriod { 409 klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod) 410 resyncPeriod = minimumResyncPeriod 411 } 412 413 if resyncPeriod < s.resyncCheckPeriod { 414 if s.started { 415 klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) 416 resyncPeriod = s.resyncCheckPeriod 417 } else { 418 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update 419 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners 420 // accordingly 421 s.resyncCheckPeriod = resyncPeriod 422 s.processor.resyncCheckPeriodChanged(resyncPeriod) 423 } 424 } 425 } 426 427 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) 428 429 if !s.started { 430 s.processor.addListener(listener) 431 return 432 } 433 434 // in order to safely join, we have to 435 // 1. stop sending add/update/delete notifications 436 // 2. do a list against the store 437 // 3. send synthetic "Add" events to the new handler 438 // 4. unblock 439 s.blockDeltas.Lock() 440 defer s.blockDeltas.Unlock() 441 442 s.processor.addListener(listener) 443 for _, item := range s.indexer.List() { 444 listener.add(addNotification{newObj: item}) 445 } 446} 447 448func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 449 s.blockDeltas.Lock() 450 defer s.blockDeltas.Unlock() 451 452 // from oldest to newest 453 for _, d := range obj.(Deltas) { 454 switch d.Type { 455 case Sync, Added, Updated: 456 isSync := d.Type == Sync 457 s.cacheMutationDetector.AddObject(d.Object) 458 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { 459 if err := s.indexer.Update(d.Object); err != nil { 460 return err 461 } 462 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 463 } else { 464 if err := s.indexer.Add(d.Object); err != nil { 465 return err 466 } 467 s.processor.distribute(addNotification{newObj: d.Object}, isSync) 468 } 469 case Deleted: 470 if err := s.indexer.Delete(d.Object); err != nil { 471 return err 472 } 473 s.processor.distribute(deleteNotification{oldObj: d.Object}, false) 474 } 475 } 476 return nil 477} 478 479type sharedProcessor struct { 480 listenersStarted bool 481 listenersLock sync.RWMutex 482 listeners []*processorListener 483 syncingListeners []*processorListener 484 clock clock.Clock 485 wg wait.Group 486} 487 488func (p *sharedProcessor) addListener(listener *processorListener) { 489 p.listenersLock.Lock() 490 defer p.listenersLock.Unlock() 491 492 p.addListenerLocked(listener) 493 if p.listenersStarted { 494 p.wg.Start(listener.run) 495 p.wg.Start(listener.pop) 496 } 497} 498 499func (p *sharedProcessor) addListenerLocked(listener *processorListener) { 500 p.listeners = append(p.listeners, listener) 501 p.syncingListeners = append(p.syncingListeners, listener) 502} 503 504func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 505 p.listenersLock.RLock() 506 defer p.listenersLock.RUnlock() 507 508 if sync { 509 for _, listener := range p.syncingListeners { 510 listener.add(obj) 511 } 512 } else { 513 for _, listener := range p.listeners { 514 listener.add(obj) 515 } 516 } 517} 518 519func (p *sharedProcessor) run(stopCh <-chan struct{}) { 520 func() { 521 p.listenersLock.RLock() 522 defer p.listenersLock.RUnlock() 523 for _, listener := range p.listeners { 524 p.wg.Start(listener.run) 525 p.wg.Start(listener.pop) 526 } 527 p.listenersStarted = true 528 }() 529 <-stopCh 530 p.listenersLock.RLock() 531 defer p.listenersLock.RUnlock() 532 for _, listener := range p.listeners { 533 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 534 } 535 p.wg.Wait() // Wait for all .pop() and .run() to stop 536} 537 538// shouldResync queries every listener to determine if any of them need a resync, based on each 539// listener's resyncPeriod. 540func (p *sharedProcessor) shouldResync() bool { 541 p.listenersLock.Lock() 542 defer p.listenersLock.Unlock() 543 544 p.syncingListeners = []*processorListener{} 545 546 resyncNeeded := false 547 now := p.clock.Now() 548 for _, listener := range p.listeners { 549 // need to loop through all the listeners to see if they need to resync so we can prepare any 550 // listeners that are going to be resyncing. 551 if listener.shouldResync(now) { 552 resyncNeeded = true 553 p.syncingListeners = append(p.syncingListeners, listener) 554 listener.determineNextResync(now) 555 } 556 } 557 return resyncNeeded 558} 559 560func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { 561 p.listenersLock.RLock() 562 defer p.listenersLock.RUnlock() 563 564 for _, listener := range p.listeners { 565 resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod) 566 listener.setResyncPeriod(resyncPeriod) 567 } 568} 569 570type processorListener struct { 571 nextCh chan interface{} 572 addCh chan interface{} 573 574 handler ResourceEventHandler 575 576 // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. 577 // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications 578 // added until we OOM. 579 // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but 580 // we should try to do something better. 581 pendingNotifications buffer.RingGrowing 582 583 // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer 584 requestedResyncPeriod time.Duration 585 // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This 586 // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the 587 // informer's overall resync check period. 588 resyncPeriod time.Duration 589 // nextResync is the earliest time the listener should get a full resync 590 nextResync time.Time 591 // resyncLock guards access to resyncPeriod and nextResync 592 resyncLock sync.Mutex 593} 594 595func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { 596 ret := &processorListener{ 597 nextCh: make(chan interface{}), 598 addCh: make(chan interface{}), 599 handler: handler, 600 pendingNotifications: *buffer.NewRingGrowing(bufferSize), 601 requestedResyncPeriod: requestedResyncPeriod, 602 resyncPeriod: resyncPeriod, 603 } 604 605 ret.determineNextResync(now) 606 607 return ret 608} 609 610func (p *processorListener) add(notification interface{}) { 611 p.addCh <- notification 612} 613 614func (p *processorListener) pop() { 615 defer utilruntime.HandleCrash() 616 defer close(p.nextCh) // Tell .run() to stop 617 618 var nextCh chan<- interface{} 619 var notification interface{} 620 for { 621 select { 622 case nextCh <- notification: 623 // Notification dispatched 624 var ok bool 625 notification, ok = p.pendingNotifications.ReadOne() 626 if !ok { // Nothing to pop 627 nextCh = nil // Disable this select case 628 } 629 case notificationToAdd, ok := <-p.addCh: 630 if !ok { 631 return 632 } 633 if notification == nil { // No notification to pop (and pendingNotifications is empty) 634 // Optimize the case - skip adding to pendingNotifications 635 notification = notificationToAdd 636 nextCh = p.nextCh 637 } else { // There is already a notification waiting to be dispatched 638 p.pendingNotifications.WriteOne(notificationToAdd) 639 } 640 } 641 } 642} 643 644func (p *processorListener) run() { 645 // this call blocks until the channel is closed. When a panic happens during the notification 646 // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) 647 // the next notification will be attempted. This is usually better than the alternative of never 648 // delivering again. 649 stopCh := make(chan struct{}) 650 wait.Until(func() { 651 // this gives us a few quick retries before a long pause and then a few more quick retries 652 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { 653 for next := range p.nextCh { 654 switch notification := next.(type) { 655 case updateNotification: 656 p.handler.OnUpdate(notification.oldObj, notification.newObj) 657 case addNotification: 658 p.handler.OnAdd(notification.newObj) 659 case deleteNotification: 660 p.handler.OnDelete(notification.oldObj) 661 default: 662 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) 663 } 664 } 665 // the only way to get here is if the p.nextCh is empty and closed 666 return true, nil 667 }) 668 669 // the only way to get here is if the p.nextCh is empty and closed 670 if err == nil { 671 close(stopCh) 672 } 673 }, 1*time.Minute, stopCh) 674} 675 676// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0, 677// this always returns false. 678func (p *processorListener) shouldResync(now time.Time) bool { 679 p.resyncLock.Lock() 680 defer p.resyncLock.Unlock() 681 682 if p.resyncPeriod == 0 { 683 return false 684 } 685 686 return now.After(p.nextResync) || now.Equal(p.nextResync) 687} 688 689func (p *processorListener) determineNextResync(now time.Time) { 690 p.resyncLock.Lock() 691 defer p.resyncLock.Unlock() 692 693 p.nextResync = now.Add(p.resyncPeriod) 694} 695 696func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) { 697 p.resyncLock.Lock() 698 defer p.resyncLock.Unlock() 699 700 p.resyncPeriod = resyncPeriod 701} 702