1// Copyright 2017 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package crd 16 17import ( 18 "context" 19 "errors" 20 "reflect" 21 "sync" 22 "sync/atomic" 23 "testing" 24 "time" 25 26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 27 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" 28 "k8s.io/apimachinery/pkg/runtime" 29 "k8s.io/apimachinery/pkg/watch" 30 "k8s.io/client-go/discovery" 31 "k8s.io/client-go/discovery/fake" 32 "k8s.io/client-go/dynamic" 33 "k8s.io/client-go/rest" 34 k8stesting "k8s.io/client-go/testing" 35 36 "istio.io/istio/mixer/pkg/config/store" 37 "istio.io/pkg/probe" 38) 39 40// The "retryTimeout" used by the test. 41const testingRetryTimeout = 10 * time.Millisecond 42 43// The timeout for "waitFor" function, waiting for the expected event to come. 44const waitForTimeout = time.Second 45 46const apiGroupVersion = ConfigAPIGroup + "/" + ConfigAPIVersion 47 48func createFakeDiscovery(_ *rest.Config) (discovery.DiscoveryInterface, error) { 49 return &fake.FakeDiscovery{ 50 Fake: &k8stesting.Fake{ 51 Resources: []*metav1.APIResourceList{ 52 { 53 GroupVersion: apiGroupVersion, 54 APIResources: []metav1.APIResource{ 55 {Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true}, 56 {Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true}, 57 }, 58 }, 59 }, 60 }, 61 }, nil 62} 63 64type dummyListerWatcherBuilder struct { 65 mu sync.RWMutex 66 data map[store.Key]*unstructured.Unstructured 67 watchers map[string]*watch.RaceFreeFakeWatcher 68} 69 70func (f *fakeDynamicResource) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { 71 list := &unstructured.UnstructuredList{} 72 f.d.mu.RLock() 73 for k, v := range f.d.data { 74 if k.Kind == f.res.Kind { 75 list.Items = append(list.Items, *v) 76 } 77 } 78 f.d.mu.RUnlock() 79 return list, nil 80} 81 82func (f *fakeDynamicResource) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { 83 return f.w, nil 84} 85 86type fakeDynamicResource struct { 87 d *dummyListerWatcherBuilder 88 dynamic.ResourceInterface 89 w watch.Interface 90 res metav1.APIResource 91} 92 93func (d *dummyListerWatcherBuilder) build(res metav1.APIResource) dynamic.ResourceInterface { 94 w := watch.NewRaceFreeFake() 95 d.mu.Lock() 96 d.watchers[res.Kind] = w 97 d.mu.Unlock() 98 99 return &fakeDynamicResource{d: d, w: w, res: res} 100} 101 102// nolint: unparam 103func (d *dummyListerWatcherBuilder) put(key store.Key, spec map[string]interface{}) error { 104 res := &unstructured.Unstructured{} 105 res.SetKind(key.Kind) 106 res.SetAPIVersion(apiGroupVersion) 107 res.SetName(key.Name) 108 res.SetNamespace(key.Namespace) 109 res.Object["spec"] = spec 110 111 d.mu.Lock() 112 defer d.mu.Unlock() 113 _, existed := d.data[key] 114 d.data[key] = res 115 w, ok := d.watchers[key.Kind] 116 if !ok { 117 return nil 118 } 119 if existed { 120 w.Modify(res) 121 } else { 122 w.Add(res) 123 } 124 return nil 125} 126 127func (d *dummyListerWatcherBuilder) delete(key store.Key) { 128 d.mu.Lock() 129 defer d.mu.Unlock() 130 value, ok := d.data[key] 131 if !ok { 132 return 133 } 134 delete(d.data, key) 135 w, ok := d.watchers[key.Kind] 136 if !ok { 137 return 138 } 139 w.Delete(value) 140} 141 142func getTempClient() (*Store, string, *dummyListerWatcherBuilder) { 143 ns := "istio-mixer-testing" 144 145 lw := &dummyListerWatcherBuilder{ 146 data: map[store.Key]*unstructured.Unstructured{}, 147 watchers: map[string]*watch.RaceFreeFakeWatcher{}, 148 } 149 client := &Store{ 150 conf: &rest.Config{}, 151 donec: make(chan struct{}), 152 retryTimeout: testingRetryTimeout, 153 discoveryBuilder: createFakeDiscovery, 154 listerWatcherBuilder: func(*rest.Config) (listerWatcherBuilderInterface, error) { 155 return lw, nil 156 }, 157 Probe: probe.NewProbe(), 158 retryInterval: 1 * time.Millisecond, 159 } 160 return client, ns, lw 161} 162 163func waitFor(wch <-chan store.BackendEvent, ct store.ChangeType, key store.Key) error { 164 timeout := time.After(waitForTimeout) 165 for { 166 select { 167 case ev := <-wch: 168 if ev.Key == key && ev.Type == ct { 169 return nil 170 } 171 case <-timeout: 172 return context.DeadlineExceeded 173 } 174 } 175} 176 177func TestStore(t *testing.T) { 178 s, ns, lw := getTempClient() 179 if err := s.Init([]string{"Handler", "Action"}); err != nil { 180 t.Fatal(err.Error()) 181 } 182 defer s.Stop() 183 184 wch, err := s.Watch() 185 if err != nil { 186 t.Fatal(err.Error()) 187 } 188 k := store.Key{Kind: "Handler", Namespace: ns, Name: "default"} 189 if _, err = s.Get(k); err != store.ErrNotFound { 190 t.Errorf("Got %v, Want ErrNotFound", err) 191 } 192 h := map[string]interface{}{"name": "default", "adapter": "noop"} 193 if err = lw.put(k, h); err != nil { 194 t.Errorf("Got %v, Want nil", err) 195 } 196 if err = waitFor(wch, store.Update, k); err != nil { 197 t.Errorf("Got %v, Want nil", err) 198 } 199 h2, err := s.Get(k) 200 if err != nil { 201 t.Errorf("Got %v, Want nil", err) 202 } 203 if !reflect.DeepEqual(h, h2.Spec) { 204 t.Errorf("Got %+v, Want %+v", h2.Spec, h) 205 } 206 want := map[store.Key]*store.BackEndResource{k: h2} 207 if lst := s.List(); !reflect.DeepEqual(lst, want) { 208 t.Errorf("Got %+v, Want %+v", lst, want) 209 } 210 h["adapter"] = "noop2" 211 if err = lw.put(k, h); err != nil { 212 t.Errorf("Got %v, Want nil", err) 213 } 214 h2, err = s.Get(k) 215 if err != nil { 216 t.Errorf("Got %v, Want nil", err) 217 } 218 if !reflect.DeepEqual(h, h2.Spec) { 219 t.Errorf("Got %+v, Want %+v", h2.Spec, h) 220 } 221 lw.delete(k) 222 if err = waitFor(wch, store.Delete, k); err != nil { 223 t.Errorf("Got %v, Want nil", err) 224 } 225 if _, err := s.Get(k); err != store.ErrNotFound { 226 t.Errorf("Got %v, Want ErrNotFound", err) 227 } 228} 229 230func TestStoreWrongKind(t *testing.T) { 231 s, ns, lw := getTempClient() 232 if err := s.Init([]string{"Action"}); err != nil { 233 t.Fatal(err.Error()) 234 } 235 defer s.Stop() 236 237 k := store.Key{Kind: "Handler", Namespace: ns, Name: "default"} 238 h := map[string]interface{}{"name": "default", "adapter": "noop"} 239 if err := lw.put(k, h); err != nil { 240 t.Error("Got nil, Want error") 241 } 242 243 if _, err := s.Get(k); err == nil { 244 t.Errorf("Got nil, Want error") 245 } 246} 247 248func TestStoreNamespaces(t *testing.T) { 249 s, ns, lw := getTempClient() 250 otherNS := "other-namespace" 251 s.ns = map[string]bool{ns: true, otherNS: true} 252 if err := s.Init([]string{"Action", "Handler"}); err != nil { 253 t.Fatal(err) 254 } 255 defer s.Stop() 256 257 wch, err := s.Watch() 258 if err != nil { 259 t.Fatal(err) 260 } 261 k1 := store.Key{Kind: "Handler", Namespace: ns, Name: "default"} 262 k2 := store.Key{Kind: "Handler", Namespace: otherNS, Name: "default"} 263 k3 := store.Key{Kind: "Handler", Namespace: "irrelevant-namespace", Name: "default"} 264 h := map[string]interface{}{"name": "default", "adapter": "noop"} 265 for _, k := range []store.Key{k1, k2, k3} { 266 if err = lw.put(k, h); err != nil { 267 t.Errorf("Got %v, Want nil", err) 268 } 269 } 270 if err = waitFor(wch, store.Update, k3); err == nil { 271 t.Error("Got nil, Want error") 272 } 273 list := s.List() 274 for _, c := range []struct { 275 key store.Key 276 ok bool 277 }{ 278 {k1, true}, 279 {k2, true}, 280 {k3, false}, 281 } { 282 if _, ok := list[c.key]; ok != c.ok { 283 t.Errorf("For key %s, Got %v, Want %v", c.key, ok, c.ok) 284 } 285 if _, err = s.Get(c.key); (err == nil) != c.ok { 286 t.Errorf("For key %s, Got %v error, Want %v", c.key, err, c.ok) 287 } 288 } 289} 290 291func TestStoreFailToInit(t *testing.T) { 292 s, _, _ := getTempClient() 293 s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) { 294 return nil, errors.New("dummy") 295 } 296 if err := s.Init([]string{"Handler", "Action"}); err.Error() != "dummy" { 297 t.Errorf("Got %v, Want dummy error", err) 298 } 299 s.discoveryBuilder = createFakeDiscovery 300 s.listerWatcherBuilder = func(*rest.Config) (listerWatcherBuilderInterface, error) { 301 return nil, errors.New("dummy2") 302 } 303 if err := s.Init([]string{"Handler", "Action"}); err.Error() != "dummy2" { 304 t.Errorf("Got %v, Want dummy2 error", err) 305 } 306 s.Stop() 307} 308 309func TestCriticalCrdsAreReady(t *testing.T) { 310 fakeDiscovery := &fake.FakeDiscovery{ 311 Fake: &k8stesting.Fake{ 312 Resources: []*metav1.APIResourceList{ 313 {GroupVersion: apiGroupVersion}, 314 }, 315 }, 316 } 317 var callCount int32 318 fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) { 319 atomic.AddInt32(&callCount, 1) 320 fakeDiscovery.Resources[0].APIResources = append( 321 fakeDiscovery.Resources[0].APIResources, 322 metav1.APIResource{Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true}, 323 metav1.APIResource{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true}, 324 ) 325 return true, nil, nil 326 }) 327 328 s, _, _ := getTempClient() 329 s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) { 330 return fakeDiscovery, nil 331 } 332 s.criticalKinds = []string{"Handler", "Action"} 333 s.bgRetryInterval = 1 * time.Millisecond 334 err := s.Init([]string{"Handler", "Action", "Whatever"}) 335 if err != nil { 336 t.Errorf("Got error %v from Init", err) 337 } 338 count := atomic.LoadInt32(&callCount) 339 if count != 1 { 340 t.Errorf("callCount is not expected, got %v wang 1", count) 341 } 342 s.Stop() 343} 344 345func TestCriticalCrdsAreNotReadyRetryTimeout(t *testing.T) { 346 fakeDiscovery := &fake.FakeDiscovery{ 347 Fake: &k8stesting.Fake{ 348 Resources: []*metav1.APIResourceList{ 349 {GroupVersion: apiGroupVersion}, 350 }, 351 }, 352 } 353 var callCount int32 354 fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) { 355 atomic.AddInt32(&callCount, 1) 356 return true, nil, nil 357 }) 358 359 s, _, _ := getTempClient() 360 s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) { 361 return fakeDiscovery, nil 362 } 363 s.criticalKinds = []string{"Handler"} 364 s.retryTimeout = 2 * time.Second 365 s.retryInterval = time.Second 366 err := s.Init([]string{"Handler", "Action"}) 367 errorMsg := "failed to discover critical kinds: [Handler]" 368 if err == nil { 369 t.Errorf("got no error from Init, want Init to fail") 370 } else if err.Error() != errorMsg { 371 t.Errorf("got Init error message %v, want %v", err.Error(), errorMsg) 372 } 373 count := atomic.LoadInt32(&callCount) 374 if count < 1 || count > 3 { 375 t.Errorf("got callCount %v, want call count to be more than 1 and less than 3 times", count) 376 } 377 s.Stop() 378} 379 380func TestCriticalCrdsRetryMakeSucceed(t *testing.T) { 381 fakeDiscovery := &fake.FakeDiscovery{ 382 Fake: &k8stesting.Fake{ 383 Resources: []*metav1.APIResourceList{ 384 {GroupVersion: apiGroupVersion}, 385 }, 386 }, 387 } 388 var callCount int32 389 // Gradually increase the number of API resources. 390 fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) { 391 count := atomic.AddInt32(&callCount, 1) 392 if count == 2 { 393 fakeDiscovery.Resources[0].APIResources = append( 394 fakeDiscovery.Resources[0].APIResources, 395 metav1.APIResource{Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true}, 396 ) 397 } else if count == 3 { 398 fakeDiscovery.Resources[0].APIResources = append( 399 fakeDiscovery.Resources[0].APIResources, 400 metav1.APIResource{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true}, 401 ) 402 } 403 return true, nil, nil 404 }) 405 406 s, _, _ := getTempClient() 407 s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) { 408 return fakeDiscovery, nil 409 } 410 // Should set a longer timeout to avoid early quitting retry loop due to lack of computational power. 411 s.retryTimeout = 2 * time.Second 412 s.retryInterval = 10 * time.Millisecond 413 s.criticalKinds = []string{"Handler", "Action"} 414 err := s.Init([]string{"Handler", "Action"}) 415 if err != nil { 416 t.Errorf("Got %v, Want nil", err) 417 } 418 count := atomic.LoadInt32(&callCount) 419 if count != 3 { 420 t.Errorf("Got %d, Want 3", count) 421 } 422 s.Stop() 423} 424 425func TestCrdsRetryAsynchronously(t *testing.T) { 426 fakeDiscovery := &fake.FakeDiscovery{ 427 Fake: &k8stesting.Fake{ 428 Resources: []*metav1.APIResourceList{ 429 { 430 GroupVersion: apiGroupVersion, 431 APIResources: []metav1.APIResource{ 432 {Name: "handlers", SingularName: "handler", Kind: "Handler", Namespaced: true}, 433 }, 434 }, 435 }, 436 }, 437 } 438 var count int32 439 // Gradually increase the number of API resources. 440 fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) { 441 if atomic.LoadInt32(&count) != 0 { 442 fakeDiscovery.Resources[0].APIResources = append( 443 fakeDiscovery.Resources[0].APIResources, 444 metav1.APIResource{Name: "actions", SingularName: "action", Kind: "Action", Namespaced: true}, 445 ) 446 } 447 return true, nil, nil 448 }) 449 s, ns, lw := getTempClient() 450 s.bgRetryInterval = 1 * time.Millisecond 451 s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) { 452 return fakeDiscovery, nil 453 } 454 k1 := store.Key{Kind: "Handler", Namespace: ns, Name: "default"} 455 if err := lw.put(k1, map[string]interface{}{"adapter": "noop"}); err != nil { 456 t.Fatal(err) 457 } 458 if err := s.Init([]string{"Handler", "Action"}); err != nil { 459 t.Fatal(err) 460 } 461 defer s.Stop() 462 s.cacheMutex.Lock() 463 ncaches := len(s.caches) 464 s.cacheMutex.Unlock() 465 if ncaches != 1 { 466 t.Errorf("Has %d caches, Want 1 caches", ncaches) 467 } 468 wch, err := s.Watch() 469 if err != nil { 470 t.Fatal(err) 471 } 472 atomic.StoreInt32(&count, 1) 473 474 after := time.After(time.Second) 475 tick := time.Tick(time.Millisecond) 476loop: 477 for { 478 select { 479 case <-after: 480 break loop 481 case <-tick: 482 s.cacheMutex.Lock() 483 ncaches = len(s.caches) 484 s.cacheMutex.Unlock() 485 if ncaches > 1 { 486 break loop 487 } 488 } 489 } 490 if ncaches != 2 { 491 t.Fatalf("Has %d caches, Want 2 caches", ncaches) 492 } 493 494 k2 := store.Key{Kind: "Action", Namespace: ns, Name: "default"} 495 if err = lw.put(k2, map[string]interface{}{"test": "value"}); err != nil { 496 t.Error(err) 497 } 498 if err = waitFor(wch, store.Update, k2); err != nil { 499 t.Errorf("Got %v, Want nil", err) 500 } 501} 502 503func TestCrdsRetryAsynchronouslyStoreClose(t *testing.T) { 504 fakeDiscovery := &fake.FakeDiscovery{ 505 Fake: &k8stesting.Fake{ 506 Resources: []*metav1.APIResourceList{ 507 {GroupVersion: apiGroupVersion}, 508 }, 509 }, 510 } 511 callCount := 0 512 mutex := sync.RWMutex{} 513 fakeDiscovery.AddReactor("get", "resource", func(k8stesting.Action) (bool, runtime.Object, error) { 514 mutex.Lock() 515 callCount++ 516 mutex.Unlock() 517 return true, nil, nil 518 }) 519 520 s, _, _ := getTempClient() 521 s.discoveryBuilder = func(*rest.Config) (discovery.DiscoveryInterface, error) { 522 return fakeDiscovery, nil 523 } 524 s.bgRetryInterval = 10 * time.Millisecond 525 s.Init([]string{"Handler", "Action"}) 526 527 // Close store, which should shut down the background retry. 528 // With 10ms retry interval and 30ms before shutdown, at most 5 discovery calls would be made. 529 time.Sleep(30 * time.Millisecond) 530 s.Stop() 531 time.Sleep(30 * time.Millisecond) 532 mutex.RLock() 533 if callCount > 5 { 534 t.Errorf("got %v, want no more than 5 calls", callCount) 535 } 536 mutex.RUnlock() 537} 538