1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package statefulset 18 19import ( 20 "errors" 21 "fmt" 22 "math/rand" 23 "reflect" 24 "runtime" 25 "sort" 26 "strconv" 27 "strings" 28 "testing" 29 "time" 30 31 apps "k8s.io/api/apps/v1" 32 v1 "k8s.io/api/core/v1" 33 apierrors "k8s.io/apimachinery/pkg/api/errors" 34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 35 "k8s.io/apimachinery/pkg/labels" 36 utilerrors "k8s.io/apimachinery/pkg/util/errors" 37 utilfeature "k8s.io/apiserver/pkg/util/feature" 38 "k8s.io/client-go/informers" 39 appsinformers "k8s.io/client-go/informers/apps/v1" 40 coreinformers "k8s.io/client-go/informers/core/v1" 41 clientset "k8s.io/client-go/kubernetes" 42 "k8s.io/client-go/kubernetes/fake" 43 appslisters "k8s.io/client-go/listers/apps/v1" 44 corelisters "k8s.io/client-go/listers/core/v1" 45 "k8s.io/client-go/tools/cache" 46 "k8s.io/client-go/tools/record" 47 featuregatetesting "k8s.io/component-base/featuregate/testing" 48 podutil "k8s.io/kubernetes/pkg/api/v1/pod" 49 "k8s.io/kubernetes/pkg/controller" 50 "k8s.io/kubernetes/pkg/controller/history" 51 "k8s.io/kubernetes/pkg/features" 52) 53 54type invariantFunc func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error 55 56func setupController(client clientset.Interface) (*fakeStatefulPodControl, *fakeStatefulSetStatusUpdater, StatefulSetControlInterface, chan struct{}) { 57 informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) 58 spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions()) 59 ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) 60 recorder := record.NewFakeRecorder(10) 61 ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder) 62 63 stop := make(chan struct{}) 64 informerFactory.Start(stop) 65 cache.WaitForCacheSync( 66 stop, 67 informerFactory.Apps().V1().StatefulSets().Informer().HasSynced, 68 informerFactory.Core().V1().Pods().Informer().HasSynced, 69 informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced, 70 ) 71 return spc, ssu, ssc, stop 72} 73 74func burst(set *apps.StatefulSet) *apps.StatefulSet { 75 set.Spec.PodManagementPolicy = apps.ParallelPodManagement 76 return set 77} 78 79func setMinReadySeconds(set *apps.StatefulSet, minReadySeconds int32) *apps.StatefulSet { 80 set.Spec.MinReadySeconds = minReadySeconds 81 return set 82} 83 84func TestStatefulSetControl(t *testing.T) { 85 simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) } 86 largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) } 87 88 testCases := []struct { 89 fn func(*testing.T, *apps.StatefulSet, invariantFunc) 90 obj func() *apps.StatefulSet 91 }{ 92 {CreatesPods, simpleSetFn}, 93 {ScalesUp, simpleSetFn}, 94 {ScalesDown, simpleSetFn}, 95 {ReplacesPods, largeSetFn}, 96 {RecreatesFailedPod, simpleSetFn}, 97 {CreatePodFailure, simpleSetFn}, 98 {UpdatePodFailure, simpleSetFn}, 99 {UpdateSetStatusFailure, simpleSetFn}, 100 {PodRecreateDeleteFailure, simpleSetFn}, 101 } 102 103 for _, testCase := range testCases { 104 fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name() 105 if i := strings.LastIndex(fnName, "."); i != -1 { 106 fnName = fnName[i+1:] 107 } 108 t.Run( 109 fmt.Sprintf("%s/Monotonic", fnName), 110 func(t *testing.T) { 111 testCase.fn(t, testCase.obj(), assertMonotonicInvariants) 112 }, 113 ) 114 t.Run( 115 fmt.Sprintf("%s/Burst", fnName), 116 func(t *testing.T) { 117 set := burst(testCase.obj()) 118 testCase.fn(t, set, assertBurstInvariants) 119 }, 120 ) 121 } 122} 123 124func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 125 client := fake.NewSimpleClientset(set) 126 spc, _, ssc, stop := setupController(client) 127 defer close(stop) 128 129 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 130 t.Errorf("Failed to turn up StatefulSet : %s", err) 131 } 132 var err error 133 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 134 if err != nil { 135 t.Fatalf("Error getting updated StatefulSet: %v", err) 136 } 137 if set.Status.Replicas != 3 { 138 t.Error("Failed to scale statefulset to 3 replicas") 139 } 140 if set.Status.ReadyReplicas != 3 { 141 t.Error("Failed to set ReadyReplicas correctly") 142 } 143 if set.Status.UpdatedReplicas != 3 { 144 t.Error("Failed to set UpdatedReplicas correctly") 145 } 146} 147 148func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 149 client := fake.NewSimpleClientset(set) 150 spc, _, ssc, stop := setupController(client) 151 defer close(stop) 152 153 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 154 t.Errorf("Failed to turn up StatefulSet : %s", err) 155 } 156 *set.Spec.Replicas = 4 157 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 158 t.Errorf("Failed to scale StatefulSet : %s", err) 159 } 160 var err error 161 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 162 if err != nil { 163 t.Fatalf("Error getting updated StatefulSet: %v", err) 164 } 165 if set.Status.Replicas != 4 { 166 t.Error("Failed to scale statefulset to 4 replicas") 167 } 168 if set.Status.ReadyReplicas != 4 { 169 t.Error("Failed to set readyReplicas correctly") 170 } 171 if set.Status.UpdatedReplicas != 4 { 172 t.Error("Failed to set updatedReplicas correctly") 173 } 174} 175 176func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 177 client := fake.NewSimpleClientset(set) 178 spc, _, ssc, stop := setupController(client) 179 defer close(stop) 180 181 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 182 t.Errorf("Failed to turn up StatefulSet : %s", err) 183 } 184 *set.Spec.Replicas = 0 185 if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil { 186 t.Errorf("Failed to scale StatefulSet : %s", err) 187 } 188 if set.Status.Replicas != 0 { 189 t.Error("Failed to scale statefulset to 0 replicas") 190 } 191 if set.Status.ReadyReplicas != 0 { 192 t.Error("Failed to set readyReplicas correctly") 193 } 194 if set.Status.UpdatedReplicas != 0 { 195 t.Error("Failed to set updatedReplicas correctly") 196 } 197} 198 199func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 200 client := fake.NewSimpleClientset(set) 201 spc, _, ssc, stop := setupController(client) 202 defer close(stop) 203 204 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 205 t.Errorf("Failed to turn up StatefulSet : %s", err) 206 } 207 var err error 208 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 209 if err != nil { 210 t.Fatalf("Error getting updated StatefulSet: %v", err) 211 } 212 if set.Status.Replicas != 5 { 213 t.Error("Failed to scale statefulset to 5 replicas") 214 } 215 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 216 if err != nil { 217 t.Error(err) 218 } 219 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 220 if err != nil { 221 t.Error(err) 222 } 223 sort.Sort(ascendingOrdinal(pods)) 224 spc.podsIndexer.Delete(pods[0]) 225 spc.podsIndexer.Delete(pods[2]) 226 spc.podsIndexer.Delete(pods[4]) 227 for i := 0; i < 5; i += 2 { 228 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 229 if err != nil { 230 t.Error(err) 231 } 232 if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { 233 t.Errorf("Failed to update StatefulSet : %s", err) 234 } 235 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 236 if err != nil { 237 t.Fatalf("Error getting updated StatefulSet: %v", err) 238 } 239 if pods, err = spc.setPodRunning(set, i); err != nil { 240 t.Error(err) 241 } 242 if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { 243 t.Errorf("Failed to update StatefulSet : %s", err) 244 } 245 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 246 if err != nil { 247 t.Fatalf("Error getting updated StatefulSet: %v", err) 248 } 249 if _, err = spc.setPodReady(set, i); err != nil { 250 t.Error(err) 251 } 252 } 253 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 254 if err != nil { 255 t.Error(err) 256 } 257 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 258 t.Errorf("Failed to update StatefulSet : %s", err) 259 } 260 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 261 if err != nil { 262 t.Fatalf("Error getting updated StatefulSet: %v", err) 263 } 264 if e, a := int32(5), set.Status.Replicas; e != a { 265 t.Errorf("Expected to scale to %d, got %d", e, a) 266 } 267} 268 269func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 270 client := fake.NewSimpleClientset() 271 spc, _, ssc, stop := setupController(client) 272 defer close(stop) 273 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 274 if err != nil { 275 t.Error(err) 276 } 277 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 278 if err != nil { 279 t.Error(err) 280 } 281 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 282 t.Errorf("Error updating StatefulSet %s", err) 283 } 284 if err := invariants(set, spc); err != nil { 285 t.Error(err) 286 } 287 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 288 if err != nil { 289 t.Error(err) 290 } 291 pods[0].Status.Phase = v1.PodFailed 292 spc.podsIndexer.Update(pods[0]) 293 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 294 t.Errorf("Error updating StatefulSet %s", err) 295 } 296 if err := invariants(set, spc); err != nil { 297 t.Error(err) 298 } 299 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 300 if err != nil { 301 t.Error(err) 302 } 303 if isCreated(pods[0]) { 304 t.Error("StatefulSet did not recreate failed Pod") 305 } 306} 307 308func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 309 client := fake.NewSimpleClientset(set) 310 spc, _, ssc, stop := setupController(client) 311 defer close(stop) 312 spc.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) 313 314 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { 315 t.Errorf("StatefulSetControl did not return InternalError found %s", err) 316 } 317 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 318 t.Errorf("Failed to turn up StatefulSet : %s", err) 319 } 320 var err error 321 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 322 if err != nil { 323 t.Fatalf("Error getting updated StatefulSet: %v", err) 324 } 325 if set.Status.Replicas != 3 { 326 t.Error("Failed to scale StatefulSet to 3 replicas") 327 } 328 if set.Status.ReadyReplicas != 3 { 329 t.Error("Failed to set readyReplicas correctly") 330 } 331 if set.Status.UpdatedReplicas != 3 { 332 t.Error("Failed to updatedReplicas correctly") 333 } 334} 335 336func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 337 client := fake.NewSimpleClientset(set) 338 spc, _, ssc, stop := setupController(client) 339 defer close(stop) 340 spc.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) 341 342 // have to have 1 successful loop first 343 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 344 t.Fatalf("Unexpected error: %v", err) 345 } 346 var err error 347 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 348 if err != nil { 349 t.Fatalf("Error getting updated StatefulSet: %v", err) 350 } 351 if set.Status.Replicas != 3 { 352 t.Error("Failed to scale StatefulSet to 3 replicas") 353 } 354 if set.Status.ReadyReplicas != 3 { 355 t.Error("Failed to set readyReplicas correctly") 356 } 357 if set.Status.UpdatedReplicas != 3 { 358 t.Error("Failed to set updatedReplicas correctly") 359 } 360 361 // now mutate a pod's identity 362 pods, err := spc.podsLister.List(labels.Everything()) 363 if err != nil { 364 t.Fatalf("Error listing pods: %v", err) 365 } 366 if len(pods) != 3 { 367 t.Fatalf("Expected 3 pods, got %d", len(pods)) 368 } 369 sort.Sort(ascendingOrdinal(pods)) 370 pods[0].Name = "goo-0" 371 spc.podsIndexer.Update(pods[0]) 372 373 // now it should fail 374 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { 375 t.Errorf("StatefulSetControl did not return InternalError found %s", err) 376 } 377} 378 379func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 380 client := fake.NewSimpleClientset(set) 381 spc, ssu, ssc, stop := setupController(client) 382 defer close(stop) 383 ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) 384 385 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { 386 t.Errorf("StatefulSetControl did not return InternalError found %s", err) 387 } 388 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 389 t.Errorf("Failed to turn up StatefulSet : %s", err) 390 } 391 var err error 392 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 393 if err != nil { 394 t.Fatalf("Error getting updated StatefulSet: %v", err) 395 } 396 if set.Status.Replicas != 3 { 397 t.Error("Failed to scale StatefulSet to 3 replicas") 398 } 399 if set.Status.ReadyReplicas != 3 { 400 t.Error("Failed to set readyReplicas to 3") 401 } 402 if set.Status.UpdatedReplicas != 3 { 403 t.Error("Failed to set updatedReplicas to 3") 404 } 405} 406 407func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { 408 client := fake.NewSimpleClientset(set) 409 spc, _, ssc, stop := setupController(client) 410 defer close(stop) 411 412 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 413 if err != nil { 414 t.Error(err) 415 } 416 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 417 if err != nil { 418 t.Error(err) 419 } 420 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 421 t.Errorf("Error updating StatefulSet %s", err) 422 } 423 if err := invariants(set, spc); err != nil { 424 t.Error(err) 425 } 426 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 427 if err != nil { 428 t.Error(err) 429 } 430 pods[0].Status.Phase = v1.PodFailed 431 spc.podsIndexer.Update(pods[0]) 432 spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0) 433 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil && isOrHasInternalError(err) { 434 t.Errorf("StatefulSet failed to %s", err) 435 } 436 if err := invariants(set, spc); err != nil { 437 t.Error(err) 438 } 439 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 440 t.Errorf("Error updating StatefulSet %s", err) 441 } 442 if err := invariants(set, spc); err != nil { 443 t.Error(err) 444 } 445 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 446 if err != nil { 447 t.Error(err) 448 } 449 if isCreated(pods[0]) { 450 t.Error("StatefulSet did not recreate failed Pod") 451 } 452} 453 454func TestStatefulSetControlScaleDownDeleteError(t *testing.T) { 455 invariants := assertMonotonicInvariants 456 set := newStatefulSet(3) 457 client := fake.NewSimpleClientset(set) 458 spc, _, ssc, stop := setupController(client) 459 defer close(stop) 460 461 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 462 t.Errorf("Failed to turn up StatefulSet : %s", err) 463 } 464 var err error 465 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 466 if err != nil { 467 t.Fatalf("Error getting updated StatefulSet: %v", err) 468 } 469 *set.Spec.Replicas = 0 470 spc.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2) 471 if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil && isOrHasInternalError(err) { 472 t.Errorf("StatefulSetControl failed to throw error on delete %s", err) 473 } 474 if err := scaleDownStatefulSetControl(set, ssc, spc, invariants); err != nil { 475 t.Errorf("Failed to turn down StatefulSet %s", err) 476 } 477 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 478 if err != nil { 479 t.Fatalf("Error getting updated StatefulSet: %v", err) 480 } 481 if set.Status.Replicas != 0 { 482 t.Error("Failed to scale statefulset to 0 replicas") 483 } 484 if set.Status.ReadyReplicas != 0 { 485 t.Error("Failed to set readyReplicas to 0") 486 } 487 if set.Status.UpdatedReplicas != 0 { 488 t.Error("Failed to set updatedReplicas to 0") 489 } 490} 491 492func TestStatefulSetControl_getSetRevisions(t *testing.T) { 493 type testcase struct { 494 name string 495 existing []*apps.ControllerRevision 496 set *apps.StatefulSet 497 expectedCount int 498 expectedCurrent *apps.ControllerRevision 499 expectedUpdate *apps.ControllerRevision 500 err bool 501 } 502 503 testFn := func(test *testcase, t *testing.T) { 504 client := fake.NewSimpleClientset() 505 informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) 506 spc := newFakeStatefulPodControl(informerFactory.Core().V1().Pods(), informerFactory.Apps().V1().StatefulSets(), informerFactory.Apps().V1().ControllerRevisions()) 507 ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets()) 508 recorder := record.NewFakeRecorder(10) 509 ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder} 510 511 stop := make(chan struct{}) 512 defer close(stop) 513 informerFactory.Start(stop) 514 cache.WaitForCacheSync( 515 stop, 516 informerFactory.Apps().V1().StatefulSets().Informer().HasSynced, 517 informerFactory.Core().V1().Pods().Informer().HasSynced, 518 informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced, 519 ) 520 test.set.Status.CollisionCount = new(int32) 521 for i := range test.existing { 522 ssc.controllerHistory.CreateControllerRevision(test.set, test.existing[i], test.set.Status.CollisionCount) 523 } 524 revisions, err := ssc.ListRevisions(test.set) 525 if err != nil { 526 t.Fatal(err) 527 } 528 current, update, _, err := ssc.getStatefulSetRevisions(test.set, revisions) 529 if err != nil { 530 t.Fatalf("error getting statefulset revisions:%v", err) 531 } 532 revisions, err = ssc.ListRevisions(test.set) 533 if err != nil { 534 t.Fatal(err) 535 } 536 if len(revisions) != test.expectedCount { 537 t.Errorf("%s: want %d revisions got %d", test.name, test.expectedCount, len(revisions)) 538 } 539 if test.err && err == nil { 540 t.Errorf("%s: expected error", test.name) 541 } 542 if !test.err && !history.EqualRevision(current, test.expectedCurrent) { 543 t.Errorf("%s: for current want %v got %v", test.name, test.expectedCurrent, current) 544 } 545 if !test.err && !history.EqualRevision(update, test.expectedUpdate) { 546 t.Errorf("%s: for update want %v got %v", test.name, test.expectedUpdate, update) 547 } 548 if !test.err && test.expectedCurrent != nil && current != nil && test.expectedCurrent.Revision != current.Revision { 549 t.Errorf("%s: for current revision want %d got %d", test.name, test.expectedCurrent.Revision, current.Revision) 550 } 551 if !test.err && test.expectedUpdate != nil && update != nil && test.expectedUpdate.Revision != update.Revision { 552 t.Errorf("%s: for update revision want %d got %d", test.name, test.expectedUpdate.Revision, update.Revision) 553 } 554 } 555 556 updateRevision := func(cr *apps.ControllerRevision, revision int64) *apps.ControllerRevision { 557 clone := cr.DeepCopy() 558 clone.Revision = revision 559 return clone 560 } 561 562 set := newStatefulSet(3) 563 set.Status.CollisionCount = new(int32) 564 rev0 := newRevisionOrDie(set, 1) 565 set1 := set.DeepCopy() 566 set1.Spec.Template.Spec.Containers[0].Image = "foo" 567 set1.Status.CurrentRevision = rev0.Name 568 set1.Status.CollisionCount = new(int32) 569 rev1 := newRevisionOrDie(set1, 2) 570 set2 := set1.DeepCopy() 571 set2.Spec.Template.Labels["new"] = "label" 572 set2.Status.CurrentRevision = rev0.Name 573 set2.Status.CollisionCount = new(int32) 574 rev2 := newRevisionOrDie(set2, 3) 575 tests := []testcase{ 576 { 577 name: "creates initial revision", 578 existing: nil, 579 set: set, 580 expectedCount: 1, 581 expectedCurrent: rev0, 582 expectedUpdate: rev0, 583 err: false, 584 }, 585 { 586 name: "creates revision on update", 587 existing: []*apps.ControllerRevision{rev0}, 588 set: set1, 589 expectedCount: 2, 590 expectedCurrent: rev0, 591 expectedUpdate: rev1, 592 err: false, 593 }, 594 { 595 name: "must not recreate a new revision of same set", 596 existing: []*apps.ControllerRevision{rev0, rev1}, 597 set: set1, 598 expectedCount: 2, 599 expectedCurrent: rev0, 600 expectedUpdate: rev1, 601 err: false, 602 }, 603 { 604 name: "must rollback to a previous revision", 605 existing: []*apps.ControllerRevision{rev0, rev1, rev2}, 606 set: set1, 607 expectedCount: 3, 608 expectedCurrent: rev0, 609 expectedUpdate: updateRevision(rev1, 4), 610 err: false, 611 }, 612 } 613 for i := range tests { 614 testFn(&tests[i], t) 615 } 616} 617 618func TestStatefulSetControlRollingUpdate(t *testing.T) { 619 type testcase struct { 620 name string 621 invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error 622 initial func() *apps.StatefulSet 623 update func(set *apps.StatefulSet) *apps.StatefulSet 624 validate func(set *apps.StatefulSet, pods []*v1.Pod) error 625 } 626 627 testFn := func(test *testcase, t *testing.T) { 628 set := test.initial() 629 client := fake.NewSimpleClientset(set) 630 spc, _, ssc, stop := setupController(client) 631 defer close(stop) 632 if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 633 t.Fatalf("%s: %s", test.name, err) 634 } 635 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 636 if err != nil { 637 t.Fatalf("%s: %s", test.name, err) 638 } 639 set = test.update(set) 640 if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil { 641 t.Fatalf("%s: %s", test.name, err) 642 } 643 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 644 if err != nil { 645 t.Fatalf("%s: %s", test.name, err) 646 } 647 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 648 if err != nil { 649 t.Fatalf("%s: %s", test.name, err) 650 } 651 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 652 if err != nil { 653 t.Fatalf("%s: %s", test.name, err) 654 } 655 if err := test.validate(set, pods); err != nil { 656 t.Fatalf("%s: %s", test.name, err) 657 } 658 } 659 660 tests := []testcase{ 661 { 662 name: "monotonic image update", 663 invariants: assertMonotonicInvariants, 664 initial: func() *apps.StatefulSet { 665 return newStatefulSet(3) 666 }, 667 update: func(set *apps.StatefulSet) *apps.StatefulSet { 668 set.Spec.Template.Spec.Containers[0].Image = "foo" 669 return set 670 }, 671 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 672 sort.Sort(ascendingOrdinal(pods)) 673 for i := range pods { 674 if pods[i].Spec.Containers[0].Image != "foo" { 675 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 676 } 677 } 678 return nil 679 }, 680 }, 681 { 682 name: "monotonic image update and scale up", 683 invariants: assertMonotonicInvariants, 684 initial: func() *apps.StatefulSet { 685 return newStatefulSet(3) 686 }, 687 update: func(set *apps.StatefulSet) *apps.StatefulSet { 688 *set.Spec.Replicas = 5 689 set.Spec.Template.Spec.Containers[0].Image = "foo" 690 return set 691 }, 692 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 693 sort.Sort(ascendingOrdinal(pods)) 694 for i := range pods { 695 if pods[i].Spec.Containers[0].Image != "foo" { 696 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 697 } 698 } 699 return nil 700 }, 701 }, 702 { 703 name: "monotonic image update and scale down", 704 invariants: assertMonotonicInvariants, 705 initial: func() *apps.StatefulSet { 706 return newStatefulSet(5) 707 }, 708 update: func(set *apps.StatefulSet) *apps.StatefulSet { 709 *set.Spec.Replicas = 3 710 set.Spec.Template.Spec.Containers[0].Image = "foo" 711 return set 712 }, 713 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 714 sort.Sort(ascendingOrdinal(pods)) 715 for i := range pods { 716 if pods[i].Spec.Containers[0].Image != "foo" { 717 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 718 } 719 } 720 return nil 721 }, 722 }, 723 { 724 name: "burst image update", 725 invariants: assertBurstInvariants, 726 initial: func() *apps.StatefulSet { 727 return burst(newStatefulSet(3)) 728 }, 729 update: func(set *apps.StatefulSet) *apps.StatefulSet { 730 set.Spec.Template.Spec.Containers[0].Image = "foo" 731 return set 732 }, 733 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 734 sort.Sort(ascendingOrdinal(pods)) 735 for i := range pods { 736 if pods[i].Spec.Containers[0].Image != "foo" { 737 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 738 } 739 } 740 return nil 741 }, 742 }, 743 { 744 name: "burst image update and scale up", 745 invariants: assertBurstInvariants, 746 initial: func() *apps.StatefulSet { 747 return burst(newStatefulSet(3)) 748 }, 749 update: func(set *apps.StatefulSet) *apps.StatefulSet { 750 *set.Spec.Replicas = 5 751 set.Spec.Template.Spec.Containers[0].Image = "foo" 752 return set 753 }, 754 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 755 sort.Sort(ascendingOrdinal(pods)) 756 for i := range pods { 757 if pods[i].Spec.Containers[0].Image != "foo" { 758 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 759 } 760 } 761 return nil 762 }, 763 }, 764 { 765 name: "burst image update and scale down", 766 invariants: assertBurstInvariants, 767 initial: func() *apps.StatefulSet { 768 return burst(newStatefulSet(5)) 769 }, 770 update: func(set *apps.StatefulSet) *apps.StatefulSet { 771 *set.Spec.Replicas = 3 772 set.Spec.Template.Spec.Containers[0].Image = "foo" 773 return set 774 }, 775 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 776 sort.Sort(ascendingOrdinal(pods)) 777 for i := range pods { 778 if pods[i].Spec.Containers[0].Image != "foo" { 779 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 780 } 781 } 782 return nil 783 }, 784 }, 785 } 786 for i := range tests { 787 testFn(&tests[i], t) 788 } 789} 790 791func TestStatefulSetControlOnDeleteUpdate(t *testing.T) { 792 type testcase struct { 793 name string 794 invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error 795 initial func() *apps.StatefulSet 796 update func(set *apps.StatefulSet) *apps.StatefulSet 797 validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error 798 validateRestart func(set *apps.StatefulSet, pods []*v1.Pod) error 799 } 800 801 originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image 802 803 testFn := func(test *testcase, t *testing.T) { 804 set := test.initial() 805 set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType} 806 client := fake.NewSimpleClientset(set) 807 spc, _, ssc, stop := setupController(client) 808 defer close(stop) 809 if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 810 t.Fatalf("%s: %s", test.name, err) 811 } 812 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 813 if err != nil { 814 t.Fatalf("%s: %s", test.name, err) 815 } 816 set = test.update(set) 817 if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil { 818 t.Fatalf("%s: %s", test.name, err) 819 } 820 821 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 822 if err != nil { 823 t.Fatalf("%s: %s", test.name, err) 824 } 825 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 826 if err != nil { 827 t.Fatalf("%s: %s", test.name, err) 828 } 829 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 830 if err != nil { 831 t.Fatalf("%s: %s", test.name, err) 832 } 833 if err := test.validateUpdate(set, pods); err != nil { 834 for i := range pods { 835 t.Log(pods[i].Name) 836 } 837 t.Fatalf("%s: %s", test.name, err) 838 839 } 840 replicas := *set.Spec.Replicas 841 *set.Spec.Replicas = 0 842 if err := scaleDownStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 843 t.Fatalf("%s: %s", test.name, err) 844 } 845 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 846 if err != nil { 847 t.Fatalf("%s: %s", test.name, err) 848 } 849 *set.Spec.Replicas = replicas 850 if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 851 t.Fatalf("%s: %s", test.name, err) 852 } 853 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 854 if err != nil { 855 t.Fatalf("%s: %s", test.name, err) 856 } 857 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 858 if err != nil { 859 t.Fatalf("%s: %s", test.name, err) 860 } 861 if err := test.validateRestart(set, pods); err != nil { 862 t.Fatalf("%s: %s", test.name, err) 863 } 864 } 865 866 tests := []testcase{ 867 { 868 name: "monotonic image update", 869 invariants: assertMonotonicInvariants, 870 initial: func() *apps.StatefulSet { 871 return newStatefulSet(3) 872 }, 873 update: func(set *apps.StatefulSet) *apps.StatefulSet { 874 set.Spec.Template.Spec.Containers[0].Image = "foo" 875 return set 876 }, 877 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 878 sort.Sort(ascendingOrdinal(pods)) 879 for i := range pods { 880 if pods[i].Spec.Containers[0].Image != originalImage { 881 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 882 } 883 } 884 return nil 885 }, 886 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error { 887 sort.Sort(ascendingOrdinal(pods)) 888 for i := range pods { 889 if pods[i].Spec.Containers[0].Image != "foo" { 890 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 891 } 892 } 893 return nil 894 }, 895 }, 896 { 897 name: "monotonic image update and scale up", 898 invariants: assertMonotonicInvariants, 899 initial: func() *apps.StatefulSet { 900 return newStatefulSet(3) 901 }, 902 update: func(set *apps.StatefulSet) *apps.StatefulSet { 903 *set.Spec.Replicas = 5 904 set.Spec.Template.Spec.Containers[0].Image = "foo" 905 return set 906 }, 907 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 908 sort.Sort(ascendingOrdinal(pods)) 909 for i := range pods { 910 if i < 3 && pods[i].Spec.Containers[0].Image != originalImage { 911 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 912 } 913 if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" { 914 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 915 } 916 } 917 return nil 918 }, 919 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error { 920 sort.Sort(ascendingOrdinal(pods)) 921 for i := range pods { 922 if pods[i].Spec.Containers[0].Image != "foo" { 923 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 924 } 925 } 926 return nil 927 }, 928 }, 929 { 930 name: "monotonic image update and scale down", 931 invariants: assertMonotonicInvariants, 932 initial: func() *apps.StatefulSet { 933 return newStatefulSet(5) 934 }, 935 update: func(set *apps.StatefulSet) *apps.StatefulSet { 936 *set.Spec.Replicas = 3 937 set.Spec.Template.Spec.Containers[0].Image = "foo" 938 return set 939 }, 940 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 941 sort.Sort(ascendingOrdinal(pods)) 942 for i := range pods { 943 if pods[i].Spec.Containers[0].Image != originalImage { 944 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 945 } 946 } 947 return nil 948 }, 949 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error { 950 sort.Sort(ascendingOrdinal(pods)) 951 for i := range pods { 952 if pods[i].Spec.Containers[0].Image != "foo" { 953 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 954 } 955 } 956 return nil 957 }, 958 }, 959 { 960 name: "burst image update", 961 invariants: assertBurstInvariants, 962 initial: func() *apps.StatefulSet { 963 return burst(newStatefulSet(3)) 964 }, 965 update: func(set *apps.StatefulSet) *apps.StatefulSet { 966 set.Spec.Template.Spec.Containers[0].Image = "foo" 967 return set 968 }, 969 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 970 sort.Sort(ascendingOrdinal(pods)) 971 for i := range pods { 972 if pods[i].Spec.Containers[0].Image != originalImage { 973 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 974 } 975 } 976 return nil 977 }, 978 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error { 979 sort.Sort(ascendingOrdinal(pods)) 980 for i := range pods { 981 if pods[i].Spec.Containers[0].Image != "foo" { 982 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 983 } 984 } 985 return nil 986 }, 987 }, 988 { 989 name: "burst image update and scale up", 990 invariants: assertBurstInvariants, 991 initial: func() *apps.StatefulSet { 992 return burst(newStatefulSet(3)) 993 }, 994 update: func(set *apps.StatefulSet) *apps.StatefulSet { 995 *set.Spec.Replicas = 5 996 set.Spec.Template.Spec.Containers[0].Image = "foo" 997 return set 998 }, 999 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1000 sort.Sort(ascendingOrdinal(pods)) 1001 for i := range pods { 1002 if i < 3 && pods[i].Spec.Containers[0].Image != originalImage { 1003 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1004 } 1005 if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" { 1006 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1007 } 1008 } 1009 return nil 1010 }, 1011 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1012 sort.Sort(ascendingOrdinal(pods)) 1013 for i := range pods { 1014 if pods[i].Spec.Containers[0].Image != "foo" { 1015 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1016 } 1017 } 1018 return nil 1019 }, 1020 }, 1021 { 1022 name: "burst image update and scale down", 1023 invariants: assertBurstInvariants, 1024 initial: func() *apps.StatefulSet { 1025 return burst(newStatefulSet(5)) 1026 }, 1027 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1028 *set.Spec.Replicas = 3 1029 set.Spec.Template.Spec.Containers[0].Image = "foo" 1030 return set 1031 }, 1032 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1033 sort.Sort(ascendingOrdinal(pods)) 1034 for i := range pods { 1035 if pods[i].Spec.Containers[0].Image != originalImage { 1036 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1037 } 1038 } 1039 return nil 1040 }, 1041 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1042 sort.Sort(ascendingOrdinal(pods)) 1043 for i := range pods { 1044 if pods[i].Spec.Containers[0].Image != "foo" { 1045 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1046 } 1047 } 1048 return nil 1049 }, 1050 }, 1051 } 1052 for i := range tests { 1053 testFn(&tests[i], t) 1054 } 1055} 1056 1057func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) { 1058 type testcase struct { 1059 name string 1060 partition int32 1061 invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error 1062 initial func() *apps.StatefulSet 1063 update func(set *apps.StatefulSet) *apps.StatefulSet 1064 validate func(set *apps.StatefulSet, pods []*v1.Pod) error 1065 } 1066 1067 testFn := func(test *testcase, t *testing.T) { 1068 set := test.initial() 1069 set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ 1070 Type: apps.RollingUpdateStatefulSetStrategyType, 1071 RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy { 1072 return &apps.RollingUpdateStatefulSetStrategy{Partition: &test.partition} 1073 }(), 1074 } 1075 client := fake.NewSimpleClientset(set) 1076 spc, _, ssc, stop := setupController(client) 1077 defer close(stop) 1078 if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 1079 t.Fatalf("%s: %s", test.name, err) 1080 } 1081 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1082 if err != nil { 1083 t.Fatalf("%s: %s", test.name, err) 1084 } 1085 set = test.update(set) 1086 if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil { 1087 t.Fatalf("%s: %s", test.name, err) 1088 } 1089 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1090 if err != nil { 1091 t.Fatalf("%s: %s", test.name, err) 1092 } 1093 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1094 if err != nil { 1095 t.Fatalf("%s: %s", test.name, err) 1096 } 1097 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1098 if err != nil { 1099 t.Fatalf("%s: %s", test.name, err) 1100 } 1101 if err := test.validate(set, pods); err != nil { 1102 t.Fatalf("%s: %s", test.name, err) 1103 } 1104 } 1105 1106 originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image 1107 1108 tests := []testcase{ 1109 { 1110 name: "monotonic image update", 1111 invariants: assertMonotonicInvariants, 1112 partition: 2, 1113 initial: func() *apps.StatefulSet { 1114 return newStatefulSet(3) 1115 }, 1116 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1117 set.Spec.Template.Spec.Containers[0].Image = "foo" 1118 return set 1119 }, 1120 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1121 sort.Sort(ascendingOrdinal(pods)) 1122 for i := range pods { 1123 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage { 1124 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1125 } 1126 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" { 1127 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1128 } 1129 } 1130 return nil 1131 }, 1132 }, 1133 { 1134 name: "monotonic image update and scale up", 1135 partition: 2, 1136 invariants: assertMonotonicInvariants, 1137 initial: func() *apps.StatefulSet { 1138 return newStatefulSet(3) 1139 }, 1140 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1141 *set.Spec.Replicas = 5 1142 set.Spec.Template.Spec.Containers[0].Image = "foo" 1143 return set 1144 }, 1145 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1146 sort.Sort(ascendingOrdinal(pods)) 1147 for i := range pods { 1148 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage { 1149 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1150 } 1151 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" { 1152 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1153 } 1154 } 1155 return nil 1156 }, 1157 }, 1158 { 1159 name: "burst image update", 1160 partition: 2, 1161 invariants: assertBurstInvariants, 1162 initial: func() *apps.StatefulSet { 1163 return burst(newStatefulSet(3)) 1164 }, 1165 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1166 set.Spec.Template.Spec.Containers[0].Image = "foo" 1167 return set 1168 }, 1169 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1170 sort.Sort(ascendingOrdinal(pods)) 1171 for i := range pods { 1172 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage { 1173 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1174 } 1175 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" { 1176 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1177 } 1178 } 1179 return nil 1180 }, 1181 }, 1182 { 1183 name: "burst image update and scale up", 1184 invariants: assertBurstInvariants, 1185 partition: 2, 1186 initial: func() *apps.StatefulSet { 1187 return burst(newStatefulSet(3)) 1188 }, 1189 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1190 *set.Spec.Replicas = 5 1191 set.Spec.Template.Spec.Containers[0].Image = "foo" 1192 return set 1193 }, 1194 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1195 sort.Sort(ascendingOrdinal(pods)) 1196 for i := range pods { 1197 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage { 1198 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1199 } 1200 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" { 1201 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1202 } 1203 } 1204 return nil 1205 }, 1206 }, 1207 } 1208 for i := range tests { 1209 testFn(&tests[i], t) 1210 } 1211} 1212 1213func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) { 1214 invariants := assertMonotonicInvariants 1215 set := newStatefulSet(3) 1216 client := fake.NewSimpleClientset(set) 1217 spc, ssu, ssc, stop := setupController(client) 1218 defer close(stop) 1219 1220 if err := scaleUpStatefulSetControl(set, ssc, spc, invariants); err != nil { 1221 t.Errorf("Failed to turn up StatefulSet : %s", err) 1222 } 1223 var err error 1224 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1225 if err != nil { 1226 t.Fatalf("Error getting updated StatefulSet: %v", err) 1227 } 1228 1229 for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ { 1230 set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i) 1231 ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2) 1232 updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants) 1233 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1234 if err != nil { 1235 t.Fatalf("Error getting updated StatefulSet: %v", err) 1236 } 1237 revisions, err := ssc.ListRevisions(set) 1238 if err != nil { 1239 t.Fatalf("Error listing revisions: %v", err) 1240 } 1241 // the extra 2 revisions are `currentRevision` and `updateRevision` 1242 // They're considered as `live`, and truncateHistory only cleans up non-live revisions 1243 if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 { 1244 t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit) 1245 } 1246 } 1247} 1248 1249func TestStatefulSetControlLimitsHistory(t *testing.T) { 1250 type testcase struct { 1251 name string 1252 invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error 1253 initial func() *apps.StatefulSet 1254 } 1255 1256 testFn := func(test *testcase, t *testing.T) { 1257 set := test.initial() 1258 client := fake.NewSimpleClientset(set) 1259 spc, _, ssc, stop := setupController(client) 1260 defer close(stop) 1261 if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 1262 t.Fatalf("%s: %s", test.name, err) 1263 } 1264 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1265 if err != nil { 1266 t.Fatalf("%s: %s", test.name, err) 1267 } 1268 for i := 0; i < 10; i++ { 1269 set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i) 1270 if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil { 1271 t.Fatalf("%s: %s", test.name, err) 1272 } 1273 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1274 if err != nil { 1275 t.Fatalf("%s: %s", test.name, err) 1276 } 1277 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1278 if err != nil { 1279 t.Fatalf("%s: %s", test.name, err) 1280 } 1281 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1282 if err != nil { 1283 t.Fatalf("%s: %s", test.name, err) 1284 } 1285 _, err = ssc.UpdateStatefulSet(set, pods) 1286 if err != nil { 1287 t.Fatalf("%s: %s", test.name, err) 1288 } 1289 revisions, err := ssc.ListRevisions(set) 1290 if err != nil { 1291 t.Fatalf("%s: %s", test.name, err) 1292 } 1293 if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 { 1294 t.Fatalf("%s: %d greater than limit %d", test.name, len(revisions), *set.Spec.RevisionHistoryLimit) 1295 } 1296 } 1297 } 1298 1299 tests := []testcase{ 1300 { 1301 name: "monotonic update", 1302 invariants: assertMonotonicInvariants, 1303 initial: func() *apps.StatefulSet { 1304 return newStatefulSet(3) 1305 }, 1306 }, 1307 { 1308 name: "burst update", 1309 invariants: assertBurstInvariants, 1310 initial: func() *apps.StatefulSet { 1311 return burst(newStatefulSet(3)) 1312 }, 1313 }, 1314 } 1315 for i := range tests { 1316 testFn(&tests[i], t) 1317 } 1318} 1319 1320func TestStatefulSetControlRollback(t *testing.T) { 1321 type testcase struct { 1322 name string 1323 invariants func(set *apps.StatefulSet, spc *fakeStatefulPodControl) error 1324 initial func() *apps.StatefulSet 1325 update func(set *apps.StatefulSet) *apps.StatefulSet 1326 validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error 1327 validateRollback func(set *apps.StatefulSet, pods []*v1.Pod) error 1328 } 1329 1330 originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image 1331 1332 testFn := func(test *testcase, t *testing.T) { 1333 set := test.initial() 1334 client := fake.NewSimpleClientset(set) 1335 spc, _, ssc, stop := setupController(client) 1336 defer close(stop) 1337 if err := scaleUpStatefulSetControl(set, ssc, spc, test.invariants); err != nil { 1338 t.Fatalf("%s: %s", test.name, err) 1339 } 1340 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1341 if err != nil { 1342 t.Fatalf("%s: %s", test.name, err) 1343 } 1344 set = test.update(set) 1345 if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil { 1346 t.Fatalf("%s: %s", test.name, err) 1347 } 1348 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1349 if err != nil { 1350 t.Fatalf("%s: %s", test.name, err) 1351 } 1352 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1353 if err != nil { 1354 t.Fatalf("%s: %s", test.name, err) 1355 } 1356 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1357 if err != nil { 1358 t.Fatalf("%s: %s", test.name, err) 1359 } 1360 if err := test.validateUpdate(set, pods); err != nil { 1361 t.Fatalf("%s: %s", test.name, err) 1362 } 1363 revisions, err := ssc.ListRevisions(set) 1364 if err != nil { 1365 t.Fatalf("%s: %s", test.name, err) 1366 } 1367 history.SortControllerRevisions(revisions) 1368 set, err = ApplyRevision(set, revisions[0]) 1369 if err != nil { 1370 t.Fatalf("%s: %s", test.name, err) 1371 } 1372 if err := updateStatefulSetControl(set, ssc, spc, assertUpdateInvariants); err != nil { 1373 t.Fatalf("%s: %s", test.name, err) 1374 } 1375 if err != nil { 1376 t.Fatalf("%s: %s", test.name, err) 1377 } 1378 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 1379 if err != nil { 1380 t.Fatalf("%s: %s", test.name, err) 1381 } 1382 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1383 if err != nil { 1384 t.Fatalf("%s: %s", test.name, err) 1385 } 1386 if err := test.validateRollback(set, pods); err != nil { 1387 t.Fatalf("%s: %s", test.name, err) 1388 } 1389 } 1390 1391 tests := []testcase{ 1392 { 1393 name: "monotonic image update", 1394 invariants: assertMonotonicInvariants, 1395 initial: func() *apps.StatefulSet { 1396 return newStatefulSet(3) 1397 }, 1398 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1399 set.Spec.Template.Spec.Containers[0].Image = "foo" 1400 return set 1401 }, 1402 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1403 sort.Sort(ascendingOrdinal(pods)) 1404 for i := range pods { 1405 if pods[i].Spec.Containers[0].Image != "foo" { 1406 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1407 } 1408 } 1409 return nil 1410 }, 1411 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1412 sort.Sort(ascendingOrdinal(pods)) 1413 for i := range pods { 1414 if pods[i].Spec.Containers[0].Image != originalImage { 1415 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1416 } 1417 } 1418 return nil 1419 }, 1420 }, 1421 { 1422 name: "monotonic image update and scale up", 1423 invariants: assertMonotonicInvariants, 1424 initial: func() *apps.StatefulSet { 1425 return newStatefulSet(3) 1426 }, 1427 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1428 *set.Spec.Replicas = 5 1429 set.Spec.Template.Spec.Containers[0].Image = "foo" 1430 return set 1431 }, 1432 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1433 sort.Sort(ascendingOrdinal(pods)) 1434 for i := range pods { 1435 if pods[i].Spec.Containers[0].Image != "foo" { 1436 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1437 } 1438 } 1439 return nil 1440 }, 1441 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1442 sort.Sort(ascendingOrdinal(pods)) 1443 for i := range pods { 1444 if pods[i].Spec.Containers[0].Image != originalImage { 1445 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1446 } 1447 } 1448 return nil 1449 }, 1450 }, 1451 { 1452 name: "monotonic image update and scale down", 1453 invariants: assertMonotonicInvariants, 1454 initial: func() *apps.StatefulSet { 1455 return newStatefulSet(5) 1456 }, 1457 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1458 *set.Spec.Replicas = 3 1459 set.Spec.Template.Spec.Containers[0].Image = "foo" 1460 return set 1461 }, 1462 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1463 sort.Sort(ascendingOrdinal(pods)) 1464 for i := range pods { 1465 if pods[i].Spec.Containers[0].Image != "foo" { 1466 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1467 } 1468 } 1469 return nil 1470 }, 1471 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1472 sort.Sort(ascendingOrdinal(pods)) 1473 for i := range pods { 1474 if pods[i].Spec.Containers[0].Image != originalImage { 1475 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1476 } 1477 } 1478 return nil 1479 }, 1480 }, 1481 { 1482 name: "burst image update", 1483 invariants: assertBurstInvariants, 1484 initial: func() *apps.StatefulSet { 1485 return burst(newStatefulSet(3)) 1486 }, 1487 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1488 set.Spec.Template.Spec.Containers[0].Image = "foo" 1489 return set 1490 }, 1491 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1492 sort.Sort(ascendingOrdinal(pods)) 1493 for i := range pods { 1494 if pods[i].Spec.Containers[0].Image != "foo" { 1495 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1496 } 1497 } 1498 return nil 1499 }, 1500 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1501 sort.Sort(ascendingOrdinal(pods)) 1502 for i := range pods { 1503 if pods[i].Spec.Containers[0].Image != originalImage { 1504 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1505 } 1506 } 1507 return nil 1508 }, 1509 }, 1510 { 1511 name: "burst image update and scale up", 1512 invariants: assertBurstInvariants, 1513 initial: func() *apps.StatefulSet { 1514 return burst(newStatefulSet(3)) 1515 }, 1516 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1517 *set.Spec.Replicas = 5 1518 set.Spec.Template.Spec.Containers[0].Image = "foo" 1519 return set 1520 }, 1521 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1522 sort.Sort(ascendingOrdinal(pods)) 1523 for i := range pods { 1524 if pods[i].Spec.Containers[0].Image != "foo" { 1525 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1526 } 1527 } 1528 return nil 1529 }, 1530 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1531 sort.Sort(ascendingOrdinal(pods)) 1532 for i := range pods { 1533 if pods[i].Spec.Containers[0].Image != originalImage { 1534 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1535 } 1536 } 1537 return nil 1538 }, 1539 }, 1540 { 1541 name: "burst image update and scale down", 1542 invariants: assertBurstInvariants, 1543 initial: func() *apps.StatefulSet { 1544 return burst(newStatefulSet(5)) 1545 }, 1546 update: func(set *apps.StatefulSet) *apps.StatefulSet { 1547 *set.Spec.Replicas = 3 1548 set.Spec.Template.Spec.Containers[0].Image = "foo" 1549 return set 1550 }, 1551 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1552 sort.Sort(ascendingOrdinal(pods)) 1553 for i := range pods { 1554 if pods[i].Spec.Containers[0].Image != "foo" { 1555 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image) 1556 } 1557 } 1558 return nil 1559 }, 1560 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error { 1561 sort.Sort(ascendingOrdinal(pods)) 1562 for i := range pods { 1563 if pods[i].Spec.Containers[0].Image != originalImage { 1564 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image) 1565 } 1566 } 1567 return nil 1568 }, 1569 }, 1570 } 1571 for i := range tests { 1572 testFn(&tests[i], t) 1573 } 1574} 1575 1576func TestStatefulSetAvailability(t *testing.T) { 1577 tests := []struct { 1578 name string 1579 inputSTS *apps.StatefulSet 1580 expectedActiveReplicas int32 1581 readyDuration time.Duration 1582 minReadySecondsFeaturegateEnabled bool 1583 }{ 1584 { 1585 name: "replicas not running for required time, still will be available," + 1586 " when minReadySeconds is disabled", 1587 inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)), 1588 readyDuration: 0 * time.Minute, 1589 expectedActiveReplicas: int32(1), 1590 minReadySecondsFeaturegateEnabled: false, 1591 }, 1592 { 1593 name: "replicas running for required time, when minReadySeconds is enabled", 1594 inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)), 1595 readyDuration: -120 * time.Minute, 1596 expectedActiveReplicas: int32(1), 1597 minReadySecondsFeaturegateEnabled: true, 1598 }, 1599 { 1600 name: "replicas not running for required time, when minReadySeconds is enabled", 1601 inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)), 1602 readyDuration: -30 * time.Minute, 1603 expectedActiveReplicas: int32(0), 1604 minReadySecondsFeaturegateEnabled: true, 1605 }, 1606 } 1607 for _, test := range tests { 1608 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.minReadySecondsFeaturegateEnabled)() 1609 set := test.inputSTS 1610 client := fake.NewSimpleClientset(set) 1611 spc, _, ssc, stop := setupController(client) 1612 defer close(stop) 1613 if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil { 1614 t.Fatalf("%s: %s", test.name, err) 1615 } 1616 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1617 if err != nil { 1618 t.Fatalf("%s: %s", test.name, err) 1619 } 1620 _, err = spc.podsLister.Pods(set.Namespace).List(selector) 1621 if err != nil { 1622 t.Fatalf("%s: %s", test.name, err) 1623 } 1624 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 1625 if err != nil { 1626 t.Fatalf("%s: %s", test.name, err) 1627 } 1628 pods, err := spc.setPodAvailable(set, 0, time.Now().Add(test.readyDuration)) 1629 if err != nil { 1630 t.Fatalf("%s: %s", test.name, err) 1631 } 1632 status, err := ssc.UpdateStatefulSet(set, pods) 1633 if err != nil { 1634 t.Fatalf("%s: %s", test.name, err) 1635 } 1636 if status.AvailableReplicas != test.expectedActiveReplicas { 1637 t.Fatalf("expected %d active replicas got %d", test.expectedActiveReplicas, status.AvailableReplicas) 1638 } 1639 } 1640} 1641 1642type requestTracker struct { 1643 requests int 1644 err error 1645 after int 1646} 1647 1648func (rt *requestTracker) errorReady() bool { 1649 return rt.err != nil && rt.requests >= rt.after 1650} 1651 1652func (rt *requestTracker) inc() { 1653 rt.requests++ 1654} 1655 1656func (rt *requestTracker) reset() { 1657 rt.err = nil 1658 rt.after = 0 1659} 1660 1661type fakeStatefulPodControl struct { 1662 podsLister corelisters.PodLister 1663 claimsLister corelisters.PersistentVolumeClaimLister 1664 setsLister appslisters.StatefulSetLister 1665 podsIndexer cache.Indexer 1666 claimsIndexer cache.Indexer 1667 setsIndexer cache.Indexer 1668 revisionsIndexer cache.Indexer 1669 createPodTracker requestTracker 1670 updatePodTracker requestTracker 1671 deletePodTracker requestTracker 1672} 1673 1674func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer, revisionInformer appsinformers.ControllerRevisionInformer) *fakeStatefulPodControl { 1675 claimsIndexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) 1676 return &fakeStatefulPodControl{ 1677 podInformer.Lister(), 1678 corelisters.NewPersistentVolumeClaimLister(claimsIndexer), 1679 setInformer.Lister(), 1680 podInformer.Informer().GetIndexer(), 1681 claimsIndexer, 1682 setInformer.Informer().GetIndexer(), 1683 revisionInformer.Informer().GetIndexer(), 1684 requestTracker{0, nil, 0}, 1685 requestTracker{0, nil, 0}, 1686 requestTracker{0, nil, 0}} 1687} 1688 1689func (spc *fakeStatefulPodControl) SetCreateStatefulPodError(err error, after int) { 1690 spc.createPodTracker.err = err 1691 spc.createPodTracker.after = after 1692} 1693 1694func (spc *fakeStatefulPodControl) SetUpdateStatefulPodError(err error, after int) { 1695 spc.updatePodTracker.err = err 1696 spc.updatePodTracker.after = after 1697} 1698 1699func (spc *fakeStatefulPodControl) SetDeleteStatefulPodError(err error, after int) { 1700 spc.deletePodTracker.err = err 1701 spc.deletePodTracker.after = after 1702} 1703 1704func (spc *fakeStatefulPodControl) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { 1705 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1706 if err != nil { 1707 return nil, err 1708 } 1709 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1710 if err != nil { 1711 return nil, err 1712 } 1713 if 0 > ordinal || ordinal >= len(pods) { 1714 return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) 1715 } 1716 sort.Sort(ascendingOrdinal(pods)) 1717 pod := pods[ordinal].DeepCopy() 1718 pod.Status.Phase = v1.PodPending 1719 fakeResourceVersion(pod) 1720 spc.podsIndexer.Update(pod) 1721 return spc.podsLister.Pods(set.Namespace).List(selector) 1722} 1723 1724func (spc *fakeStatefulPodControl) setPodRunning(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { 1725 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1726 if err != nil { 1727 return nil, err 1728 } 1729 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1730 if err != nil { 1731 return nil, err 1732 } 1733 if 0 > ordinal || ordinal >= len(pods) { 1734 return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) 1735 } 1736 sort.Sort(ascendingOrdinal(pods)) 1737 pod := pods[ordinal].DeepCopy() 1738 pod.Status.Phase = v1.PodRunning 1739 fakeResourceVersion(pod) 1740 spc.podsIndexer.Update(pod) 1741 return spc.podsLister.Pods(set.Namespace).List(selector) 1742} 1743 1744func (spc *fakeStatefulPodControl) setPodReady(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { 1745 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1746 if err != nil { 1747 return nil, err 1748 } 1749 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1750 if err != nil { 1751 return nil, err 1752 } 1753 if 0 > ordinal || ordinal >= len(pods) { 1754 return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) 1755 } 1756 sort.Sort(ascendingOrdinal(pods)) 1757 pod := pods[ordinal].DeepCopy() 1758 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} 1759 podutil.UpdatePodCondition(&pod.Status, &condition) 1760 fakeResourceVersion(pod) 1761 spc.podsIndexer.Update(pod) 1762 return spc.podsLister.Pods(set.Namespace).List(selector) 1763} 1764 1765func (spc *fakeStatefulPodControl) setPodAvailable(set *apps.StatefulSet, ordinal int, lastTransitionTime time.Time) ([]*v1.Pod, error) { 1766 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1767 if err != nil { 1768 return nil, err 1769 } 1770 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1771 if err != nil { 1772 return nil, err 1773 } 1774 if 0 > ordinal || ordinal >= len(pods) { 1775 return nil, fmt.Errorf("ordinal %d out of range [0,%d)", ordinal, len(pods)) 1776 } 1777 sort.Sort(ascendingOrdinal(pods)) 1778 pod := pods[ordinal].DeepCopy() 1779 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: lastTransitionTime}} 1780 _, existingCondition := podutil.GetPodCondition(&pod.Status, condition.Type) 1781 if existingCondition != nil { 1782 existingCondition.Status = v1.ConditionTrue 1783 existingCondition.LastTransitionTime = metav1.Time{Time: lastTransitionTime} 1784 } else { 1785 existingCondition = &v1.PodCondition{ 1786 Type: v1.PodReady, 1787 Status: v1.ConditionTrue, 1788 LastTransitionTime: metav1.Time{Time: lastTransitionTime}, 1789 } 1790 pod.Status.Conditions = append(pod.Status.Conditions, *existingCondition) 1791 } 1792 podutil.UpdatePodCondition(&pod.Status, &condition) 1793 fakeResourceVersion(pod) 1794 spc.podsIndexer.Update(pod) 1795 return spc.podsLister.Pods(set.Namespace).List(selector) 1796} 1797 1798func (spc *fakeStatefulPodControl) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { 1799 pod := newStatefulSetPod(set, ordinal) 1800 pod.Status.Phase = v1.PodRunning 1801 deleted := metav1.NewTime(time.Now()) 1802 pod.DeletionTimestamp = &deleted 1803 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue} 1804 fakeResourceVersion(pod) 1805 podutil.UpdatePodCondition(&pod.Status, &condition) 1806 spc.podsIndexer.Update(pod) 1807 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1808 if err != nil { 1809 return nil, err 1810 } 1811 return spc.podsLister.Pods(set.Namespace).List(selector) 1812} 1813 1814func (spc *fakeStatefulPodControl) setPodTerminated(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) { 1815 pod := newStatefulSetPod(set, ordinal) 1816 deleted := metav1.NewTime(time.Now()) 1817 pod.DeletionTimestamp = &deleted 1818 fakeResourceVersion(pod) 1819 spc.podsIndexer.Update(pod) 1820 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1821 if err != nil { 1822 return nil, err 1823 } 1824 return spc.podsLister.Pods(set.Namespace).List(selector) 1825} 1826 1827func (spc *fakeStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { 1828 defer spc.createPodTracker.inc() 1829 if spc.createPodTracker.errorReady() { 1830 defer spc.createPodTracker.reset() 1831 return spc.createPodTracker.err 1832 } 1833 1834 for _, claim := range getPersistentVolumeClaims(set, pod) { 1835 spc.claimsIndexer.Update(&claim) 1836 } 1837 spc.podsIndexer.Update(pod) 1838 return nil 1839} 1840 1841func (spc *fakeStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { 1842 defer spc.updatePodTracker.inc() 1843 if spc.updatePodTracker.errorReady() { 1844 defer spc.updatePodTracker.reset() 1845 return spc.updatePodTracker.err 1846 } 1847 if !identityMatches(set, pod) { 1848 updateIdentity(set, pod) 1849 } 1850 if !storageMatches(set, pod) { 1851 updateStorage(set, pod) 1852 for _, claim := range getPersistentVolumeClaims(set, pod) { 1853 spc.claimsIndexer.Update(&claim) 1854 } 1855 } 1856 spc.podsIndexer.Update(pod) 1857 return nil 1858} 1859 1860func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { 1861 defer spc.deletePodTracker.inc() 1862 if spc.deletePodTracker.errorReady() { 1863 defer spc.deletePodTracker.reset() 1864 return spc.deletePodTracker.err 1865 } 1866 if key, err := controller.KeyFunc(pod); err != nil { 1867 return err 1868 } else if obj, found, err := spc.podsIndexer.GetByKey(key); err != nil { 1869 return err 1870 } else if found { 1871 spc.podsIndexer.Delete(obj) 1872 } 1873 1874 return nil 1875} 1876 1877var _ StatefulPodControlInterface = &fakeStatefulPodControl{} 1878 1879type fakeStatefulSetStatusUpdater struct { 1880 setsLister appslisters.StatefulSetLister 1881 setsIndexer cache.Indexer 1882 updateStatusTracker requestTracker 1883} 1884 1885func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInformer) *fakeStatefulSetStatusUpdater { 1886 return &fakeStatefulSetStatusUpdater{ 1887 setInformer.Lister(), 1888 setInformer.Informer().GetIndexer(), 1889 requestTracker{0, nil, 0}, 1890 } 1891} 1892 1893func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) error { 1894 defer ssu.updateStatusTracker.inc() 1895 if ssu.updateStatusTracker.errorReady() { 1896 defer ssu.updateStatusTracker.reset() 1897 return ssu.updateStatusTracker.err 1898 } 1899 set.Status = *status 1900 ssu.setsIndexer.Update(set) 1901 return nil 1902} 1903 1904func (ssu *fakeStatefulSetStatusUpdater) SetUpdateStatefulSetStatusError(err error, after int) { 1905 ssu.updateStatusTracker.err = err 1906 ssu.updateStatusTracker.after = after 1907} 1908 1909var _ StatefulSetStatusUpdaterInterface = &fakeStatefulSetStatusUpdater{} 1910 1911func assertMonotonicInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { 1912 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1913 if err != nil { 1914 return err 1915 } 1916 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1917 if err != nil { 1918 return err 1919 } 1920 sort.Sort(ascendingOrdinal(pods)) 1921 for ord := 0; ord < len(pods); ord++ { 1922 if ord > 0 && isRunningAndReady(pods[ord]) && !isRunningAndReady(pods[ord-1]) { 1923 return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[ord].Name, pods[ord-1].Name) 1924 } 1925 1926 if getOrdinal(pods[ord]) != ord { 1927 return fmt.Errorf("pods %s deployed in the wrong order %d", pods[ord].Name, ord) 1928 } 1929 1930 if !storageMatches(set, pods[ord]) { 1931 return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name) 1932 } 1933 1934 for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { 1935 claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name) 1936 if err != nil { 1937 return err 1938 } 1939 if claim == nil { 1940 return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name) 1941 } 1942 } 1943 1944 if !identityMatches(set, pods[ord]) { 1945 return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name) 1946 } 1947 } 1948 return nil 1949} 1950 1951func assertBurstInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { 1952 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1953 if err != nil { 1954 return err 1955 } 1956 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1957 if err != nil { 1958 return err 1959 } 1960 sort.Sort(ascendingOrdinal(pods)) 1961 for ord := 0; ord < len(pods); ord++ { 1962 if !storageMatches(set, pods[ord]) { 1963 return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name) 1964 } 1965 1966 for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { 1967 claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name) 1968 if err != nil { 1969 return err 1970 } 1971 if claim == nil { 1972 return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name) 1973 } 1974 } 1975 1976 if !identityMatches(set, pods[ord]) { 1977 return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", 1978 pods[ord].Name, 1979 set.Name) 1980 } 1981 } 1982 return nil 1983} 1984 1985func assertUpdateInvariants(set *apps.StatefulSet, spc *fakeStatefulPodControl) error { 1986 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 1987 if err != nil { 1988 return err 1989 } 1990 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 1991 if err != nil { 1992 return err 1993 } 1994 sort.Sort(ascendingOrdinal(pods)) 1995 for ord := 0; ord < len(pods); ord++ { 1996 1997 if !storageMatches(set, pods[ord]) { 1998 return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pods[ord].Name, set.Name) 1999 } 2000 2001 for _, claim := range getPersistentVolumeClaims(set, pods[ord]) { 2002 claim, err := spc.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name) 2003 if err != nil { 2004 return err 2005 } 2006 if claim == nil { 2007 return fmt.Errorf("claim %s for Pod %s was not created", claim.Name, pods[ord].Name) 2008 } 2009 } 2010 2011 if !identityMatches(set, pods[ord]) { 2012 return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pods[ord].Name, set.Name) 2013 } 2014 } 2015 if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType { 2016 return nil 2017 } 2018 if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType { 2019 for i := 0; i < int(set.Status.CurrentReplicas) && i < len(pods); i++ { 2020 if want, got := set.Status.CurrentRevision, getPodRevision(pods[i]); want != got { 2021 return fmt.Errorf("pod %s want current revision %s got %s", pods[i].Name, want, got) 2022 } 2023 } 2024 for i, j := len(pods)-1, 0; j < int(set.Status.UpdatedReplicas); i, j = i-1, j+1 { 2025 if want, got := set.Status.UpdateRevision, getPodRevision(pods[i]); want != got { 2026 return fmt.Errorf("pod %s want update revision %s got %s", pods[i].Name, want, got) 2027 } 2028 } 2029 } 2030 return nil 2031} 2032 2033func fakeResourceVersion(object interface{}) { 2034 obj, isObj := object.(metav1.Object) 2035 if !isObj { 2036 return 2037 } 2038 if version := obj.GetResourceVersion(); version == "" { 2039 obj.SetResourceVersion("1") 2040 } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil { 2041 obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10)) 2042 } 2043} 2044 2045func scaleUpStatefulSetControl(set *apps.StatefulSet, 2046 ssc StatefulSetControlInterface, 2047 spc *fakeStatefulPodControl, 2048 invariants invariantFunc) error { 2049 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 2050 if err != nil { 2051 return err 2052 } 2053 for set.Status.ReadyReplicas < *set.Spec.Replicas { 2054 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 2055 if err != nil { 2056 return err 2057 } 2058 sort.Sort(ascendingOrdinal(pods)) 2059 2060 // ensure all pods are valid (have a phase) 2061 initialized := false 2062 for ord, pod := range pods { 2063 if pod.Status.Phase == "" { 2064 if pods, err = spc.setPodPending(set, ord); err != nil { 2065 return err 2066 } 2067 break 2068 } 2069 } 2070 if initialized { 2071 continue 2072 } 2073 2074 // select one of the pods and move it forward in status 2075 if len(pods) > 0 { 2076 ord := int(rand.Int63n(int64(len(pods)))) 2077 pod := pods[ord] 2078 switch pod.Status.Phase { 2079 case v1.PodPending: 2080 if pods, err = spc.setPodRunning(set, ord); err != nil { 2081 return err 2082 } 2083 case v1.PodRunning: 2084 if pods, err = spc.setPodReady(set, ord); err != nil { 2085 return err 2086 } 2087 default: 2088 continue 2089 } 2090 } 2091 // run the controller once and check invariants 2092 _, err = ssc.UpdateStatefulSet(set, pods) 2093 if err != nil { 2094 return err 2095 } 2096 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 2097 if err != nil { 2098 return err 2099 } 2100 if err := invariants(set, spc); err != nil { 2101 return err 2102 } 2103 //fmt.Printf("Ravig pod conditions %v %v", set.Status.ReadyReplicas, *set.Spec.Replicas) 2104 } 2105 return invariants(set, spc) 2106} 2107 2108func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, spc *fakeStatefulPodControl, invariants invariantFunc) error { 2109 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 2110 if err != nil { 2111 return err 2112 } 2113 for set.Status.Replicas > *set.Spec.Replicas { 2114 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 2115 if err != nil { 2116 return err 2117 } 2118 sort.Sort(ascendingOrdinal(pods)) 2119 if ordinal := len(pods) - 1; ordinal >= 0 { 2120 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 2121 return err 2122 } 2123 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 2124 if err != nil { 2125 return err 2126 } 2127 if pods, err = spc.addTerminatingPod(set, ordinal); err != nil { 2128 return err 2129 } 2130 if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { 2131 return err 2132 } 2133 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 2134 if err != nil { 2135 return err 2136 } 2137 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 2138 if err != nil { 2139 return err 2140 } 2141 sort.Sort(ascendingOrdinal(pods)) 2142 2143 if len(pods) > 0 { 2144 spc.podsIndexer.Delete(pods[len(pods)-1]) 2145 } 2146 } 2147 if _, err := ssc.UpdateStatefulSet(set, pods); err != nil { 2148 return err 2149 } 2150 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 2151 if err != nil { 2152 return err 2153 } 2154 if err := invariants(set, spc); err != nil { 2155 return err 2156 } 2157 } 2158 return invariants(set, spc) 2159} 2160 2161func updateComplete(set *apps.StatefulSet, pods []*v1.Pod) bool { 2162 sort.Sort(ascendingOrdinal(pods)) 2163 if len(pods) != int(*set.Spec.Replicas) { 2164 return false 2165 } 2166 if set.Status.ReadyReplicas != *set.Spec.Replicas { 2167 return false 2168 } 2169 2170 switch set.Spec.UpdateStrategy.Type { 2171 case apps.OnDeleteStatefulSetStrategyType: 2172 return true 2173 case apps.RollingUpdateStatefulSetStrategyType: 2174 if set.Spec.UpdateStrategy.RollingUpdate == nil || *set.Spec.UpdateStrategy.RollingUpdate.Partition <= 0 { 2175 if set.Status.CurrentReplicas < *set.Spec.Replicas { 2176 return false 2177 } 2178 for i := range pods { 2179 if getPodRevision(pods[i]) != set.Status.CurrentRevision { 2180 return false 2181 } 2182 } 2183 } else { 2184 partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition) 2185 if len(pods) < partition { 2186 return false 2187 } 2188 for i := partition; i < len(pods); i++ { 2189 if getPodRevision(pods[i]) != set.Status.UpdateRevision { 2190 return false 2191 } 2192 } 2193 } 2194 } 2195 return true 2196} 2197 2198func updateStatefulSetControl(set *apps.StatefulSet, 2199 ssc StatefulSetControlInterface, 2200 spc *fakeStatefulPodControl, 2201 invariants invariantFunc) error { 2202 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 2203 if err != nil { 2204 return err 2205 } 2206 pods, err := spc.podsLister.Pods(set.Namespace).List(selector) 2207 if err != nil { 2208 return err 2209 } 2210 if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { 2211 return err 2212 } 2213 2214 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 2215 if err != nil { 2216 return err 2217 } 2218 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 2219 if err != nil { 2220 return err 2221 } 2222 for !updateComplete(set, pods) { 2223 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 2224 if err != nil { 2225 return err 2226 } 2227 sort.Sort(ascendingOrdinal(pods)) 2228 initialized := false 2229 for ord, pod := range pods { 2230 if pod.Status.Phase == "" { 2231 if pods, err = spc.setPodPending(set, ord); err != nil { 2232 return err 2233 } 2234 break 2235 } 2236 } 2237 if initialized { 2238 continue 2239 } 2240 2241 if len(pods) > 0 { 2242 ord := int(rand.Int63n(int64(len(pods)))) 2243 pod := pods[ord] 2244 switch pod.Status.Phase { 2245 case v1.PodPending: 2246 if pods, err = spc.setPodRunning(set, ord); err != nil { 2247 return err 2248 } 2249 case v1.PodRunning: 2250 if pods, err = spc.setPodReady(set, ord); err != nil { 2251 return err 2252 } 2253 default: 2254 continue 2255 } 2256 } 2257 2258 if _, err = ssc.UpdateStatefulSet(set, pods); err != nil { 2259 return err 2260 } 2261 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) 2262 if err != nil { 2263 return err 2264 } 2265 if err := invariants(set, spc); err != nil { 2266 return err 2267 } 2268 pods, err = spc.podsLister.Pods(set.Namespace).List(selector) 2269 if err != nil { 2270 return err 2271 } 2272 } 2273 return invariants(set, spc) 2274} 2275 2276func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRevision { 2277 rev, err := newRevision(set, revision, set.Status.CollisionCount) 2278 if err != nil { 2279 panic(err) 2280 } 2281 return rev 2282} 2283 2284func isOrHasInternalError(err error) bool { 2285 agg, ok := err.(utilerrors.Aggregate) 2286 return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0]) 2287} 2288