1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package replicaset 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "math/rand" 24 "net/http/httptest" 25 "net/url" 26 "reflect" 27 "sort" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 apps "k8s.io/api/apps/v1" 34 v1 "k8s.io/api/core/v1" 35 apiequality "k8s.io/apimachinery/pkg/api/equality" 36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 37 "k8s.io/apimachinery/pkg/runtime" 38 "k8s.io/apimachinery/pkg/runtime/schema" 39 "k8s.io/apimachinery/pkg/util/sets" 40 "k8s.io/apimachinery/pkg/util/uuid" 41 "k8s.io/apimachinery/pkg/util/wait" 42 "k8s.io/apimachinery/pkg/watch" 43 "k8s.io/client-go/informers" 44 clientset "k8s.io/client-go/kubernetes" 45 "k8s.io/client-go/kubernetes/fake" 46 restclient "k8s.io/client-go/rest" 47 core "k8s.io/client-go/testing" 48 "k8s.io/client-go/tools/cache" 49 utiltesting "k8s.io/client-go/util/testing" 50 "k8s.io/client-go/util/workqueue" 51 "k8s.io/klog/v2" 52 "k8s.io/kubernetes/pkg/controller" 53 . "k8s.io/kubernetes/pkg/controller/testutil" 54 "k8s.io/kubernetes/pkg/securitycontext" 55) 56 57var ( 58 informerSyncTimeout = 30 * time.Second 59) 60 61func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh chan struct{}, burstReplicas int) (*ReplicaSetController, informers.SharedInformerFactory) { 62 informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) 63 64 ret := NewReplicaSetController( 65 informers.Apps().V1().ReplicaSets(), 66 informers.Core().V1().Pods(), 67 client, 68 burstReplicas, 69 ) 70 71 ret.podListerSynced = alwaysReady 72 ret.rsListerSynced = alwaysReady 73 74 return ret, informers 75} 76 77func skipListerFunc(verb string, url url.URL) bool { 78 if verb != "GET" { 79 return false 80 } 81 if strings.HasSuffix(url.Path, "/pods") || strings.Contains(url.Path, "/replicasets") { 82 return true 83 } 84 return false 85} 86 87var alwaysReady = func() bool { return true } 88 89func newReplicaSet(replicas int, selectorMap map[string]string) *apps.ReplicaSet { 90 isController := true 91 rs := &apps.ReplicaSet{ 92 TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "ReplicaSet"}, 93 ObjectMeta: metav1.ObjectMeta{ 94 UID: uuid.NewUUID(), 95 Name: "foobar", 96 Namespace: metav1.NamespaceDefault, 97 OwnerReferences: []metav1.OwnerReference{ 98 {UID: "123", Controller: &isController}, 99 }, 100 ResourceVersion: "18", 101 }, 102 Spec: apps.ReplicaSetSpec{ 103 Replicas: func() *int32 { i := int32(replicas); return &i }(), 104 Selector: &metav1.LabelSelector{MatchLabels: selectorMap}, 105 Template: v1.PodTemplateSpec{ 106 ObjectMeta: metav1.ObjectMeta{ 107 Labels: map[string]string{ 108 "name": "foo", 109 "type": "production", 110 }, 111 }, 112 Spec: v1.PodSpec{ 113 Containers: []v1.Container{ 114 { 115 Image: "foo/bar", 116 TerminationMessagePath: v1.TerminationMessagePathDefault, 117 ImagePullPolicy: v1.PullIfNotPresent, 118 SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(), 119 }, 120 }, 121 RestartPolicy: v1.RestartPolicyAlways, 122 DNSPolicy: v1.DNSDefault, 123 NodeSelector: map[string]string{ 124 "baz": "blah", 125 }, 126 }, 127 }, 128 }, 129 } 130 return rs 131} 132 133// create a pod with the given phase for the given rs (same selectors and namespace) 134func newPod(name string, rs *apps.ReplicaSet, status v1.PodPhase, lastTransitionTime *metav1.Time, properlyOwned bool) *v1.Pod { 135 var conditions []v1.PodCondition 136 if status == v1.PodRunning { 137 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} 138 if lastTransitionTime != nil { 139 condition.LastTransitionTime = *lastTransitionTime 140 } 141 conditions = append(conditions, condition) 142 } 143 var controllerReference metav1.OwnerReference 144 if properlyOwned { 145 var trueVar = true 146 controllerReference = metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar} 147 } 148 return &v1.Pod{ 149 ObjectMeta: metav1.ObjectMeta{ 150 UID: uuid.NewUUID(), 151 Name: name, 152 Namespace: rs.Namespace, 153 Labels: rs.Spec.Selector.MatchLabels, 154 OwnerReferences: []metav1.OwnerReference{controllerReference}, 155 }, 156 Status: v1.PodStatus{Phase: status, Conditions: conditions}, 157 } 158} 159 160// create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store. 161func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[string]string, rs *apps.ReplicaSet, name string) *v1.PodList { 162 pods := []v1.Pod{} 163 var trueVar = true 164 controllerReference := metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar} 165 for i := 0; i < count; i++ { 166 pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status, nil, false) 167 pod.ObjectMeta.Labels = labelMap 168 pod.OwnerReferences = []metav1.OwnerReference{controllerReference} 169 if store != nil { 170 store.Add(pod) 171 } 172 pods = append(pods, *pod) 173 } 174 return &v1.PodList{ 175 Items: pods, 176 } 177} 178 179// processSync initiates a sync via processNextWorkItem() to test behavior that 180// depends on both functions (such as re-queueing on sync error). 181func processSync(rsc *ReplicaSetController, key string) error { 182 // Save old syncHandler and replace with one that captures the error. 183 oldSyncHandler := rsc.syncHandler 184 defer func() { 185 rsc.syncHandler = oldSyncHandler 186 }() 187 var syncErr error 188 rsc.syncHandler = func(key string) error { 189 syncErr = oldSyncHandler(key) 190 return syncErr 191 } 192 rsc.queue.Add(key) 193 rsc.processNextWorkItem() 194 return syncErr 195} 196 197func validateSyncReplicaSet(fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) error { 198 if e, a := expectedCreates, len(fakePodControl.Templates); e != a { 199 return fmt.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a) 200 } 201 202 if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a { 203 return fmt.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a) 204 } 205 206 if e, a := expectedPatches, len(fakePodControl.Patches); e != a { 207 return fmt.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a) 208 } 209 210 return nil 211} 212 213func TestSyncReplicaSetDoesNothing(t *testing.T) { 214 client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 215 fakePodControl := controller.FakePodControl{} 216 stopCh := make(chan struct{}) 217 defer close(stopCh) 218 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) 219 220 // 2 running pods, a controller with 2 replicas, sync is a no-op 221 labelMap := map[string]string{"foo": "bar"} 222 rsSpec := newReplicaSet(2, labelMap) 223 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) 224 newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod") 225 226 manager.podControl = &fakePodControl 227 manager.syncReplicaSet(GetKey(rsSpec, t)) 228 err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) 229 if err != nil { 230 t.Fatal(err) 231 } 232} 233 234func TestDeleteFinalStateUnknown(t *testing.T) { 235 client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 236 fakePodControl := controller.FakePodControl{} 237 stopCh := make(chan struct{}) 238 defer close(stopCh) 239 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) 240 manager.podControl = &fakePodControl 241 242 received := make(chan string) 243 manager.syncHandler = func(key string) error { 244 received <- key 245 return nil 246 } 247 248 // The DeletedFinalStateUnknown object should cause the ReplicaSet manager to insert 249 // the controller matching the selectors of the deleted pod into the work queue. 250 labelMap := map[string]string{"foo": "bar"} 251 rsSpec := newReplicaSet(1, labelMap) 252 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) 253 pods := newPodList(nil, 1, v1.PodRunning, labelMap, rsSpec, "pod") 254 manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]}) 255 256 go manager.worker() 257 258 expected := GetKey(rsSpec, t) 259 select { 260 case key := <-received: 261 if key != expected { 262 t.Errorf("Unexpected sync all for ReplicaSet %v, expected %v", key, expected) 263 } 264 case <-time.After(wait.ForeverTestTimeout): 265 t.Errorf("Processing DeleteFinalStateUnknown took longer than expected") 266 } 267} 268 269// Tell the rs to create 100 replicas, but simulate a limit (like a quota limit) 270// of 10, and verify that the rs doesn't make 100 create calls per sync pass 271func TestSyncReplicaSetCreateFailures(t *testing.T) { 272 fakePodControl := controller.FakePodControl{} 273 fakePodControl.CreateLimit = 10 274 275 labelMap := map[string]string{"foo": "bar"} 276 rs := newReplicaSet(fakePodControl.CreateLimit*10, labelMap) 277 client := fake.NewSimpleClientset(rs) 278 stopCh := make(chan struct{}) 279 defer close(stopCh) 280 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) 281 282 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 283 284 manager.podControl = &fakePodControl 285 manager.syncReplicaSet(GetKey(rs, t)) 286 err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0) 287 if err != nil { 288 t.Fatal(err) 289 } 290 expectedLimit := 0 291 for pass := uint8(0); expectedLimit <= fakePodControl.CreateLimit; pass++ { 292 expectedLimit += controller.SlowStartInitialBatchSize << pass 293 } 294 if fakePodControl.CreateCallCount > expectedLimit { 295 t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", fakePodControl.CreateLimit*2, fakePodControl.CreateCallCount) 296 } 297} 298 299func TestSyncReplicaSetDormancy(t *testing.T) { 300 // Setup a test server so we can lie about the current state of pods 301 fakeHandler := utiltesting.FakeHandler{ 302 StatusCode: 200, 303 ResponseBody: "{}", 304 SkipRequestFn: skipListerFunc, 305 T: t, 306 } 307 testServer := httptest.NewServer(&fakeHandler) 308 defer testServer.Close() 309 client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 310 311 fakePodControl := controller.FakePodControl{} 312 stopCh := make(chan struct{}) 313 defer close(stopCh) 314 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) 315 316 manager.podControl = &fakePodControl 317 318 labelMap := map[string]string{"foo": "bar"} 319 rsSpec := newReplicaSet(2, labelMap) 320 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) 321 newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap, rsSpec, "pod") 322 323 // Creates a replica and sets expectations 324 rsSpec.Status.Replicas = 1 325 rsSpec.Status.ReadyReplicas = 1 326 rsSpec.Status.AvailableReplicas = 1 327 manager.syncReplicaSet(GetKey(rsSpec, t)) 328 err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) 329 if err != nil { 330 t.Fatal(err) 331 } 332 333 // Expectations prevents replicas but not an update on status 334 rsSpec.Status.Replicas = 0 335 rsSpec.Status.ReadyReplicas = 0 336 rsSpec.Status.AvailableReplicas = 0 337 fakePodControl.Clear() 338 manager.syncReplicaSet(GetKey(rsSpec, t)) 339 err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) 340 if err != nil { 341 t.Fatal(err) 342 } 343 344 // Get the key for the controller 345 rsKey, err := controller.KeyFunc(rsSpec) 346 if err != nil { 347 t.Errorf("Couldn't get key for object %#v: %v", rsSpec, err) 348 } 349 350 // Lowering expectations should lead to a sync that creates a replica, however the 351 // fakePodControl error will prevent this, leaving expectations at 0, 0 352 manager.expectations.CreationObserved(rsKey) 353 rsSpec.Status.Replicas = 1 354 rsSpec.Status.ReadyReplicas = 1 355 rsSpec.Status.AvailableReplicas = 1 356 fakePodControl.Clear() 357 fakePodControl.Err = fmt.Errorf("fake Error") 358 359 manager.syncReplicaSet(GetKey(rsSpec, t)) 360 err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) 361 if err != nil { 362 t.Fatal(err) 363 } 364 365 // This replica should not need a Lowering of expectations, since the previous create failed 366 fakePodControl.Clear() 367 fakePodControl.Err = nil 368 manager.syncReplicaSet(GetKey(rsSpec, t)) 369 err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) 370 if err != nil { 371 t.Fatal(err) 372 } 373 374 // 2 PUT for the ReplicaSet status during dormancy window. 375 // Note that the pod creates go through pod control so they're not recorded. 376 fakeHandler.ValidateRequestCount(t, 2) 377} 378 379func TestGetReplicaSetsWithSameController(t *testing.T) { 380 someRS := newReplicaSet(1, map[string]string{"foo": "bar"}) 381 someRS.Name = "rs1" 382 relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"}) 383 relatedRS.Name = "rs2" 384 unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"}) 385 unrelatedRS.Name = "rs3" 386 unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456" 387 pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"}) 388 pendingDeletionRS.Name = "rs4" 389 pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789" 390 now := metav1.Now() 391 pendingDeletionRS.DeletionTimestamp = &now 392 393 stopCh := make(chan struct{}) 394 defer close(stopCh) 395 manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas) 396 testCases := []struct { 397 name string 398 rss []*apps.ReplicaSet 399 rs *apps.ReplicaSet 400 expectedRSs []*apps.ReplicaSet 401 }{ 402 { 403 name: "expect to get back a ReplicaSet that is pending deletion", 404 rss: []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS}, 405 rs: pendingDeletionRS, 406 expectedRSs: []*apps.ReplicaSet{pendingDeletionRS}, 407 }, 408 { 409 name: "expect to get back only the given ReplicaSet if there is no related ReplicaSet", 410 rss: []*apps.ReplicaSet{someRS, unrelatedRS}, 411 rs: someRS, 412 expectedRSs: []*apps.ReplicaSet{someRS}, 413 }, 414 { 415 name: "expect to get back the given ReplicaSet as well as any related ReplicaSet but not an unrelated ReplicaSet", 416 rss: []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS}, 417 rs: someRS, 418 expectedRSs: []*apps.ReplicaSet{someRS, relatedRS}, 419 }, 420 } 421 for _, c := range testCases { 422 for _, r := range c.rss { 423 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r) 424 } 425 actualRSs := manager.getReplicaSetsWithSameController(c.rs) 426 var actualRSNames, expectedRSNames []string 427 for _, r := range actualRSs { 428 actualRSNames = append(actualRSNames, r.Name) 429 } 430 for _, r := range c.expectedRSs { 431 expectedRSNames = append(expectedRSNames, r.Name) 432 } 433 sort.Strings(actualRSNames) 434 sort.Strings(expectedRSNames) 435 if !reflect.DeepEqual(actualRSNames, expectedRSNames) { 436 t.Errorf("Got [%s]; expected [%s]", strings.Join(actualRSNames, ", "), strings.Join(expectedRSNames, ", ")) 437 } 438 } 439} 440 441func TestPodControllerLookup(t *testing.T) { 442 stopCh := make(chan struct{}) 443 defer close(stopCh) 444 manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas) 445 testCases := []struct { 446 inRSs []*apps.ReplicaSet 447 pod *v1.Pod 448 outRSName string 449 }{ 450 // pods without labels don't match any ReplicaSets 451 { 452 inRSs: []*apps.ReplicaSet{ 453 {ObjectMeta: metav1.ObjectMeta{Name: "basic"}}}, 454 pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll}}, 455 outRSName: "", 456 }, 457 // Matching labels, not namespace 458 { 459 inRSs: []*apps.ReplicaSet{ 460 { 461 ObjectMeta: metav1.ObjectMeta{Name: "foo"}, 462 Spec: apps.ReplicaSetSpec{ 463 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, 464 }, 465 }, 466 }, 467 pod: &v1.Pod{ 468 ObjectMeta: metav1.ObjectMeta{ 469 Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, 470 outRSName: "", 471 }, 472 // Matching ns and labels returns the key to the ReplicaSet, not the ReplicaSet name 473 { 474 inRSs: []*apps.ReplicaSet{ 475 { 476 ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"}, 477 Spec: apps.ReplicaSetSpec{ 478 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, 479 }, 480 }, 481 }, 482 pod: &v1.Pod{ 483 ObjectMeta: metav1.ObjectMeta{ 484 Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}}, 485 outRSName: "bar", 486 }, 487 } 488 for _, c := range testCases { 489 for _, r := range c.inRSs { 490 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r) 491 } 492 if rss := manager.getPodReplicaSets(c.pod); rss != nil { 493 if len(rss) != 1 { 494 t.Errorf("len(rss) = %v, want %v", len(rss), 1) 495 continue 496 } 497 rs := rss[0] 498 if c.outRSName != rs.Name { 499 t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName) 500 } 501 } else if c.outRSName != "" { 502 t.Errorf("Expected a replica set %v pod %v, found none", c.outRSName, c.pod.Name) 503 } 504 } 505} 506 507func TestRelatedPodsLookup(t *testing.T) { 508 someRS := newReplicaSet(1, map[string]string{"foo": "bar"}) 509 someRS.Name = "foo1" 510 relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"}) 511 relatedRS.Name = "foo2" 512 unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"}) 513 unrelatedRS.Name = "bar1" 514 unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456" 515 pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"}) 516 pendingDeletionRS.Name = "foo3" 517 pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789" 518 now := metav1.Now() 519 pendingDeletionRS.DeletionTimestamp = &now 520 pod1 := newPod("pod1", someRS, v1.PodRunning, nil, true) 521 pod2 := newPod("pod2", someRS, v1.PodRunning, nil, true) 522 pod3 := newPod("pod3", relatedRS, v1.PodRunning, nil, true) 523 pod4 := newPod("pod4", unrelatedRS, v1.PodRunning, nil, true) 524 525 stopCh := make(chan struct{}) 526 defer close(stopCh) 527 manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas) 528 testCases := []struct { 529 name string 530 rss []*apps.ReplicaSet 531 pods []*v1.Pod 532 rs *apps.ReplicaSet 533 expectedPodNames []string 534 }{ 535 { 536 name: "expect to get a pod even if its owning ReplicaSet is pending deletion", 537 rss: []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS}, 538 rs: pendingDeletionRS, 539 pods: []*v1.Pod{newPod("pod", pendingDeletionRS, v1.PodRunning, nil, true)}, 540 expectedPodNames: []string{"pod"}, 541 }, 542 { 543 name: "expect to get only the ReplicaSet's own pods if there is no related ReplicaSet", 544 rss: []*apps.ReplicaSet{someRS, unrelatedRS}, 545 rs: someRS, 546 pods: []*v1.Pod{pod1, pod2, pod4}, 547 expectedPodNames: []string{"pod1", "pod2"}, 548 }, 549 { 550 name: "expect to get own pods as well as any related ReplicaSet's but not an unrelated ReplicaSet's", 551 rss: []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS}, 552 rs: someRS, 553 pods: []*v1.Pod{pod1, pod2, pod3, pod4}, 554 expectedPodNames: []string{"pod1", "pod2", "pod3"}, 555 }, 556 } 557 for _, c := range testCases { 558 for _, r := range c.rss { 559 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r) 560 } 561 for _, pod := range c.pods { 562 informers.Core().V1().Pods().Informer().GetIndexer().Add(pod) 563 manager.addPod(pod) 564 } 565 actualPods, err := manager.getIndirectlyRelatedPods(c.rs) 566 if err != nil { 567 t.Errorf("Unexpected error from getIndirectlyRelatedPods: %v", err) 568 } 569 var actualPodNames []string 570 for _, pod := range actualPods { 571 actualPodNames = append(actualPodNames, pod.Name) 572 } 573 sort.Strings(actualPodNames) 574 sort.Strings(c.expectedPodNames) 575 if !reflect.DeepEqual(actualPodNames, c.expectedPodNames) { 576 t.Errorf("Got [%s]; expected [%s]", strings.Join(actualPodNames, ", "), strings.Join(c.expectedPodNames, ", ")) 577 } 578 } 579} 580 581func TestWatchControllers(t *testing.T) { 582 fakeWatch := watch.NewFake() 583 client := fake.NewSimpleClientset() 584 client.PrependWatchReactor("replicasets", core.DefaultWatchReactor(fakeWatch, nil)) 585 stopCh := make(chan struct{}) 586 defer close(stopCh) 587 informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) 588 manager := NewReplicaSetController( 589 informers.Apps().V1().ReplicaSets(), 590 informers.Core().V1().Pods(), 591 client, 592 BurstReplicas, 593 ) 594 informers.Start(stopCh) 595 informers.WaitForCacheSync(stopCh) 596 597 var testRSSpec apps.ReplicaSet 598 received := make(chan string) 599 600 // The update sent through the fakeWatcher should make its way into the workqueue, 601 // and eventually into the syncHandler. The handler validates the received controller 602 // and closes the received channel to indicate that the test can finish. 603 manager.syncHandler = func(key string) error { 604 obj, exists, err := informers.Apps().V1().ReplicaSets().Informer().GetIndexer().GetByKey(key) 605 if !exists || err != nil { 606 t.Errorf("Expected to find replica set under key %v", key) 607 } 608 rsSpec := *obj.(*apps.ReplicaSet) 609 if !apiequality.Semantic.DeepDerivative(rsSpec, testRSSpec) { 610 t.Errorf("Expected %#v, but got %#v", testRSSpec, rsSpec) 611 } 612 close(received) 613 return nil 614 } 615 // Start only the ReplicaSet watcher and the workqueue, send a watch event, 616 // and make sure it hits the sync method. 617 go wait.Until(manager.worker, 10*time.Millisecond, stopCh) 618 619 testRSSpec.Name = "foo" 620 fakeWatch.Add(&testRSSpec) 621 622 select { 623 case <-received: 624 case <-time.After(wait.ForeverTestTimeout): 625 t.Errorf("unexpected timeout from result channel") 626 } 627} 628 629func TestWatchPods(t *testing.T) { 630 client := fake.NewSimpleClientset() 631 632 fakeWatch := watch.NewFake() 633 client.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) 634 635 stopCh := make(chan struct{}) 636 defer close(stopCh) 637 638 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) 639 640 // Put one ReplicaSet into the shared informer 641 labelMap := map[string]string{"foo": "bar"} 642 testRSSpec := newReplicaSet(1, labelMap) 643 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(testRSSpec) 644 645 received := make(chan string) 646 // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and 647 // send it into the syncHandler. 648 manager.syncHandler = func(key string) error { 649 namespace, name, err := cache.SplitMetaNamespaceKey(key) 650 if err != nil { 651 t.Errorf("Error splitting key: %v", err) 652 } 653 rsSpec, err := manager.rsLister.ReplicaSets(namespace).Get(name) 654 if err != nil { 655 t.Errorf("Expected to find replica set under key %v: %v", key, err) 656 } 657 if !apiequality.Semantic.DeepDerivative(rsSpec, testRSSpec) { 658 t.Errorf("\nExpected %#v,\nbut got %#v", testRSSpec, rsSpec) 659 } 660 close(received) 661 return nil 662 } 663 664 // Start only the pod watcher and the workqueue, send a watch event, 665 // and make sure it hits the sync method for the right ReplicaSet. 666 go informers.Core().V1().Pods().Informer().Run(stopCh) 667 go manager.Run(1, stopCh) 668 669 pods := newPodList(nil, 1, v1.PodRunning, labelMap, testRSSpec, "pod") 670 testPod := pods.Items[0] 671 testPod.Status.Phase = v1.PodFailed 672 fakeWatch.Add(&testPod) 673 674 select { 675 case <-received: 676 case <-time.After(wait.ForeverTestTimeout): 677 t.Errorf("unexpected timeout from result channel") 678 } 679} 680 681func TestUpdatePods(t *testing.T) { 682 stopCh := make(chan struct{}) 683 defer close(stopCh) 684 manager, informers := testNewReplicaSetControllerFromClient(fake.NewSimpleClientset(), stopCh, BurstReplicas) 685 686 received := make(chan string) 687 688 manager.syncHandler = func(key string) error { 689 namespace, name, err := cache.SplitMetaNamespaceKey(key) 690 if err != nil { 691 t.Errorf("Error splitting key: %v", err) 692 } 693 rsSpec, err := manager.rsLister.ReplicaSets(namespace).Get(name) 694 if err != nil { 695 t.Errorf("Expected to find replica set under key %v: %v", key, err) 696 } 697 received <- rsSpec.Name 698 return nil 699 } 700 701 go wait.Until(manager.worker, 10*time.Millisecond, stopCh) 702 703 // Put 2 ReplicaSets and one pod into the informers 704 labelMap1 := map[string]string{"foo": "bar"} 705 testRSSpec1 := newReplicaSet(1, labelMap1) 706 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(testRSSpec1) 707 testRSSpec2 := *testRSSpec1 708 labelMap2 := map[string]string{"bar": "foo"} 709 testRSSpec2.Spec.Selector = &metav1.LabelSelector{MatchLabels: labelMap2} 710 testRSSpec2.Name = "barfoo" 711 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(&testRSSpec2) 712 713 isController := true 714 controllerRef1 := metav1.OwnerReference{UID: testRSSpec1.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec1.Name, Controller: &isController} 715 controllerRef2 := metav1.OwnerReference{UID: testRSSpec2.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: testRSSpec2.Name, Controller: &isController} 716 717 // case 1: Pod with a ControllerRef 718 pod1 := newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] 719 pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} 720 pod1.ResourceVersion = "1" 721 pod2 := pod1 722 pod2.Labels = labelMap2 723 pod2.ResourceVersion = "2" 724 manager.updatePod(&pod1, &pod2) 725 expected := sets.NewString(testRSSpec1.Name) 726 for _, name := range expected.List() { 727 t.Logf("Expecting update for %+v", name) 728 select { 729 case got := <-received: 730 if !expected.Has(got) { 731 t.Errorf("Expected keys %#v got %v", expected, got) 732 } 733 case <-time.After(wait.ForeverTestTimeout): 734 t.Errorf("Expected update notifications for replica sets") 735 } 736 } 737 738 // case 2: Remove ControllerRef (orphan). Expect to sync label-matching RS. 739 pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] 740 pod1.ResourceVersion = "1" 741 pod1.Labels = labelMap2 742 pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} 743 pod2 = pod1 744 pod2.OwnerReferences = nil 745 pod2.ResourceVersion = "2" 746 manager.updatePod(&pod1, &pod2) 747 expected = sets.NewString(testRSSpec2.Name) 748 for _, name := range expected.List() { 749 t.Logf("Expecting update for %+v", name) 750 select { 751 case got := <-received: 752 if !expected.Has(got) { 753 t.Errorf("Expected keys %#v got %v", expected, got) 754 } 755 case <-time.After(wait.ForeverTestTimeout): 756 t.Errorf("Expected update notifications for replica sets") 757 } 758 } 759 760 // case 2: Remove ControllerRef (orphan). Expect to sync both former owner and 761 // any label-matching RS. 762 pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] 763 pod1.ResourceVersion = "1" 764 pod1.Labels = labelMap2 765 pod1.OwnerReferences = []metav1.OwnerReference{controllerRef1} 766 pod2 = pod1 767 pod2.OwnerReferences = nil 768 pod2.ResourceVersion = "2" 769 manager.updatePod(&pod1, &pod2) 770 expected = sets.NewString(testRSSpec1.Name, testRSSpec2.Name) 771 for _, name := range expected.List() { 772 t.Logf("Expecting update for %+v", name) 773 select { 774 case got := <-received: 775 if !expected.Has(got) { 776 t.Errorf("Expected keys %#v got %v", expected, got) 777 } 778 case <-time.After(wait.ForeverTestTimeout): 779 t.Errorf("Expected update notifications for replica sets") 780 } 781 } 782 783 // case 4: Keep ControllerRef, change labels. Expect to sync owning RS. 784 pod1 = newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap1, testRSSpec1, "pod").Items[0] 785 pod1.ResourceVersion = "1" 786 pod1.Labels = labelMap1 787 pod1.OwnerReferences = []metav1.OwnerReference{controllerRef2} 788 pod2 = pod1 789 pod2.Labels = labelMap2 790 pod2.ResourceVersion = "2" 791 manager.updatePod(&pod1, &pod2) 792 expected = sets.NewString(testRSSpec2.Name) 793 for _, name := range expected.List() { 794 t.Logf("Expecting update for %+v", name) 795 select { 796 case got := <-received: 797 if !expected.Has(got) { 798 t.Errorf("Expected keys %#v got %v", expected, got) 799 } 800 case <-time.After(wait.ForeverTestTimeout): 801 t.Errorf("Expected update notifications for replica sets") 802 } 803 } 804} 805 806func TestControllerUpdateRequeue(t *testing.T) { 807 // This server should force a requeue of the controller because it fails to update status.Replicas. 808 labelMap := map[string]string{"foo": "bar"} 809 rs := newReplicaSet(1, labelMap) 810 client := fake.NewSimpleClientset(rs) 811 client.PrependReactor("update", "replicasets", 812 func(action core.Action) (bool, runtime.Object, error) { 813 if action.GetSubresource() != "status" { 814 return false, nil, nil 815 } 816 return true, nil, errors.New("failed to update status") 817 }) 818 stopCh := make(chan struct{}) 819 defer close(stopCh) 820 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, BurstReplicas) 821 822 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 823 rs.Status = apps.ReplicaSetStatus{Replicas: 2} 824 newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 1, v1.PodRunning, labelMap, rs, "pod") 825 826 fakePodControl := controller.FakePodControl{} 827 manager.podControl = &fakePodControl 828 829 // Enqueue once. Then process it. Disable rate-limiting for this. 830 manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter()) 831 manager.enqueueRS(rs) 832 manager.processNextWorkItem() 833 // It should have been requeued. 834 if got, want := manager.queue.Len(), 1; got != want { 835 t.Errorf("queue.Len() = %v, want %v", got, want) 836 } 837} 838 839func TestControllerUpdateStatusWithFailure(t *testing.T) { 840 rs := newReplicaSet(1, map[string]string{"foo": "bar"}) 841 fakeClient := &fake.Clientset{} 842 fakeClient.AddReactor("get", "replicasets", func(action core.Action) (bool, runtime.Object, error) { return true, rs, nil }) 843 fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) { 844 return true, &apps.ReplicaSet{}, fmt.Errorf("fake error") 845 }) 846 fakeRSClient := fakeClient.AppsV1().ReplicaSets("default") 847 numReplicas := int32(10) 848 newStatus := apps.ReplicaSetStatus{Replicas: numReplicas} 849 updateReplicaSetStatus(fakeRSClient, rs, newStatus) 850 updates, gets := 0, 0 851 for _, a := range fakeClient.Actions() { 852 if a.GetResource().Resource != "replicasets" { 853 t.Errorf("Unexpected action %+v", a) 854 continue 855 } 856 857 switch action := a.(type) { 858 case core.GetAction: 859 gets++ 860 // Make sure the get is for the right ReplicaSet even though the update failed. 861 if action.GetName() != rs.Name { 862 t.Errorf("Expected get for ReplicaSet %v, got %+v instead", rs.Name, action.GetName()) 863 } 864 case core.UpdateAction: 865 updates++ 866 // Confirm that the update has the right status.Replicas even though the Get 867 // returned a ReplicaSet with replicas=1. 868 if c, ok := action.GetObject().(*apps.ReplicaSet); !ok { 869 t.Errorf("Expected a ReplicaSet as the argument to update, got %T", c) 870 } else if c.Status.Replicas != numReplicas { 871 t.Errorf("Expected update for ReplicaSet to contain replicas %v, got %v instead", 872 numReplicas, c.Status.Replicas) 873 } 874 default: 875 t.Errorf("Unexpected action %+v", a) 876 break 877 } 878 } 879 if gets != 1 || updates != 2 { 880 t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates) 881 } 882} 883 884// TODO: This test is too hairy for a unittest. It should be moved to an E2E suite. 885func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { 886 labelMap := map[string]string{"foo": "bar"} 887 rsSpec := newReplicaSet(numReplicas, labelMap) 888 client := fake.NewSimpleClientset(rsSpec) 889 fakePodControl := controller.FakePodControl{} 890 stopCh := make(chan struct{}) 891 defer close(stopCh) 892 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, burstReplicas) 893 manager.podControl = &fakePodControl 894 895 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) 896 897 expectedPods := int32(0) 898 pods := newPodList(nil, numReplicas, v1.PodPending, labelMap, rsSpec, "pod") 899 900 rsKey, err := controller.KeyFunc(rsSpec) 901 if err != nil { 902 t.Errorf("Couldn't get key for object %#v: %v", rsSpec, err) 903 } 904 905 // Size up the controller, then size it down, and confirm the expected create/delete pattern 906 for _, replicas := range []int32{int32(numReplicas), 0} { 907 908 *(rsSpec.Spec.Replicas) = replicas 909 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) 910 911 for i := 0; i < numReplicas; i += burstReplicas { 912 manager.syncReplicaSet(GetKey(rsSpec, t)) 913 914 // The store accrues active pods. It's also used by the ReplicaSet to determine how many 915 // replicas to create. 916 activePods := int32(len(informers.Core().V1().Pods().Informer().GetIndexer().List())) 917 if replicas != 0 { 918 // This is the number of pods currently "in flight". They were created by the 919 // ReplicaSet controller above, which then puts the ReplicaSet to sleep till 920 // all of them have been observed. 921 expectedPods = replicas - activePods 922 if expectedPods > int32(burstReplicas) { 923 expectedPods = int32(burstReplicas) 924 } 925 // This validates the ReplicaSet manager sync actually created pods 926 err := validateSyncReplicaSet(&fakePodControl, int(expectedPods), 0, 0) 927 if err != nil { 928 t.Fatal(err) 929 } 930 931 // This simulates the watch events for all but 1 of the expected pods. 932 // None of these should wake the controller because it has expectations==BurstReplicas. 933 for i := int32(0); i < expectedPods-1; i++ { 934 informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[i]) 935 manager.addPod(&pods.Items[i]) 936 } 937 938 podExp, exists, err := manager.expectations.GetExpectations(rsKey) 939 if !exists || err != nil { 940 t.Fatalf("Did not find expectations for rs.") 941 } 942 if add, _ := podExp.GetExpectations(); add != 1 { 943 t.Fatalf("Expectations are wrong %v", podExp) 944 } 945 } else { 946 expectedPods = (replicas - activePods) * -1 947 if expectedPods > int32(burstReplicas) { 948 expectedPods = int32(burstReplicas) 949 } 950 err := validateSyncReplicaSet(&fakePodControl, 0, int(expectedPods), 0) 951 if err != nil { 952 t.Fatal(err) 953 } 954 955 // To accurately simulate a watch we must delete the exact pods 956 // the rs is waiting for. 957 expectedDels := manager.expectations.GetUIDs(GetKey(rsSpec, t)) 958 podsToDelete := []*v1.Pod{} 959 isController := true 960 for _, key := range expectedDels.List() { 961 nsName := strings.Split(key, "/") 962 podsToDelete = append(podsToDelete, &v1.Pod{ 963 ObjectMeta: metav1.ObjectMeta{ 964 Name: nsName[1], 965 Namespace: nsName[0], 966 Labels: rsSpec.Spec.Selector.MatchLabels, 967 OwnerReferences: []metav1.OwnerReference{ 968 {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController}, 969 }, 970 }, 971 }) 972 } 973 // Don't delete all pods because we confirm that the last pod 974 // has exactly one expectation at the end, to verify that we 975 // don't double delete. 976 for i := range podsToDelete[1:] { 977 informers.Core().V1().Pods().Informer().GetIndexer().Delete(podsToDelete[i]) 978 manager.deletePod(podsToDelete[i]) 979 } 980 podExp, exists, err := manager.expectations.GetExpectations(rsKey) 981 if !exists || err != nil { 982 t.Fatalf("Did not find expectations for ReplicaSet.") 983 } 984 if _, del := podExp.GetExpectations(); del != 1 { 985 t.Fatalf("Expectations are wrong %v", podExp) 986 } 987 } 988 989 // Check that the ReplicaSet didn't take any action for all the above pods 990 fakePodControl.Clear() 991 manager.syncReplicaSet(GetKey(rsSpec, t)) 992 err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) 993 if err != nil { 994 t.Fatal(err) 995 } 996 997 // Create/Delete the last pod 998 // The last add pod will decrease the expectation of the ReplicaSet to 0, 999 // which will cause it to create/delete the remaining replicas up to burstReplicas. 1000 if replicas != 0 { 1001 informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[expectedPods-1]) 1002 manager.addPod(&pods.Items[expectedPods-1]) 1003 } else { 1004 expectedDel := manager.expectations.GetUIDs(GetKey(rsSpec, t)) 1005 if expectedDel.Len() != 1 { 1006 t.Fatalf("Waiting on unexpected number of deletes.") 1007 } 1008 nsName := strings.Split(expectedDel.List()[0], "/") 1009 isController := true 1010 lastPod := &v1.Pod{ 1011 ObjectMeta: metav1.ObjectMeta{ 1012 Name: nsName[1], 1013 Namespace: nsName[0], 1014 Labels: rsSpec.Spec.Selector.MatchLabels, 1015 OwnerReferences: []metav1.OwnerReference{ 1016 {UID: rsSpec.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rsSpec.Name, Controller: &isController}, 1017 }, 1018 }, 1019 } 1020 informers.Core().V1().Pods().Informer().GetIndexer().Delete(lastPod) 1021 manager.deletePod(lastPod) 1022 } 1023 pods.Items = pods.Items[expectedPods:] 1024 } 1025 1026 // Confirm that we've created the right number of replicas 1027 activePods := int32(len(informers.Core().V1().Pods().Informer().GetIndexer().List())) 1028 if activePods != *(rsSpec.Spec.Replicas) { 1029 t.Fatalf("Unexpected number of active pods, expected %d, got %d", *(rsSpec.Spec.Replicas), activePods) 1030 } 1031 // Replenish the pod list, since we cut it down sizing up 1032 pods = newPodList(nil, int(replicas), v1.PodRunning, labelMap, rsSpec, "pod") 1033 } 1034} 1035 1036func TestControllerBurstReplicas(t *testing.T) { 1037 doTestControllerBurstReplicas(t, 5, 30) 1038 doTestControllerBurstReplicas(t, 5, 12) 1039 doTestControllerBurstReplicas(t, 3, 2) 1040} 1041 1042type FakeRSExpectations struct { 1043 *controller.ControllerExpectations 1044 satisfied bool 1045 expSatisfied func() 1046} 1047 1048func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool { 1049 fe.expSatisfied() 1050 return fe.satisfied 1051} 1052 1053// TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods 1054// and checking expectations. 1055func TestRSSyncExpectations(t *testing.T) { 1056 client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 1057 fakePodControl := controller.FakePodControl{} 1058 stopCh := make(chan struct{}) 1059 defer close(stopCh) 1060 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 2) 1061 manager.podControl = &fakePodControl 1062 1063 labelMap := map[string]string{"foo": "bar"} 1064 rsSpec := newReplicaSet(2, labelMap) 1065 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec) 1066 pods := newPodList(nil, 2, v1.PodPending, labelMap, rsSpec, "pod") 1067 informers.Core().V1().Pods().Informer().GetIndexer().Add(&pods.Items[0]) 1068 postExpectationsPod := pods.Items[1] 1069 1070 manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{ 1071 controller.NewControllerExpectations(), true, func() { 1072 // If we check active pods before checking expectataions, the 1073 // ReplicaSet will create a new replica because it doesn't see 1074 // this pod, but has fulfilled its expectations. 1075 informers.Core().V1().Pods().Informer().GetIndexer().Add(&postExpectationsPod) 1076 }, 1077 }) 1078 manager.syncReplicaSet(GetKey(rsSpec, t)) 1079 err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0) 1080 if err != nil { 1081 t.Fatal(err) 1082 } 1083} 1084 1085func TestDeleteControllerAndExpectations(t *testing.T) { 1086 rs := newReplicaSet(1, map[string]string{"foo": "bar"}) 1087 client := fake.NewSimpleClientset(rs) 1088 stopCh := make(chan struct{}) 1089 defer close(stopCh) 1090 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) 1091 1092 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 1093 1094 fakePodControl := controller.FakePodControl{} 1095 manager.podControl = &fakePodControl 1096 1097 // This should set expectations for the ReplicaSet 1098 manager.syncReplicaSet(GetKey(rs, t)) 1099 err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0) 1100 if err != nil { 1101 t.Fatal(err) 1102 } 1103 fakePodControl.Clear() 1104 1105 // Get the ReplicaSet key 1106 rsKey, err := controller.KeyFunc(rs) 1107 if err != nil { 1108 t.Errorf("Couldn't get key for object %#v: %v", rs, err) 1109 } 1110 1111 // This is to simulate a concurrent addPod, that has a handle on the expectations 1112 // as the controller deletes it. 1113 podExp, exists, err := manager.expectations.GetExpectations(rsKey) 1114 if !exists || err != nil { 1115 t.Errorf("No expectations found for ReplicaSet") 1116 } 1117 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs) 1118 manager.deleteRS(rs) 1119 manager.syncReplicaSet(GetKey(rs, t)) 1120 1121 _, exists, err = manager.expectations.GetExpectations(rsKey) 1122 if err != nil { 1123 t.Errorf("Failed to get controllee expectations: %v", err) 1124 } 1125 if exists { 1126 t.Errorf("Found expectations, expected none since the ReplicaSet has been deleted.") 1127 } 1128 1129 // This should have no effect, since we've deleted the ReplicaSet. 1130 podExp.Add(-1, 0) 1131 informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0") 1132 manager.syncReplicaSet(GetKey(rs, t)) 1133 err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0) 1134 if err != nil { 1135 t.Fatal(err) 1136 } 1137} 1138 1139func TestExpectationsOnRecreate(t *testing.T) { 1140 client := fake.NewSimpleClientset() 1141 stopCh := make(chan struct{}) 1142 defer close(stopCh) 1143 1144 f := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) 1145 manager := NewReplicaSetController( 1146 f.Apps().V1().ReplicaSets(), 1147 f.Core().V1().Pods(), 1148 client, 1149 100, 1150 ) 1151 f.Start(stopCh) 1152 f.WaitForCacheSync(stopCh) 1153 fakePodControl := controller.FakePodControl{} 1154 manager.podControl = &fakePodControl 1155 1156 if manager.queue.Len() != 0 { 1157 t.Fatal("Unexpected item in the queue") 1158 } 1159 1160 oldRS := newReplicaSet(1, map[string]string{"foo": "bar"}) 1161 oldRS, err := client.AppsV1().ReplicaSets(oldRS.Namespace).Create(context.TODO(), oldRS, metav1.CreateOptions{}) 1162 if err != nil { 1163 t.Fatal(err) 1164 } 1165 1166 err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { 1167 klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len()) 1168 return manager.queue.Len() == 1, nil 1169 }) 1170 if err != nil { 1171 t.Fatalf("initial RS didn't result in new item in the queue: %v", err) 1172 } 1173 1174 ok := manager.processNextWorkItem() 1175 if !ok { 1176 t.Fatal("queue is shutting down") 1177 } 1178 1179 err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) 1180 if err != nil { 1181 t.Fatal(err) 1182 } 1183 fakePodControl.Clear() 1184 1185 oldRSKey, err := controller.KeyFunc(oldRS) 1186 if err != nil { 1187 t.Fatal(err) 1188 } 1189 1190 rsExp, exists, err := manager.expectations.GetExpectations(oldRSKey) 1191 if err != nil { 1192 t.Fatal(err) 1193 } 1194 if !exists { 1195 t.Errorf("No expectations found for ReplicaSet %q", oldRSKey) 1196 } 1197 if rsExp.Fulfilled() { 1198 t.Errorf("There should be unfulfilled expectations for creating new pods for ReplicaSet %q", oldRSKey) 1199 } 1200 1201 if manager.queue.Len() != 0 { 1202 t.Fatal("Unexpected item in the queue") 1203 } 1204 1205 err = client.AppsV1().ReplicaSets(oldRS.Namespace).Delete(context.TODO(), oldRS.Name, metav1.DeleteOptions{}) 1206 if err != nil { 1207 t.Fatal(err) 1208 } 1209 1210 err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { 1211 klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len()) 1212 return manager.queue.Len() == 1, nil 1213 }) 1214 if err != nil { 1215 t.Fatalf("Deleting RS didn't result in new item in the queue: %v", err) 1216 } 1217 1218 _, exists, err = manager.expectations.GetExpectations(oldRSKey) 1219 if err != nil { 1220 t.Fatal(err) 1221 } 1222 if exists { 1223 t.Errorf("There should be no expectations for ReplicaSet %q after it was deleted", oldRSKey) 1224 } 1225 1226 // skip sync for the delete event so we only see the new RS in sync 1227 key, quit := manager.queue.Get() 1228 if quit { 1229 t.Fatal("Queue is shutting down!") 1230 } 1231 manager.queue.Done(key) 1232 if key != oldRSKey { 1233 t.Fatal("Keys should be equal!") 1234 } 1235 1236 if manager.queue.Len() != 0 { 1237 t.Fatal("Unexpected item in the queue") 1238 } 1239 1240 newRS := oldRS.DeepCopy() 1241 newRS.UID = uuid.NewUUID() 1242 newRS, err = client.AppsV1().ReplicaSets(newRS.Namespace).Create(context.TODO(), newRS, metav1.CreateOptions{}) 1243 if err != nil { 1244 t.Fatal(err) 1245 } 1246 1247 // Sanity check 1248 if newRS.UID == oldRS.UID { 1249 t.Fatal("New RS has the same UID as the old one!") 1250 } 1251 1252 err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { 1253 klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", manager.queue.Len()) 1254 return manager.queue.Len() == 1, nil 1255 }) 1256 if err != nil { 1257 t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err) 1258 } 1259 1260 ok = manager.processNextWorkItem() 1261 if !ok { 1262 t.Fatal("Queue is shutting down!") 1263 } 1264 1265 newRSKey, err := controller.KeyFunc(newRS) 1266 if err != nil { 1267 t.Fatal(err) 1268 } 1269 rsExp, exists, err = manager.expectations.GetExpectations(newRSKey) 1270 if err != nil { 1271 t.Fatal(err) 1272 } 1273 if !exists { 1274 t.Errorf("No expectations found for ReplicaSet %q", oldRSKey) 1275 } 1276 if rsExp.Fulfilled() { 1277 t.Errorf("There should be unfulfilled expectations for creating new pods for ReplicaSet %q", oldRSKey) 1278 } 1279 1280 err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0) 1281 if err != nil { 1282 t.Fatal(err) 1283 } 1284 fakePodControl.Clear() 1285} 1286 1287// shuffle returns a new shuffled list of container controllers. 1288func shuffle(controllers []*apps.ReplicaSet) []*apps.ReplicaSet { 1289 numControllers := len(controllers) 1290 randIndexes := rand.Perm(numControllers) 1291 shuffled := make([]*apps.ReplicaSet, numControllers) 1292 for i := 0; i < numControllers; i++ { 1293 shuffled[i] = controllers[randIndexes[i]] 1294 } 1295 return shuffled 1296} 1297 1298func TestOverlappingRSs(t *testing.T) { 1299 client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 1300 labelMap := map[string]string{"foo": "bar"} 1301 1302 stopCh := make(chan struct{}) 1303 defer close(stopCh) 1304 manager, informers := testNewReplicaSetControllerFromClient(client, stopCh, 10) 1305 1306 // Create 10 ReplicaSets, shuffled them randomly and insert them into the 1307 // ReplicaSet controller's store. 1308 // All use the same CreationTimestamp since ControllerRef should be able 1309 // to handle that. 1310 timestamp := metav1.Date(2014, time.December, 0, 0, 0, 0, 0, time.Local) 1311 var controllers []*apps.ReplicaSet 1312 for j := 1; j < 10; j++ { 1313 rsSpec := newReplicaSet(1, labelMap) 1314 rsSpec.CreationTimestamp = timestamp 1315 rsSpec.Name = fmt.Sprintf("rs%d", j) 1316 controllers = append(controllers, rsSpec) 1317 } 1318 shuffledControllers := shuffle(controllers) 1319 for j := range shuffledControllers { 1320 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(shuffledControllers[j]) 1321 } 1322 // Add a pod with a ControllerRef and make sure only the corresponding 1323 // ReplicaSet is synced. Pick a RS in the middle since the old code used to 1324 // sort by name if all timestamps were equal. 1325 rs := controllers[3] 1326 pods := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod") 1327 pod := &pods.Items[0] 1328 isController := true 1329 pod.OwnerReferences = []metav1.OwnerReference{ 1330 {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController}, 1331 } 1332 rsKey := GetKey(rs, t) 1333 1334 manager.addPod(pod) 1335 queueRS, _ := manager.queue.Get() 1336 if queueRS != rsKey { 1337 t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) 1338 } 1339} 1340 1341func TestDeletionTimestamp(t *testing.T) { 1342 c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 1343 labelMap := map[string]string{"foo": "bar"} 1344 stopCh := make(chan struct{}) 1345 defer close(stopCh) 1346 manager, informers := testNewReplicaSetControllerFromClient(c, stopCh, 10) 1347 1348 rs := newReplicaSet(1, labelMap) 1349 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 1350 rsKey, err := controller.KeyFunc(rs) 1351 if err != nil { 1352 t.Errorf("Couldn't get key for object %#v: %v", rs, err) 1353 } 1354 pod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] 1355 pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} 1356 pod.ResourceVersion = "1" 1357 manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) 1358 1359 // A pod added with a deletion timestamp should decrement deletions, not creations. 1360 manager.addPod(&pod) 1361 1362 queueRS, _ := manager.queue.Get() 1363 if queueRS != rsKey { 1364 t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) 1365 } 1366 manager.queue.Done(rsKey) 1367 1368 podExp, exists, err := manager.expectations.GetExpectations(rsKey) 1369 if !exists || err != nil || !podExp.Fulfilled() { 1370 t.Fatalf("Wrong expectations %#v", podExp) 1371 } 1372 1373 // An update from no deletion timestamp to having one should be treated 1374 // as a deletion. 1375 oldPod := newPodList(nil, 1, v1.PodPending, labelMap, rs, "pod").Items[0] 1376 oldPod.ResourceVersion = "2" 1377 manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)}) 1378 manager.updatePod(&oldPod, &pod) 1379 1380 queueRS, _ = manager.queue.Get() 1381 if queueRS != rsKey { 1382 t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) 1383 } 1384 manager.queue.Done(rsKey) 1385 1386 podExp, exists, err = manager.expectations.GetExpectations(rsKey) 1387 if !exists || err != nil || !podExp.Fulfilled() { 1388 t.Fatalf("Wrong expectations %#v", podExp) 1389 } 1390 1391 // An update to the pod (including an update to the deletion timestamp) 1392 // should not be counted as a second delete. 1393 isController := true 1394 secondPod := &v1.Pod{ 1395 ObjectMeta: metav1.ObjectMeta{ 1396 Namespace: pod.Namespace, 1397 Name: "secondPod", 1398 Labels: pod.Labels, 1399 OwnerReferences: []metav1.OwnerReference{ 1400 {UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name, Controller: &isController}, 1401 }, 1402 }, 1403 } 1404 manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)}) 1405 oldPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} 1406 oldPod.ResourceVersion = "2" 1407 manager.updatePod(&oldPod, &pod) 1408 1409 podExp, exists, err = manager.expectations.GetExpectations(rsKey) 1410 if !exists || err != nil || podExp.Fulfilled() { 1411 t.Fatalf("Wrong expectations %#v", podExp) 1412 } 1413 1414 // A pod with a non-nil deletion timestamp should also be ignored by the 1415 // delete handler, because it's already been counted in the update. 1416 manager.deletePod(&pod) 1417 podExp, exists, err = manager.expectations.GetExpectations(rsKey) 1418 if !exists || err != nil || podExp.Fulfilled() { 1419 t.Fatalf("Wrong expectations %#v", podExp) 1420 } 1421 1422 // Deleting the second pod should clear expectations. 1423 manager.deletePod(secondPod) 1424 1425 queueRS, _ = manager.queue.Get() 1426 if queueRS != rsKey { 1427 t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) 1428 } 1429 manager.queue.Done(rsKey) 1430 1431 podExp, exists, err = manager.expectations.GetExpectations(rsKey) 1432 if !exists || err != nil || !podExp.Fulfilled() { 1433 t.Fatalf("Wrong expectations %#v", podExp) 1434 } 1435} 1436 1437// setupManagerWithGCEnabled creates a RS manager with a fakePodControl 1438func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) { 1439 c := fake.NewSimpleClientset(objs...) 1440 fakePodControl = &controller.FakePodControl{} 1441 manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas) 1442 1443 manager.podControl = fakePodControl 1444 return manager, fakePodControl, informers 1445} 1446 1447func TestDoNotPatchPodWithOtherControlRef(t *testing.T) { 1448 labelMap := map[string]string{"foo": "bar"} 1449 rs := newReplicaSet(2, labelMap) 1450 stopCh := make(chan struct{}) 1451 defer close(stopCh) 1452 manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs) 1453 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 1454 var trueVar = true 1455 otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar} 1456 // add to podLister a matching Pod controlled by another controller. Expect no patch. 1457 pod := newPod("pod", rs, v1.PodRunning, nil, true) 1458 pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference} 1459 informers.Core().V1().Pods().Informer().GetIndexer().Add(pod) 1460 err := manager.syncReplicaSet(GetKey(rs, t)) 1461 if err != nil { 1462 t.Fatal(err) 1463 } 1464 // because the matching pod already has a controller, so 2 pods should be created. 1465 err = validateSyncReplicaSet(fakePodControl, 2, 0, 0) 1466 if err != nil { 1467 t.Fatal(err) 1468 } 1469} 1470 1471func TestPatchPodFails(t *testing.T) { 1472 labelMap := map[string]string{"foo": "bar"} 1473 rs := newReplicaSet(2, labelMap) 1474 stopCh := make(chan struct{}) 1475 defer close(stopCh) 1476 manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs) 1477 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 1478 // add to podLister two matching pods. Expect two patches to take control 1479 // them. 1480 informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil, false)) 1481 informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil, false)) 1482 // let both patches fail. The rs controller will assume it fails to take 1483 // control of the pods and requeue to try again. 1484 fakePodControl.Err = fmt.Errorf("fake Error") 1485 rsKey := GetKey(rs, t) 1486 err := processSync(manager, rsKey) 1487 if err == nil || !strings.Contains(err.Error(), "fake Error") { 1488 t.Errorf("expected fake Error, got %+v", err) 1489 } 1490 // 2 patches to take control of pod1 and pod2 (both fail). 1491 err = validateSyncReplicaSet(fakePodControl, 0, 0, 2) 1492 if err != nil { 1493 t.Fatal(err) 1494 } 1495 // RS should requeue itself. 1496 queueRS, _ := manager.queue.Get() 1497 if queueRS != rsKey { 1498 t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS) 1499 } 1500} 1501 1502// RS controller shouldn't adopt or create more pods if the rc is about to be 1503// deleted. 1504func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) { 1505 labelMap := map[string]string{"foo": "bar"} 1506 rs := newReplicaSet(2, labelMap) 1507 now := metav1.Now() 1508 rs.DeletionTimestamp = &now 1509 stopCh := make(chan struct{}) 1510 defer close(stopCh) 1511 manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs) 1512 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs) 1513 pod1 := newPod("pod1", rs, v1.PodRunning, nil, false) 1514 informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) 1515 1516 // no patch, no create 1517 err := manager.syncReplicaSet(GetKey(rs, t)) 1518 if err != nil { 1519 t.Fatal(err) 1520 } 1521 err = validateSyncReplicaSet(fakePodControl, 0, 0, 0) 1522 if err != nil { 1523 t.Fatal(err) 1524 } 1525} 1526 1527func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) { 1528 labelMap := map[string]string{"foo": "bar"} 1529 // Bare client says it IS deleted. 1530 rs := newReplicaSet(2, labelMap) 1531 now := metav1.Now() 1532 rs.DeletionTimestamp = &now 1533 stopCh := make(chan struct{}) 1534 defer close(stopCh) 1535 manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs) 1536 // Lister (cache) says it's NOT deleted. 1537 rs2 := *rs 1538 rs2.DeletionTimestamp = nil 1539 informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(&rs2) 1540 1541 // Recheck occurs if a matching orphan is present. 1542 pod1 := newPod("pod1", rs, v1.PodRunning, nil, false) 1543 informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1) 1544 1545 // sync should abort. 1546 err := manager.syncReplicaSet(GetKey(rs, t)) 1547 if err == nil { 1548 t.Error("syncReplicaSet() err = nil, expected non-nil") 1549 } 1550 // no patch, no create. 1551 err = validateSyncReplicaSet(fakePodControl, 0, 0, 0) 1552 if err != nil { 1553 t.Fatal(err) 1554 } 1555} 1556 1557var ( 1558 imagePullBackOff apps.ReplicaSetConditionType = "ImagePullBackOff" 1559 1560 condImagePullBackOff = func() apps.ReplicaSetCondition { 1561 return apps.ReplicaSetCondition{ 1562 Type: imagePullBackOff, 1563 Status: v1.ConditionTrue, 1564 Reason: "NonExistentImage", 1565 } 1566 } 1567 1568 condReplicaFailure = func() apps.ReplicaSetCondition { 1569 return apps.ReplicaSetCondition{ 1570 Type: apps.ReplicaSetReplicaFailure, 1571 Status: v1.ConditionTrue, 1572 Reason: "OtherFailure", 1573 } 1574 } 1575 1576 condReplicaFailure2 = func() apps.ReplicaSetCondition { 1577 return apps.ReplicaSetCondition{ 1578 Type: apps.ReplicaSetReplicaFailure, 1579 Status: v1.ConditionTrue, 1580 Reason: "AnotherFailure", 1581 } 1582 } 1583 1584 status = func() *apps.ReplicaSetStatus { 1585 return &apps.ReplicaSetStatus{ 1586 Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}, 1587 } 1588 } 1589) 1590 1591func TestGetCondition(t *testing.T) { 1592 exampleStatus := status() 1593 1594 tests := []struct { 1595 name string 1596 1597 status apps.ReplicaSetStatus 1598 condType apps.ReplicaSetConditionType 1599 1600 expected bool 1601 }{ 1602 { 1603 name: "condition exists", 1604 1605 status: *exampleStatus, 1606 condType: apps.ReplicaSetReplicaFailure, 1607 1608 expected: true, 1609 }, 1610 { 1611 name: "condition does not exist", 1612 1613 status: *exampleStatus, 1614 condType: imagePullBackOff, 1615 1616 expected: false, 1617 }, 1618 } 1619 1620 for _, test := range tests { 1621 cond := GetCondition(test.status, test.condType) 1622 exists := cond != nil 1623 if exists != test.expected { 1624 t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists) 1625 } 1626 } 1627} 1628 1629func TestSetCondition(t *testing.T) { 1630 tests := []struct { 1631 name string 1632 1633 status *apps.ReplicaSetStatus 1634 cond apps.ReplicaSetCondition 1635 1636 expectedStatus *apps.ReplicaSetStatus 1637 }{ 1638 { 1639 name: "set for the first time", 1640 1641 status: &apps.ReplicaSetStatus{}, 1642 cond: condReplicaFailure(), 1643 1644 expectedStatus: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}}, 1645 }, 1646 { 1647 name: "simple set", 1648 1649 status: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condImagePullBackOff()}}, 1650 cond: condReplicaFailure(), 1651 1652 expectedStatus: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condImagePullBackOff(), condReplicaFailure()}}, 1653 }, 1654 { 1655 name: "overwrite", 1656 1657 status: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}}, 1658 cond: condReplicaFailure2(), 1659 1660 expectedStatus: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure2()}}, 1661 }, 1662 } 1663 1664 for _, test := range tests { 1665 SetCondition(test.status, test.cond) 1666 if !reflect.DeepEqual(test.status, test.expectedStatus) { 1667 t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) 1668 } 1669 } 1670} 1671 1672func TestRemoveCondition(t *testing.T) { 1673 tests := []struct { 1674 name string 1675 1676 status *apps.ReplicaSetStatus 1677 condType apps.ReplicaSetConditionType 1678 1679 expectedStatus *apps.ReplicaSetStatus 1680 }{ 1681 { 1682 name: "remove from empty status", 1683 1684 status: &apps.ReplicaSetStatus{}, 1685 condType: apps.ReplicaSetReplicaFailure, 1686 1687 expectedStatus: &apps.ReplicaSetStatus{}, 1688 }, 1689 { 1690 name: "simple remove", 1691 1692 status: &apps.ReplicaSetStatus{Conditions: []apps.ReplicaSetCondition{condReplicaFailure()}}, 1693 condType: apps.ReplicaSetReplicaFailure, 1694 1695 expectedStatus: &apps.ReplicaSetStatus{}, 1696 }, 1697 { 1698 name: "doesn't remove anything", 1699 1700 status: status(), 1701 condType: imagePullBackOff, 1702 1703 expectedStatus: status(), 1704 }, 1705 } 1706 1707 for _, test := range tests { 1708 RemoveCondition(test.status, test.condType) 1709 if !reflect.DeepEqual(test.status, test.expectedStatus) { 1710 t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status) 1711 } 1712 } 1713} 1714 1715func TestSlowStartBatch(t *testing.T) { 1716 fakeErr := fmt.Errorf("fake error") 1717 callCnt := 0 1718 callLimit := 0 1719 var lock sync.Mutex 1720 fn := func() error { 1721 lock.Lock() 1722 defer lock.Unlock() 1723 callCnt++ 1724 if callCnt > callLimit { 1725 return fakeErr 1726 } 1727 return nil 1728 } 1729 1730 tests := []struct { 1731 name string 1732 count int 1733 callLimit int 1734 fn func() error 1735 expectedSuccesses int 1736 expectedErr error 1737 expectedCallCnt int 1738 }{ 1739 { 1740 name: "callLimit = 0 (all fail)", 1741 count: 10, 1742 callLimit: 0, 1743 fn: fn, 1744 expectedSuccesses: 0, 1745 expectedErr: fakeErr, 1746 expectedCallCnt: 1, // 1(first batch): function will be called at least once 1747 }, 1748 { 1749 name: "callLimit = count (all succeed)", 1750 count: 10, 1751 callLimit: 10, 1752 fn: fn, 1753 expectedSuccesses: 10, 1754 expectedErr: nil, 1755 expectedCallCnt: 10, // 1(first batch) + 2(2nd batch) + 4(3rd batch) + 3(4th batch) = 10 1756 }, 1757 { 1758 name: "callLimit < count (some succeed)", 1759 count: 10, 1760 callLimit: 5, 1761 fn: fn, 1762 expectedSuccesses: 5, 1763 expectedErr: fakeErr, 1764 expectedCallCnt: 7, // 1(first batch) + 2(2nd batch) + 4(3rd batch) = 7 1765 }, 1766 } 1767 1768 for _, test := range tests { 1769 callCnt = 0 1770 callLimit = test.callLimit 1771 successes, err := slowStartBatch(test.count, 1, test.fn) 1772 if successes != test.expectedSuccesses { 1773 t.Errorf("%s: unexpected processed batch size, expected %d, got %d", test.name, test.expectedSuccesses, successes) 1774 } 1775 if err != test.expectedErr { 1776 t.Errorf("%s: unexpected processed batch size, expected %v, got %v", test.name, test.expectedErr, err) 1777 } 1778 // verify that slowStartBatch stops trying more calls after a batch fails 1779 if callCnt != test.expectedCallCnt { 1780 t.Errorf("%s: slowStartBatch() still tries calls after a batch fails, expected %d calls, got %d", test.name, test.expectedCallCnt, callCnt) 1781 } 1782 } 1783} 1784 1785func TestGetPodsToDelete(t *testing.T) { 1786 labelMap := map[string]string{"name": "foo"} 1787 rs := newReplicaSet(1, labelMap) 1788 // an unscheduled, pending pod 1789 unscheduledPendingPod := newPod("unscheduled-pending-pod", rs, v1.PodPending, nil, true) 1790 // a scheduled, pending pod 1791 scheduledPendingPod := newPod("scheduled-pending-pod", rs, v1.PodPending, nil, true) 1792 scheduledPendingPod.Spec.NodeName = "fake-node" 1793 // a scheduled, running, not-ready pod 1794 scheduledRunningNotReadyPod := newPod("scheduled-running-not-ready-pod", rs, v1.PodRunning, nil, true) 1795 scheduledRunningNotReadyPod.Spec.NodeName = "fake-node" 1796 scheduledRunningNotReadyPod.Status.Conditions = []v1.PodCondition{ 1797 { 1798 Type: v1.PodReady, 1799 Status: v1.ConditionFalse, 1800 }, 1801 } 1802 // a scheduled, running, ready pod on fake-node-1 1803 scheduledRunningReadyPodOnNode1 := newPod("scheduled-running-ready-pod-on-node-1", rs, v1.PodRunning, nil, true) 1804 scheduledRunningReadyPodOnNode1.Spec.NodeName = "fake-node-1" 1805 scheduledRunningReadyPodOnNode1.Status.Conditions = []v1.PodCondition{ 1806 { 1807 Type: v1.PodReady, 1808 Status: v1.ConditionTrue, 1809 }, 1810 } 1811 // a scheduled, running, ready pod on fake-node-2 1812 scheduledRunningReadyPodOnNode2 := newPod("scheduled-running-ready-pod-on-node-2", rs, v1.PodRunning, nil, true) 1813 scheduledRunningReadyPodOnNode2.Spec.NodeName = "fake-node-2" 1814 scheduledRunningReadyPodOnNode2.Status.Conditions = []v1.PodCondition{ 1815 { 1816 Type: v1.PodReady, 1817 Status: v1.ConditionTrue, 1818 }, 1819 } 1820 1821 tests := []struct { 1822 name string 1823 pods []*v1.Pod 1824 // related defaults to pods if nil. 1825 related []*v1.Pod 1826 diff int 1827 expectedPodsToDelete []*v1.Pod 1828 }{ 1829 // Order used when selecting pods for deletion: 1830 // an unscheduled, pending pod 1831 // a scheduled, pending pod 1832 // a scheduled, running, not-ready pod 1833 // a scheduled, running, ready pod on same node as a related pod 1834 // a scheduled, running, ready pod not on node with related pods 1835 // Note that a pending pod cannot be ready 1836 { 1837 name: "len(pods) = 0 (i.e., diff = 0 too)", 1838 pods: []*v1.Pod{}, 1839 diff: 0, 1840 expectedPodsToDelete: []*v1.Pod{}, 1841 }, 1842 { 1843 name: "diff = len(pods)", 1844 pods: []*v1.Pod{ 1845 scheduledRunningNotReadyPod, 1846 scheduledRunningReadyPodOnNode1, 1847 }, 1848 diff: 2, 1849 expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPodOnNode1}, 1850 }, 1851 { 1852 name: "diff < len(pods)", 1853 pods: []*v1.Pod{ 1854 scheduledRunningReadyPodOnNode1, 1855 scheduledRunningNotReadyPod, 1856 }, 1857 diff: 1, 1858 expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod}, 1859 }, 1860 { 1861 name: "various pod phases and conditions, diff = len(pods)", 1862 pods: []*v1.Pod{ 1863 scheduledRunningReadyPodOnNode1, 1864 scheduledRunningReadyPodOnNode1, 1865 scheduledRunningReadyPodOnNode2, 1866 scheduledRunningNotReadyPod, 1867 scheduledPendingPod, 1868 unscheduledPendingPod, 1869 }, 1870 diff: 6, 1871 expectedPodsToDelete: []*v1.Pod{ 1872 scheduledRunningReadyPodOnNode1, 1873 scheduledRunningReadyPodOnNode1, 1874 scheduledRunningReadyPodOnNode2, 1875 scheduledRunningNotReadyPod, 1876 scheduledPendingPod, 1877 unscheduledPendingPod, 1878 }, 1879 }, 1880 { 1881 name: "various pod phases and conditions, diff = len(pods), relatedPods empty", 1882 pods: []*v1.Pod{ 1883 scheduledRunningReadyPodOnNode1, 1884 scheduledRunningReadyPodOnNode1, 1885 scheduledRunningReadyPodOnNode2, 1886 scheduledRunningNotReadyPod, 1887 scheduledPendingPod, 1888 unscheduledPendingPod, 1889 }, 1890 related: []*v1.Pod{}, 1891 diff: 6, 1892 expectedPodsToDelete: []*v1.Pod{ 1893 scheduledRunningReadyPodOnNode1, 1894 scheduledRunningReadyPodOnNode1, 1895 scheduledRunningReadyPodOnNode2, 1896 scheduledRunningNotReadyPod, 1897 scheduledPendingPod, 1898 unscheduledPendingPod, 1899 }, 1900 }, 1901 { 1902 name: "scheduled vs unscheduled, diff < len(pods)", 1903 pods: []*v1.Pod{ 1904 scheduledPendingPod, 1905 unscheduledPendingPod, 1906 }, 1907 diff: 1, 1908 expectedPodsToDelete: []*v1.Pod{ 1909 unscheduledPendingPod, 1910 }, 1911 }, 1912 { 1913 name: "ready vs not-ready, diff < len(pods)", 1914 pods: []*v1.Pod{ 1915 scheduledRunningReadyPodOnNode1, 1916 scheduledRunningNotReadyPod, 1917 scheduledRunningNotReadyPod, 1918 }, 1919 diff: 2, 1920 expectedPodsToDelete: []*v1.Pod{ 1921 scheduledRunningNotReadyPod, 1922 scheduledRunningNotReadyPod, 1923 }, 1924 }, 1925 { 1926 name: "ready and colocated with another ready pod vs not colocated, diff < len(pods)", 1927 pods: []*v1.Pod{ 1928 scheduledRunningReadyPodOnNode1, 1929 scheduledRunningReadyPodOnNode2, 1930 }, 1931 related: []*v1.Pod{ 1932 scheduledRunningReadyPodOnNode1, 1933 scheduledRunningReadyPodOnNode2, 1934 scheduledRunningReadyPodOnNode2, 1935 }, 1936 diff: 1, 1937 expectedPodsToDelete: []*v1.Pod{ 1938 scheduledRunningReadyPodOnNode2, 1939 }, 1940 }, 1941 { 1942 name: "pending vs running, diff < len(pods)", 1943 pods: []*v1.Pod{ 1944 scheduledPendingPod, 1945 scheduledRunningNotReadyPod, 1946 }, 1947 diff: 1, 1948 expectedPodsToDelete: []*v1.Pod{ 1949 scheduledPendingPod, 1950 }, 1951 }, 1952 { 1953 name: "various pod phases and conditions, diff < len(pods)", 1954 pods: []*v1.Pod{ 1955 scheduledRunningReadyPodOnNode1, 1956 scheduledRunningReadyPodOnNode2, 1957 scheduledRunningNotReadyPod, 1958 scheduledPendingPod, 1959 unscheduledPendingPod, 1960 }, 1961 diff: 3, 1962 expectedPodsToDelete: []*v1.Pod{ 1963 unscheduledPendingPod, 1964 scheduledPendingPod, 1965 scheduledRunningNotReadyPod, 1966 }, 1967 }, 1968 } 1969 1970 for _, test := range tests { 1971 related := test.related 1972 if related == nil { 1973 related = test.pods 1974 } 1975 podsToDelete := getPodsToDelete(test.pods, related, test.diff) 1976 if len(podsToDelete) != len(test.expectedPodsToDelete) { 1977 t.Errorf("%s: unexpected pods to delete, expected %v, got %v", test.name, test.expectedPodsToDelete, podsToDelete) 1978 } 1979 if !reflect.DeepEqual(podsToDelete, test.expectedPodsToDelete) { 1980 t.Errorf("%s: unexpected pods to delete, expected %v, got %v", test.name, test.expectedPodsToDelete, podsToDelete) 1981 } 1982 } 1983} 1984 1985func TestGetPodKeys(t *testing.T) { 1986 labelMap := map[string]string{"name": "foo"} 1987 rs := newReplicaSet(1, labelMap) 1988 pod1 := newPod("pod1", rs, v1.PodRunning, nil, true) 1989 pod2 := newPod("pod2", rs, v1.PodRunning, nil, true) 1990 1991 tests := []struct { 1992 name string 1993 pods []*v1.Pod 1994 expectedPodKeys []string 1995 }{ 1996 { 1997 "len(pods) = 0 (i.e., pods = nil)", 1998 []*v1.Pod{}, 1999 []string{}, 2000 }, 2001 { 2002 "len(pods) > 0", 2003 []*v1.Pod{ 2004 pod1, 2005 pod2, 2006 }, 2007 []string{"default/pod1", "default/pod2"}, 2008 }, 2009 } 2010 2011 for _, test := range tests { 2012 podKeys := getPodKeys(test.pods) 2013 if len(podKeys) != len(test.expectedPodKeys) { 2014 t.Errorf("%s: unexpected keys for pods to delete, expected %v, got %v", test.name, test.expectedPodKeys, podKeys) 2015 } 2016 for i := 0; i < len(podKeys); i++ { 2017 if podKeys[i] != test.expectedPodKeys[i] { 2018 t.Errorf("%s: unexpected keys for pods to delete, expected %v, got %v", test.name, test.expectedPodKeys, podKeys) 2019 } 2020 } 2021 } 2022} 2023