1/* 2Copyright 2014 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "errors" 21 "fmt" 22 "io" 23 "math/rand" 24 "net" 25 "net/url" 26 "reflect" 27 "strings" 28 "sync" 29 "syscall" 30 "time" 31 32 apierrs "k8s.io/apimachinery/pkg/api/errors" 33 "k8s.io/apimachinery/pkg/api/meta" 34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 35 "k8s.io/apimachinery/pkg/runtime" 36 "k8s.io/apimachinery/pkg/util/clock" 37 "k8s.io/apimachinery/pkg/util/naming" 38 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 39 "k8s.io/apimachinery/pkg/util/wait" 40 "k8s.io/apimachinery/pkg/watch" 41 "k8s.io/klog" 42 "k8s.io/utils/trace" 43) 44 45// Reflector watches a specified resource and causes all changes to be reflected in the given store. 46type Reflector struct { 47 // name identifies this reflector. By default it will be a file:line if possible. 48 name string 49 // metrics tracks basic metric information about the reflector 50 metrics *reflectorMetrics 51 52 // The type of object we expect to place in the store. 53 expectedType reflect.Type 54 // The destination to sync up with the watch source 55 store Store 56 // listerWatcher is used to perform lists and watches. 57 listerWatcher ListerWatcher 58 // period controls timing between one watch ending and 59 // the beginning of the next one. 60 period time.Duration 61 resyncPeriod time.Duration 62 ShouldResync func() bool 63 // clock allows tests to manipulate time 64 clock clock.Clock 65 // lastSyncResourceVersion is the resource version token last 66 // observed when doing a sync with the underlying store 67 // it is thread safe, but not synchronized with the underlying store 68 lastSyncResourceVersion string 69 // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion 70 lastSyncResourceVersionMutex sync.RWMutex 71} 72 73var ( 74 // We try to spread the load on apiserver by setting timeouts for 75 // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. 76 minWatchTimeout = 5 * time.Minute 77) 78 79// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector 80// The indexer is configured to key on namespace 81func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) { 82 indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc}) 83 reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) 84 return indexer, reflector 85} 86 87// NewReflector creates a new Reflector object which will keep the given store up to 88// date with the server's contents for the given resource. Reflector promises to 89// only put things in the store that have the type of expectedType, unless expectedType 90// is nil. If resyncPeriod is non-zero, then lists will be executed after every 91// resyncPeriod, so that you can use reflectors to periodically process everything as 92// well as incrementally processing the things that change. 93func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { 94 return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) 95} 96 97// NewNamedReflector same as NewReflector, but with a specified name for logging 98func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { 99 r := &Reflector{ 100 name: name, 101 listerWatcher: lw, 102 store: store, 103 expectedType: reflect.TypeOf(expectedType), 104 period: time.Second, 105 resyncPeriod: resyncPeriod, 106 clock: &clock.RealClock{}, 107 } 108 return r 109} 110 111func makeValidPrometheusMetricLabel(in string) string { 112 // this isn't perfect, but it removes our common characters 113 return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in) 114} 115 116// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common 117// call chains to NewReflector, so they'd be low entropy names for reflectors 118var internalPackages = []string{"client-go/tools/cache/"} 119 120// Run starts a watch and handles watch events. Will restart the watch if it is closed. 121// Run will exit when stopCh is closed. 122func (r *Reflector) Run(stopCh <-chan struct{}) { 123 klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) 124 wait.Until(func() { 125 if err := r.ListAndWatch(stopCh); err != nil { 126 utilruntime.HandleError(err) 127 } 128 }, r.period, stopCh) 129} 130 131var ( 132 // nothing will ever be sent down this channel 133 neverExitWatch <-chan time.Time = make(chan time.Time) 134 135 // Used to indicate that watching stopped so that a resync could happen. 136 errorResyncRequested = errors.New("resync channel fired") 137 138 // Used to indicate that watching stopped because of a signal from the stop 139 // channel passed in from a client of the reflector. 140 errorStopRequested = errors.New("Stop requested") 141) 142 143// resyncChan returns a channel which will receive something when a resync is 144// required, and a cleanup function. 145func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) { 146 if r.resyncPeriod == 0 { 147 return neverExitWatch, func() bool { return false } 148 } 149 // The cleanup function is required: imagine the scenario where watches 150 // always fail so we end up listing frequently. Then, if we don't 151 // manually stop the timer, we could end up with many timers active 152 // concurrently. 153 t := r.clock.NewTimer(r.resyncPeriod) 154 return t.C(), t.Stop 155} 156 157// ListAndWatch first lists all items and get the resource version at the moment of call, 158// and then use the resource version to watch. 159// It returns error if ListAndWatch didn't even try to initialize watch. 160func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { 161 klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name) 162 var resourceVersion string 163 164 // Explicitly set "0" as resource version - it's fine for the List() 165 // to be served from cache and potentially be delayed relative to 166 // etcd contents. Reflector framework will catch up via Watch() eventually. 167 options := metav1.ListOptions{ResourceVersion: "0"} 168 169 if err := func() error { 170 initTrace := trace.New("Reflector " + r.name + " ListAndWatch") 171 defer initTrace.LogIfLong(10 * time.Second) 172 var list runtime.Object 173 var err error 174 listCh := make(chan struct{}, 1) 175 panicCh := make(chan interface{}, 1) 176 go func() { 177 defer func() { 178 if r := recover(); r != nil { 179 panicCh <- r 180 } 181 }() 182 list, err = r.listerWatcher.List(options) 183 close(listCh) 184 }() 185 select { 186 case <-stopCh: 187 return nil 188 case r := <-panicCh: 189 panic(r) 190 case <-listCh: 191 } 192 if err != nil { 193 return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err) 194 } 195 initTrace.Step("Objects listed") 196 listMetaInterface, err := meta.ListAccessor(list) 197 if err != nil { 198 return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) 199 } 200 resourceVersion = listMetaInterface.GetResourceVersion() 201 initTrace.Step("Resource version extracted") 202 items, err := meta.ExtractList(list) 203 if err != nil { 204 return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) 205 } 206 initTrace.Step("Objects extracted") 207 if err := r.syncWith(items, resourceVersion); err != nil { 208 return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) 209 } 210 initTrace.Step("SyncWith done") 211 r.setLastSyncResourceVersion(resourceVersion) 212 initTrace.Step("Resource version updated") 213 return nil 214 }(); err != nil { 215 return err 216 } 217 218 resyncerrc := make(chan error, 1) 219 cancelCh := make(chan struct{}) 220 defer close(cancelCh) 221 go func() { 222 resyncCh, cleanup := r.resyncChan() 223 defer func() { 224 cleanup() // Call the last one written into cleanup 225 }() 226 for { 227 select { 228 case <-resyncCh: 229 case <-stopCh: 230 return 231 case <-cancelCh: 232 return 233 } 234 if r.ShouldResync == nil || r.ShouldResync() { 235 klog.V(4).Infof("%s: forcing resync", r.name) 236 if err := r.store.Resync(); err != nil { 237 resyncerrc <- err 238 return 239 } 240 } 241 cleanup() 242 resyncCh, cleanup = r.resyncChan() 243 } 244 }() 245 246 for { 247 // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors 248 select { 249 case <-stopCh: 250 return nil 251 default: 252 } 253 254 timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) 255 options = metav1.ListOptions{ 256 ResourceVersion: resourceVersion, 257 // We want to avoid situations of hanging watchers. Stop any wachers that do not 258 // receive any events within the timeout window. 259 TimeoutSeconds: &timeoutSeconds, 260 } 261 262 w, err := r.listerWatcher.Watch(options) 263 if err != nil { 264 switch err { 265 case io.EOF: 266 // watch closed normally 267 case io.ErrUnexpectedEOF: 268 klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err) 269 default: 270 utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err)) 271 } 272 // If this is "connection refused" error, it means that most likely apiserver is not responsive. 273 // It doesn't make sense to re-list all objects because most likely we will be able to restart 274 // watch where we ended. 275 // If that's the case wait and resend watch request. 276 if urlError, ok := err.(*url.Error); ok { 277 if opError, ok := urlError.Err.(*net.OpError); ok { 278 if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED { 279 time.Sleep(time.Second) 280 continue 281 } 282 } 283 } 284 return nil 285 } 286 287 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { 288 if err != errorStopRequested { 289 klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err) 290 } 291 return nil 292 } 293 } 294} 295 296// syncWith replaces the store's items with the given list. 297func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { 298 found := make([]interface{}, 0, len(items)) 299 for _, item := range items { 300 found = append(found, item) 301 } 302 return r.store.Replace(found, resourceVersion) 303} 304 305// watchHandler watches w and keeps *resourceVersion up to date. 306func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { 307 start := r.clock.Now() 308 eventCount := 0 309 310 // Stopping the watcher should be idempotent and if we return from this function there's no way 311 // we're coming back in with the same watch interface. 312 defer w.Stop() 313 314loop: 315 for { 316 select { 317 case <-stopCh: 318 return errorStopRequested 319 case err := <-errc: 320 return err 321 case event, ok := <-w.ResultChan(): 322 if !ok { 323 break loop 324 } 325 if event.Type == watch.Error { 326 return apierrs.FromObject(event.Object) 327 } 328 if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { 329 utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) 330 continue 331 } 332 meta, err := meta.Accessor(event.Object) 333 if err != nil { 334 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) 335 continue 336 } 337 newResourceVersion := meta.GetResourceVersion() 338 switch event.Type { 339 case watch.Added: 340 err := r.store.Add(event.Object) 341 if err != nil { 342 utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) 343 } 344 case watch.Modified: 345 err := r.store.Update(event.Object) 346 if err != nil { 347 utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 348 } 349 case watch.Deleted: 350 // TODO: Will any consumers need access to the "last known 351 // state", which is passed in event.Object? If so, may need 352 // to change this. 353 err := r.store.Delete(event.Object) 354 if err != nil { 355 utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) 356 } 357 default: 358 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) 359 } 360 *resourceVersion = newResourceVersion 361 r.setLastSyncResourceVersion(newResourceVersion) 362 eventCount++ 363 } 364 } 365 366 watchDuration := r.clock.Now().Sub(start) 367 if watchDuration < 1*time.Second && eventCount == 0 { 368 return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) 369 } 370 klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount) 371 return nil 372} 373 374// LastSyncResourceVersion is the resource version observed when last sync with the underlying store 375// The value returned is not synchronized with access to the underlying store and is not thread-safe 376func (r *Reflector) LastSyncResourceVersion() string { 377 r.lastSyncResourceVersionMutex.RLock() 378 defer r.lastSyncResourceVersionMutex.RUnlock() 379 return r.lastSyncResourceVersion 380} 381 382func (r *Reflector) setLastSyncResourceVersion(v string) { 383 r.lastSyncResourceVersionMutex.Lock() 384 defer r.lastSyncResourceVersionMutex.Unlock() 385 r.lastSyncResourceVersion = v 386} 387