/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cacher

import (
	"fmt"
	"reflect"
	"sort"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	utiltrace "k8s.io/utils/trace"
)

const (
	// blockTimeout determines how long we're willing to block the request
	// to wait for a given resource version to be propagated to cache,
	// before terminating the request and returning a Timeout error with a
	// retry-after suggestion.
	blockTimeout = 3 * time.Second

	// resourceVersionTooHighRetrySeconds is the number of seconds before an
	// operation should be retried by the client after receiving a
	// 'too high resource version' error.
	resourceVersionTooHighRetrySeconds = 1

	// eventFreshDuration is the time duration of events we want to keep.
	// We set it to `defaultBookmarkFrequency` plus epsilon to maximize the
	// chances that the last bookmark was sent within the kept history while,
	// at the same time, minimizing the needed memory usage.
	eventFreshDuration = 75 * time.Second

	// defaultLowerBoundCapacity is the default value for the event cache
	// capacity's lower bound. It is also the initial capacity used by
	// newWatchCache.
	// TODO: Figure out to what value we can decrease it.
	defaultLowerBoundCapacity = 100

	// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
	defaultUpperBoundCapacity = 100 * 1024
)

// watchCacheEvent is a single "watch event" that is sent to users of
// watchCache. In addition to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
// upper layers.
//
// The Prev* fields are only populated when the object already existed in the
// underlying store at the time the event was processed. RecordTime is the
// time (according to the watchCache clock) at which the event was recorded;
// it is left at its zero value for bookmark events and for the synthetic
// "Added" events generated when watching from resource version 0.
type watchCacheEvent struct {
	Type            watch.EventType
	Object          runtime.Object
	ObjLabels       labels.Set
	ObjFields       fields.Set
	PrevObject      runtime.Object
	PrevObjLabels   labels.Set
	PrevObjFields   fields.Set
	Key             string
	ResourceVersion uint64
	RecordTime      time.Time
}

// Computing a key of an object is generally non-trivial (it performs
// e.g. validation underneath). The same holds for computing object fields
// and labels. To avoid computing them multiple times (to serve the event
// in different List/Watch requests), the underlying store keeps
// structs of (key, object, labels, fields).
85type storeElement struct { 86 Key string 87 Object runtime.Object 88 Labels labels.Set 89 Fields fields.Set 90} 91 92func storeElementKey(obj interface{}) (string, error) { 93 elem, ok := obj.(*storeElement) 94 if !ok { 95 return "", fmt.Errorf("not a storeElement: %v", obj) 96 } 97 return elem.Key, nil 98} 99 100func storeElementObject(obj interface{}) (runtime.Object, error) { 101 elem, ok := obj.(*storeElement) 102 if !ok { 103 return nil, fmt.Errorf("not a storeElement: %v", obj) 104 } 105 return elem.Object, nil 106} 107 108func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc { 109 return func(obj interface{}) (strings []string, e error) { 110 seo, err := storeElementObject(obj) 111 if err != nil { 112 return nil, err 113 } 114 return objIndexFunc(seo) 115 } 116} 117 118func storeElementIndexers(indexers *cache.Indexers) cache.Indexers { 119 if indexers == nil { 120 return cache.Indexers{} 121 } 122 ret := cache.Indexers{} 123 for indexName, indexFunc := range *indexers { 124 ret[indexName] = storeElementIndexFunc(indexFunc) 125 } 126 return ret 127} 128 129// watchCache implements a Store interface. 130// However, it depends on the elements implementing runtime.Object interface. 131// 132// watchCache is a "sliding window" (with a limited capacity) of objects 133// observed from a watch. 134type watchCache struct { 135 sync.RWMutex 136 137 // Condition on which lists are waiting for the fresh enough 138 // resource version. 139 cond *sync.Cond 140 141 // Maximum size of history window. 142 capacity int 143 144 // upper bound of capacity since event cache has a dynamic size. 145 upperBoundCapacity int 146 147 // lower bound of capacity since event cache has a dynamic size. 148 lowerBoundCapacity int 149 150 // keyFunc is used to get a key in the underlying storage for a given object. 151 keyFunc func(runtime.Object) (string, error) 152 153 // getAttrsFunc is used to get labels and fields of an object. 
154 getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) 155 156 // cache is used a cyclic buffer - its first element (with the smallest 157 // resourceVersion) is defined by startIndex, its last element is defined 158 // by endIndex (if cache is full it will be startIndex + capacity). 159 // Both startIndex and endIndex can be greater than buffer capacity - 160 // you should always apply modulo capacity to get an index in cache array. 161 cache []*watchCacheEvent 162 startIndex int 163 endIndex int 164 165 // store will effectively support LIST operation from the "end of cache 166 // history" i.e. from the moment just after the newest cached watched event. 167 // It is necessary to effectively allow clients to start watching at now. 168 // NOTE: We assume that <store> is thread-safe. 169 store cache.Indexer 170 171 // ResourceVersion up to which the watchCache is propagated. 172 resourceVersion uint64 173 174 // ResourceVersion of the last list result (populated via Replace() method). 175 listResourceVersion uint64 176 177 // This handler is run at the end of every successful Replace() method. 178 onReplace func() 179 180 // This handler is run at the end of every Add/Update/Delete method 181 // and additionally gets the previous value of the object. 182 eventHandler func(*watchCacheEvent) 183 184 // for testing timeouts. 185 clock clock.Clock 186 187 // An underlying storage.Versioner. 188 versioner storage.Versioner 189 190 // cacher's objectType. 
191 objectType reflect.Type 192} 193 194func newWatchCache( 195 keyFunc func(runtime.Object) (string, error), 196 eventHandler func(*watchCacheEvent), 197 getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error), 198 versioner storage.Versioner, 199 indexers *cache.Indexers, 200 clock clock.Clock, 201 objectType reflect.Type) *watchCache { 202 wc := &watchCache{ 203 capacity: defaultLowerBoundCapacity, 204 keyFunc: keyFunc, 205 getAttrsFunc: getAttrsFunc, 206 cache: make([]*watchCacheEvent, defaultLowerBoundCapacity), 207 lowerBoundCapacity: defaultLowerBoundCapacity, 208 upperBoundCapacity: defaultUpperBoundCapacity, 209 startIndex: 0, 210 endIndex: 0, 211 store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)), 212 resourceVersion: 0, 213 listResourceVersion: 0, 214 eventHandler: eventHandler, 215 clock: clock, 216 versioner: versioner, 217 objectType: objectType, 218 } 219 objType := objectType.String() 220 watchCacheCapacity.WithLabelValues(objType).Set(float64(wc.capacity)) 221 wc.cond = sync.NewCond(wc.RLocker()) 222 return wc 223} 224 225// Add takes runtime.Object as an argument. 226func (w *watchCache) Add(obj interface{}) error { 227 object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) 228 if err != nil { 229 return err 230 } 231 event := watch.Event{Type: watch.Added, Object: object} 232 233 f := func(elem *storeElement) error { return w.store.Add(elem) } 234 return w.processEvent(event, resourceVersion, f) 235} 236 237// Update takes runtime.Object as an argument. 238func (w *watchCache) Update(obj interface{}) error { 239 object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) 240 if err != nil { 241 return err 242 } 243 event := watch.Event{Type: watch.Modified, Object: object} 244 245 f := func(elem *storeElement) error { return w.store.Update(elem) } 246 return w.processEvent(event, resourceVersion, f) 247} 248 249// Delete takes runtime.Object as an argument. 
250func (w *watchCache) Delete(obj interface{}) error { 251 object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj) 252 if err != nil { 253 return err 254 } 255 event := watch.Event{Type: watch.Deleted, Object: object} 256 257 f := func(elem *storeElement) error { return w.store.Delete(elem) } 258 return w.processEvent(event, resourceVersion, f) 259} 260 261func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) { 262 object, ok := obj.(runtime.Object) 263 if !ok { 264 return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) 265 } 266 resourceVersion, err := w.versioner.ObjectResourceVersion(object) 267 if err != nil { 268 return nil, 0, err 269 } 270 return object, resourceVersion, nil 271} 272 273// processEvent is safe as long as there is at most one call to it in flight 274// at any point in time. 275func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { 276 key, err := w.keyFunc(event.Object) 277 if err != nil { 278 return fmt.Errorf("couldn't compute key: %v", err) 279 } 280 elem := &storeElement{Key: key, Object: event.Object} 281 elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object) 282 if err != nil { 283 return err 284 } 285 286 wcEvent := &watchCacheEvent{ 287 Type: event.Type, 288 Object: elem.Object, 289 ObjLabels: elem.Labels, 290 ObjFields: elem.Fields, 291 Key: key, 292 ResourceVersion: resourceVersion, 293 RecordTime: w.clock.Now(), 294 } 295 296 if err := func() error { 297 // TODO: We should consider moving this lock below after the watchCacheEvent 298 // is created. In such situation, the only problematic scenario is Replace( 299 // happening after getting object from store and before acquiring a lock. 300 // Maybe introduce another lock for this purpose. 
301 w.Lock() 302 defer w.Unlock() 303 304 previous, exists, err := w.store.Get(elem) 305 if err != nil { 306 return err 307 } 308 if exists { 309 previousElem := previous.(*storeElement) 310 wcEvent.PrevObject = previousElem.Object 311 wcEvent.PrevObjLabels = previousElem.Labels 312 wcEvent.PrevObjFields = previousElem.Fields 313 } 314 315 w.updateCache(wcEvent) 316 w.resourceVersion = resourceVersion 317 defer w.cond.Broadcast() 318 319 return updateFunc(elem) 320 }(); err != nil { 321 return err 322 } 323 324 // Avoid calling event handler under lock. 325 // This is safe as long as there is at most one call to Add/Update/Delete and 326 // UpdateResourceVersion in flight at any point in time, which is true now, 327 // because reflector calls them synchronously from its main thread. 328 if w.eventHandler != nil { 329 w.eventHandler(wcEvent) 330 } 331 return nil 332} 333 334// Assumes that lock is already held for write. 335func (w *watchCache) updateCache(event *watchCacheEvent) { 336 w.resizeCacheLocked(event.RecordTime) 337 if w.isCacheFullLocked() { 338 // Cache is full - remove the oldest element. 339 w.startIndex++ 340 } 341 w.cache[w.endIndex%w.capacity] = event 342 w.endIndex++ 343} 344 345// resizeCacheLocked resizes the cache if necessary: 346// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration. 347// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping). 
348func (w *watchCache) resizeCacheLocked(eventTime time.Time) { 349 if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration { 350 capacity := min(w.capacity*2, w.upperBoundCapacity) 351 if capacity > w.capacity { 352 w.doCacheResizeLocked(capacity) 353 } 354 return 355 } 356 if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration { 357 capacity := max(w.capacity/2, w.lowerBoundCapacity) 358 if capacity < w.capacity { 359 w.doCacheResizeLocked(capacity) 360 } 361 return 362 } 363} 364 365// isCacheFullLocked used to judge whether watchCacheEvent is full. 366// Assumes that lock is already held for write. 367func (w *watchCache) isCacheFullLocked() bool { 368 return w.endIndex == w.startIndex+w.capacity 369} 370 371// doCacheResizeLocked resize watchCache's event array with different capacity. 372// Assumes that lock is already held for write. 373func (w *watchCache) doCacheResizeLocked(capacity int) { 374 newCache := make([]*watchCacheEvent, capacity) 375 if capacity < w.capacity { 376 // adjust startIndex if cache capacity shrink. 377 w.startIndex = w.endIndex - capacity 378 } 379 for i := w.startIndex; i < w.endIndex; i++ { 380 newCache[i%capacity] = w.cache[i%w.capacity] 381 } 382 w.cache = newCache 383 recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity) 384 w.capacity = capacity 385} 386 387func (w *watchCache) UpdateResourceVersion(resourceVersion string) { 388 rv, err := w.versioner.ParseResourceVersion(resourceVersion) 389 if err != nil { 390 klog.Errorf("Couldn't parse resourceVersion: %v", err) 391 return 392 } 393 394 func() { 395 w.Lock() 396 defer w.Unlock() 397 w.resourceVersion = rv 398 }() 399 400 // Avoid calling event handler under lock. 
401 // This is safe as long as there is at most one call to Add/Update/Delete and 402 // UpdateResourceVersion in flight at any point in time, which is true now, 403 // because reflector calls them synchronously from its main thread. 404 if w.eventHandler != nil { 405 wcEvent := &watchCacheEvent{ 406 Type: watch.Bookmark, 407 ResourceVersion: rv, 408 } 409 w.eventHandler(wcEvent) 410 } 411} 412 413// List returns list of pointers to <storeElement> objects. 414func (w *watchCache) List() []interface{} { 415 return w.store.List() 416} 417 418// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>. 419// NOTE: This function acquired lock and doesn't release it. 420// You HAVE TO explicitly call w.RUnlock() after this function. 421func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error { 422 startTime := w.clock.Now() 423 go func() { 424 // Wake us up when the time limit has expired. The docs 425 // promise that time.After (well, NewTimer, which it calls) 426 // will wait *at least* the duration given. Since this go 427 // routine starts sometime after we record the start time, and 428 // it will wake up the loop below sometime after the broadcast, 429 // we don't need to worry about waking it up before the time 430 // has expired accidentally. 431 <-w.clock.After(blockTimeout) 432 w.cond.Broadcast() 433 }() 434 435 w.RLock() 436 if trace != nil { 437 trace.Step("watchCache locked acquired") 438 } 439 for w.resourceVersion < resourceVersion { 440 if w.clock.Since(startTime) >= blockTimeout { 441 // Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds. 
442 return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds) 443 } 444 w.cond.Wait() 445 } 446 if trace != nil { 447 trace.Step("watchCache fresh enough") 448 } 449 return nil 450} 451 452// WaitUntilFreshAndList returns list of pointers to <storeElement> objects. 453func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) { 454 err := w.waitUntilFreshAndBlock(resourceVersion, trace) 455 defer w.RUnlock() 456 if err != nil { 457 return nil, 0, err 458 } 459 460 // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only 461 // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we 462 // want - they will be filtered out later. The fact that we return less things is only further performance improvement. 463 // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. 464 for _, matchValue := range matchValues { 465 if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { 466 return result, w.resourceVersion, nil 467 } 468 } 469 return w.store.List(), w.resourceVersion, nil 470} 471 472// WaitUntilFreshAndGet returns a pointers to <storeElement> object. 473func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) { 474 err := w.waitUntilFreshAndBlock(resourceVersion, trace) 475 defer w.RUnlock() 476 if err != nil { 477 return nil, false, 0, err 478 } 479 value, exists, err := w.store.GetByKey(key) 480 return value, exists, w.resourceVersion, err 481} 482 483func (w *watchCache) ListKeys() []string { 484 return w.store.ListKeys() 485} 486 487// Get takes runtime.Object as a parameter. However, it returns 488// pointer to <storeElement>. 
489func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) { 490 object, ok := obj.(runtime.Object) 491 if !ok { 492 return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj) 493 } 494 key, err := w.keyFunc(object) 495 if err != nil { 496 return nil, false, fmt.Errorf("couldn't compute key: %v", err) 497 } 498 499 return w.store.Get(&storeElement{Key: key, Object: object}) 500} 501 502// GetByKey returns pointer to <storeElement>. 503func (w *watchCache) GetByKey(key string) (interface{}, bool, error) { 504 return w.store.GetByKey(key) 505} 506 507// Replace takes slice of runtime.Object as a parameter. 508func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { 509 version, err := w.versioner.ParseResourceVersion(resourceVersion) 510 if err != nil { 511 return err 512 } 513 514 toReplace := make([]interface{}, 0, len(objs)) 515 for _, obj := range objs { 516 object, ok := obj.(runtime.Object) 517 if !ok { 518 return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj) 519 } 520 key, err := w.keyFunc(object) 521 if err != nil { 522 return fmt.Errorf("couldn't compute key: %v", err) 523 } 524 objLabels, objFields, err := w.getAttrsFunc(object) 525 if err != nil { 526 return err 527 } 528 toReplace = append(toReplace, &storeElement{ 529 Key: key, 530 Object: object, 531 Labels: objLabels, 532 Fields: objFields, 533 }) 534 } 535 536 w.Lock() 537 defer w.Unlock() 538 539 w.startIndex = 0 540 w.endIndex = 0 541 if err := w.store.Replace(toReplace, resourceVersion); err != nil { 542 return err 543 } 544 w.listResourceVersion = version 545 w.resourceVersion = version 546 if w.onReplace != nil { 547 w.onReplace() 548 } 549 w.cond.Broadcast() 550 klog.V(3).Infof("Replace watchCache (rev: %v) ", resourceVersion) 551 return nil 552} 553 554func (w *watchCache) SetOnReplace(onReplace func()) { 555 w.Lock() 556 defer w.Unlock() 557 w.onReplace = onReplace 558} 559 560func (w *watchCache) 
GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) { 561 size := w.endIndex - w.startIndex 562 var oldest uint64 563 switch { 564 case w.listResourceVersion > 0 && w.startIndex == 0: 565 // If no event was removed from the buffer since last relist, the oldest watch 566 // event we can deliver is one greater than the resource version of the list. 567 oldest = w.listResourceVersion + 1 568 case size > 0: 569 // If the previous condition is not satisfied: either some event was already 570 // removed from the buffer or we've never completed a list (the latter can 571 // only happen in unit tests that populate the buffer without performing 572 // list/replace operations), the oldest watch event we can deliver is the first 573 // one in the buffer. 574 oldest = w.cache[w.startIndex%w.capacity].ResourceVersion 575 default: 576 return nil, fmt.Errorf("watch cache isn't correctly initialized") 577 } 578 579 if resourceVersion == 0 { 580 // resourceVersion = 0 means that we don't require any specific starting point 581 // and we would like to start watching from ~now. 582 // However, to keep backward compatibility, we additionally need to return the 583 // current state and only then start watching from that point. 584 // 585 // TODO: In v2 api, we should stop returning the current state - #13969. 
586 allItems := w.store.List() 587 result := make([]*watchCacheEvent, len(allItems)) 588 for i, item := range allItems { 589 elem, ok := item.(*storeElement) 590 if !ok { 591 return nil, fmt.Errorf("not a storeElement: %v", elem) 592 } 593 objLabels, objFields, err := w.getAttrsFunc(elem.Object) 594 if err != nil { 595 return nil, err 596 } 597 result[i] = &watchCacheEvent{ 598 Type: watch.Added, 599 Object: elem.Object, 600 ObjLabels: objLabels, 601 ObjFields: objFields, 602 Key: elem.Key, 603 ResourceVersion: w.resourceVersion, 604 } 605 } 606 return result, nil 607 } 608 if resourceVersion < oldest-1 { 609 return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) 610 } 611 612 // Binary search the smallest index at which resourceVersion is greater than the given one. 613 f := func(i int) bool { 614 return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion 615 } 616 first := sort.Search(size, f) 617 result := make([]*watchCacheEvent, size-first) 618 for i := 0; i < size-first; i++ { 619 result[i] = w.cache[(w.startIndex+first+i)%w.capacity] 620 } 621 return result, nil 622} 623 624func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) { 625 w.RLock() 626 defer w.RUnlock() 627 return w.GetAllEventsSinceThreadUnsafe(resourceVersion) 628} 629 630func (w *watchCache) Resync() error { 631 // Nothing to do 632 return nil 633} 634