1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cache 18 19import ( 20 "fmt" 21 "math/rand" 22 "sync" 23 "testing" 24 "time" 25 26 "k8s.io/api/core/v1" 27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 28 "k8s.io/apimachinery/pkg/runtime" 29 "k8s.io/apimachinery/pkg/util/sets" 30 "k8s.io/apimachinery/pkg/util/wait" 31 "k8s.io/apimachinery/pkg/watch" 32 fcache "k8s.io/client-go/tools/cache/testing" 33 34 "github.com/google/gofuzz" 35) 36 37func Example() { 38 // source simulates an apiserver object endpoint. 39 source := fcache.NewFakeControllerSource() 40 41 // This will hold the downstream state, as we know it. 42 downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc) 43 44 // This will hold incoming changes. Note how we pass downstream in as a 45 // KeyLister, that way resync operations will result in the correct set 46 // of update/delete deltas. 47 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, downstream) 48 49 // Let's do threadsafe output to get predictable test results. 50 deletionCounter := make(chan string, 1000) 51 52 cfg := &Config{ 53 Queue: fifo, 54 ListerWatcher: source, 55 ObjectType: &v1.Pod{}, 56 FullResyncPeriod: time.Millisecond * 100, 57 RetryOnError: false, 58 59 // Let's implement a simple controller that just deletes 60 // everything that comes in. 61 Process: func(obj interface{}) error { 62 // Obj is from the Pop method of the Queue we make above. 63 newest := obj.(Deltas).Newest() 64 65 if newest.Type != Deleted { 66 // Update our downstream store. 67 err := downstream.Add(newest.Object) 68 if err != nil { 69 return err 70 } 71 72 // Delete this object. 73 source.Delete(newest.Object.(runtime.Object)) 74 } else { 75 // Update our downstream store. 76 err := downstream.Delete(newest.Object) 77 if err != nil { 78 return err 79 } 80 81 // fifo's KeyOf is easiest, because it handles 82 // DeletedFinalStateUnknown markers. 83 key, err := fifo.KeyOf(newest.Object) 84 if err != nil { 85 return err 86 } 87 88 // Report this deletion. 89 deletionCounter <- key 90 } 91 return nil 92 }, 93 } 94 95 // Create the controller and run it until we close stop. 96 stop := make(chan struct{}) 97 defer close(stop) 98 go New(cfg).Run(stop) 99 100 // Let's add a few objects to the source. 101 testIDs := []string{"a-hello", "b-controller", "c-framework"} 102 for _, name := range testIDs { 103 // Note that these pods are not valid-- the fake source doesn't 104 // call validation or anything. 105 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}) 106 } 107 108 // Let's wait for the controller to process the things we just added. 109 outputSet := sets.String{} 110 for i := 0; i < len(testIDs); i++ { 111 outputSet.Insert(<-deletionCounter) 112 } 113 114 for _, key := range outputSet.List() { 115 fmt.Println(key) 116 } 117 // Output: 118 // a-hello 119 // b-controller 120 // c-framework 121} 122 123func ExampleNewInformer() { 124 // source simulates an apiserver object endpoint. 125 source := fcache.NewFakeControllerSource() 126 127 // Let's do threadsafe output to get predictable test results. 128 deletionCounter := make(chan string, 1000) 129 130 // Make a controller that immediately deletes anything added to it, and 131 // logs anything deleted. 132 _, controller := NewInformer( 133 source, 134 &v1.Pod{}, 135 time.Millisecond*100, 136 ResourceEventHandlerFuncs{ 137 AddFunc: func(obj interface{}) { 138 source.Delete(obj.(runtime.Object)) 139 }, 140 DeleteFunc: func(obj interface{}) { 141 key, err := DeletionHandlingMetaNamespaceKeyFunc(obj) 142 if err != nil { 143 key = "oops something went wrong with the key" 144 } 145 146 // Report this deletion. 147 deletionCounter <- key 148 }, 149 }, 150 ) 151 152 // Run the controller and run it until we close stop. 153 stop := make(chan struct{}) 154 defer close(stop) 155 go controller.Run(stop) 156 157 // Let's add a few objects to the source. 158 testIDs := []string{"a-hello", "b-controller", "c-framework"} 159 for _, name := range testIDs { 160 // Note that these pods are not valid-- the fake source doesn't 161 // call validation or anything. 162 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}) 163 } 164 165 // Let's wait for the controller to process the things we just added. 166 outputSet := sets.String{} 167 for i := 0; i < len(testIDs); i++ { 168 outputSet.Insert(<-deletionCounter) 169 } 170 171 for _, key := range outputSet.List() { 172 fmt.Println(key) 173 } 174 // Output: 175 // a-hello 176 // b-controller 177 // c-framework 178} 179 180func TestHammerController(t *testing.T) { 181 // This test executes a bunch of requests through the fake source and 182 // controller framework to make sure there's no locking/threading 183 // errors. If an error happens, it should hang forever or trigger the 184 // race detector. 185 186 // source simulates an apiserver object endpoint. 187 source := fcache.NewFakeControllerSource() 188 189 // Let's do threadsafe output to get predictable test results. 190 outputSetLock := sync.Mutex{} 191 // map of key to operations done on the key 192 outputSet := map[string][]string{} 193 194 recordFunc := func(eventType string, obj interface{}) { 195 key, err := DeletionHandlingMetaNamespaceKeyFunc(obj) 196 if err != nil { 197 t.Errorf("something wrong with key: %v", err) 198 key = "oops something went wrong with the key" 199 } 200 201 // Record some output when items are deleted. 202 outputSetLock.Lock() 203 defer outputSetLock.Unlock() 204 outputSet[key] = append(outputSet[key], eventType) 205 } 206 207 // Make a controller which just logs all the changes it gets. 208 _, controller := NewInformer( 209 source, 210 &v1.Pod{}, 211 time.Millisecond*100, 212 ResourceEventHandlerFuncs{ 213 AddFunc: func(obj interface{}) { recordFunc("add", obj) }, 214 UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) }, 215 DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) }, 216 }, 217 ) 218 219 if controller.HasSynced() { 220 t.Errorf("Expected HasSynced() to return false before we started the controller") 221 } 222 223 // Run the controller and run it until we close stop. 224 stop := make(chan struct{}) 225 go controller.Run(stop) 226 227 // Let's wait for the controller to do its initial sync 228 wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { 229 return controller.HasSynced(), nil 230 }) 231 if !controller.HasSynced() { 232 t.Errorf("Expected HasSynced() to return true after the initial sync") 233 } 234 235 wg := sync.WaitGroup{} 236 const threads = 3 237 wg.Add(threads) 238 for i := 0; i < threads; i++ { 239 go func() { 240 defer wg.Done() 241 // Let's add a few objects to the source. 242 currentNames := sets.String{} 243 rs := rand.NewSource(rand.Int63()) 244 f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs) 245 for i := 0; i < 100; i++ { 246 var name string 247 var isNew bool 248 if currentNames.Len() == 0 || rand.Intn(3) == 1 { 249 f.Fuzz(&name) 250 isNew = true 251 } else { 252 l := currentNames.List() 253 name = l[rand.Intn(len(l))] 254 } 255 256 pod := &v1.Pod{} 257 f.Fuzz(pod) 258 pod.ObjectMeta.Name = name 259 pod.ObjectMeta.Namespace = "default" 260 // Add, update, or delete randomly. 261 // Note that these pods are not valid-- the fake source doesn't 262 // call validation or perform any other checking. 263 if isNew { 264 currentNames.Insert(name) 265 source.Add(pod) 266 continue 267 } 268 switch rand.Intn(2) { 269 case 0: 270 currentNames.Insert(name) 271 source.Modify(pod) 272 case 1: 273 currentNames.Delete(name) 274 source.Delete(pod) 275 } 276 } 277 }() 278 } 279 wg.Wait() 280 281 // Let's wait for the controller to finish processing the things we just added. 282 // TODO: look in the queue to see how many items need to be processed. 283 time.Sleep(100 * time.Millisecond) 284 close(stop) 285 286 // TODO: Verify that no goroutines were leaked here and that everything shut 287 // down cleanly. 288 289 outputSetLock.Lock() 290 t.Logf("got: %#v", outputSet) 291} 292 293func TestUpdate(t *testing.T) { 294 // This test is going to exercise the various paths that result in a 295 // call to update. 296 297 // source simulates an apiserver object endpoint. 298 source := fcache.NewFakeControllerSource() 299 300 const ( 301 FROM = "from" 302 TO = "to" 303 ) 304 305 // These are the transitions we expect to see; because this is 306 // asynchronous, there are a lot of valid possibilities. 307 type pair struct{ from, to string } 308 allowedTransitions := map[pair]bool{ 309 {FROM, TO}: true, 310 311 // Because a resync can happen when we've already observed one 312 // of the above but before the item is deleted. 313 {TO, TO}: true, 314 // Because a resync could happen before we observe an update. 315 {FROM, FROM}: true, 316 } 317 318 pod := func(name, check string, final bool) *v1.Pod { 319 p := &v1.Pod{ 320 ObjectMeta: metav1.ObjectMeta{ 321 Name: name, 322 Labels: map[string]string{"check": check}, 323 }, 324 } 325 if final { 326 p.Labels["final"] = "true" 327 } 328 return p 329 } 330 deletePod := func(p *v1.Pod) bool { 331 return p.Labels["final"] == "true" 332 } 333 334 tests := []func(string){ 335 func(name string) { 336 name = "a-" + name 337 source.Add(pod(name, FROM, false)) 338 source.Modify(pod(name, TO, true)) 339 }, 340 } 341 342 const threads = 3 343 344 var testDoneWG sync.WaitGroup 345 testDoneWG.Add(threads * len(tests)) 346 347 // Make a controller that deletes things once it observes an update. 348 // It calls Done() on the wait group on deletions so we can tell when 349 // everything we've added has been deleted. 350 watchCh := make(chan struct{}) 351 _, controller := NewInformer( 352 &testLW{ 353 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { 354 watch, err := source.Watch(options) 355 close(watchCh) 356 return watch, err 357 }, 358 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { 359 return source.List(options) 360 }, 361 }, 362 &v1.Pod{}, 363 0, 364 ResourceEventHandlerFuncs{ 365 UpdateFunc: func(oldObj, newObj interface{}) { 366 o, n := oldObj.(*v1.Pod), newObj.(*v1.Pod) 367 from, to := o.Labels["check"], n.Labels["check"] 368 if !allowedTransitions[pair{from, to}] { 369 t.Errorf("observed transition %q -> %q for %v", from, to, n.Name) 370 } 371 if deletePod(n) { 372 source.Delete(n) 373 } 374 }, 375 DeleteFunc: func(obj interface{}) { 376 testDoneWG.Done() 377 }, 378 }, 379 ) 380 381 // Run the controller and run it until we close stop. 382 // Once Run() is called, calls to testDoneWG.Done() might start, so 383 // all testDoneWG.Add() calls must happen before this point 384 stop := make(chan struct{}) 385 go controller.Run(stop) 386 <-watchCh 387 388 // run every test a few times, in parallel 389 var wg sync.WaitGroup 390 wg.Add(threads * len(tests)) 391 for i := 0; i < threads; i++ { 392 for j, f := range tests { 393 go func(name string, f func(string)) { 394 defer wg.Done() 395 f(name) 396 }(fmt.Sprintf("%v-%v", i, j), f) 397 } 398 } 399 wg.Wait() 400 401 // Let's wait for the controller to process the things we just added. 402 testDoneWG.Wait() 403 close(stop) 404} 405 406func TestPanicPropagated(t *testing.T) { 407 // source simulates an apiserver object endpoint. 408 source := fcache.NewFakeControllerSource() 409 410 // Make a controller that just panic if the AddFunc is called. 411 _, controller := NewInformer( 412 source, 413 &v1.Pod{}, 414 time.Millisecond*100, 415 ResourceEventHandlerFuncs{ 416 AddFunc: func(obj interface{}) { 417 // Create a panic. 418 panic("Just panic.") 419 }, 420 }, 421 ) 422 423 // Run the controller and run it until we close stop. 424 stop := make(chan struct{}) 425 defer close(stop) 426 427 propagated := make(chan interface{}) 428 go func() { 429 defer func() { 430 if r := recover(); r != nil { 431 propagated <- r 432 } 433 }() 434 controller.Run(stop) 435 }() 436 // Let's add a object to the source. It will trigger a panic. 437 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}}) 438 439 // Check if the panic propagated up. 440 select { 441 case p := <-propagated: 442 if p == "Just panic." { 443 t.Logf("Test Passed") 444 } else { 445 t.Errorf("unrecognized panic in controller run: %v", p) 446 } 447 case <-time.After(wait.ForeverTestTimeout): 448 t.Errorf("timeout: the panic failed to propagate from the controller run method!") 449 } 450} 451