/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apps

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"reflect"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	"k8s.io/client-go/tools/cache"

	"github.com/onsi/ginkgo"
	"github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	watch "k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/retry"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
)

const (
	// This should not be a multiple of 5, because node status updates
	// every 5 seconds. See https://github.com/kubernetes/kubernetes/pull/14915.
	dsRetryPeriod  = 1 * time.Second
	dsRetryTimeout = 5 * time.Minute

	daemonsetLabelPrefix = "daemonset-"
	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
)

// NamespaceNodeSelectors lists the annotation keys, such as
// scheduler.alpha.kubernetes.io/node-selector, used to assign node selectors to namespaces.
var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}

type updateDSFunc func(*appsv1.DaemonSet)

// updateDaemonSetWithRetries updates daemonsets with the given applyUpdate func
// until it succeeds or a timeout expires.
func updateDaemonSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) {
	daemonsets := c.AppsV1().DaemonSets(namespace)
	var updateErr error
	pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
		if ds, err = daemonsets.Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
			return false, err
		}
		// Apply the update, then attempt to push it to the apiserver.
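		// If the Update call fails (for example on a resource-version conflict), the poll loop
		// re-reads the DaemonSet and retries the update until the one-minute poll timeout expires.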
		applyUpdate(ds)
		if ds, err = daemonsets.Update(context.TODO(), ds, metav1.UpdateOptions{}); err == nil {
			framework.Logf("Updating DaemonSet %s", name)
			return true, nil
		}
		updateErr = err
		return false, nil
	})
	if pollErr == wait.ErrWaitTimeout {
		pollErr = fmt.Errorf("couldn't apply the provided update to DaemonSet %q: %v", name, updateErr)
	}
	return ds, pollErr
}

// This test must be run in serial because it assumes the DaemonSet pods will
// always get scheduled. If we run other tests in parallel, this may not
// happen. In the future, running in parallel may work if we have an eviction
// model which lets the DS controller kick out other pods to make room.
// See http://issues.k8s.io/21767 for more details
var _ = SIGDescribe("Daemon set [Serial]", func() {
	var f *framework.Framework

	ginkgo.AfterEach(func() {
		// Clean up
		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
		framework.ExpectNoError(err, "unable to dump DaemonSets")
		if daemonsets != nil && len(daemonsets.Items) > 0 {
			for _, ds := range daemonsets.Items {
				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
				framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
				err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
			}
		}
		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}); err == nil {
			framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
		} else {
			framework.Logf("unable to dump daemonsets: %v", err)
		}
		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}); err == nil {
			framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
		} else {
			framework.Logf("unable to dump pods: %v", err)
		}
		err = clearDaemonSetNodeLabels(f.ClientSet)
		framework.ExpectNoError(err)
	})

	f = framework.NewDefaultFramework("daemonsets")

	image := WebserverImage
	dsName := "daemon-set"

	var ns string
	var c clientset.Interface

	ginkgo.BeforeEach(func() {
		ns = f.Namespace.Name

		c = f.ClientSet

		updatedNS, err := patchNamespaceAnnotations(c, ns)
		framework.ExpectNoError(err)

		ns = updatedNS.Name

		err = clearDaemonSetNodeLabels(c)
		framework.ExpectNoError(err)
	})

	/*
	   Release: v1.10
	   Testname: DaemonSet-Creation
	   Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
	   Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
161 */ 162 framework.ConformanceIt("should run and stop simple daemon", func() { 163 label := map[string]string{daemonsetNameLabel: dsName} 164 165 ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName)) 166 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSet(dsName, image, label), metav1.CreateOptions{}) 167 framework.ExpectNoError(err) 168 169 ginkgo.By("Check that daemon pods launch on every node of the cluster.") 170 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 171 framework.ExpectNoError(err, "error waiting for daemon pod to start") 172 err = checkDaemonStatus(f, dsName) 173 framework.ExpectNoError(err) 174 175 ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.") 176 podList := listDaemonPods(c, ns, label) 177 pod := podList.Items[0] 178 err = c.CoreV1().Pods(ns).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) 179 framework.ExpectNoError(err) 180 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 181 framework.ExpectNoError(err, "error waiting for daemon pod to revive") 182 }) 183 184 /* 185 Release: v1.10 186 Testname: DaemonSet-NodeSelection 187 Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label 188 selectors. 189 */ 190 framework.ConformanceIt("should run and stop complex daemon", func() { 191 complexLabel := map[string]string{daemonsetNameLabel: dsName} 192 nodeSelector := map[string]string{daemonsetColorLabel: "blue"} 193 framework.Logf("Creating daemon %q with a node selector", dsName) 194 ds := newDaemonSet(dsName, image, complexLabel) 195 ds.Spec.Template.Spec.NodeSelector = nodeSelector 196 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) 197 framework.ExpectNoError(err) 198 199 ginkgo.By("Initially, daemon pods should not be running on any nodes.") 200 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds)) 201 framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes") 202 203 ginkgo.By("Change node label to blue, check that daemon pod is launched.") 204 node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet) 205 framework.ExpectNoError(err) 206 newNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector) 207 framework.ExpectNoError(err, "error setting labels on node") 208 daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels) 209 framework.ExpectEqual(len(daemonSetLabels), 1) 210 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name})) 211 framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes") 212 err = checkDaemonStatus(f, dsName) 213 framework.ExpectNoError(err) 214 215 ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled") 216 nodeSelector[daemonsetColorLabel] = "green" 217 greenNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector) 218 framework.ExpectNoError(err, "error removing labels on node") 219 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds)) 220 framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes") 221 222 ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate") 223 patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`, 224 daemonsetColorLabel, 
greenNode.Labels[daemonsetColorLabel]) 225 ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) 226 framework.ExpectNoError(err, "error patching daemon set") 227 daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels) 228 framework.ExpectEqual(len(daemonSetLabels), 1) 229 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{greenNode.Name})) 230 framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes") 231 err = checkDaemonStatus(f, dsName) 232 framework.ExpectNoError(err) 233 }) 234 235 // We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the 236 // default scheduler. 237 ginkgo.It("should run and stop complex daemon with node affinity", func() { 238 complexLabel := map[string]string{daemonsetNameLabel: dsName} 239 nodeSelector := map[string]string{daemonsetColorLabel: "blue"} 240 framework.Logf("Creating daemon %q with a node affinity", dsName) 241 ds := newDaemonSet(dsName, image, complexLabel) 242 ds.Spec.Template.Spec.Affinity = &v1.Affinity{ 243 NodeAffinity: &v1.NodeAffinity{ 244 RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ 245 NodeSelectorTerms: []v1.NodeSelectorTerm{ 246 { 247 MatchExpressions: []v1.NodeSelectorRequirement{ 248 { 249 Key: daemonsetColorLabel, 250 Operator: v1.NodeSelectorOpIn, 251 Values: []string{nodeSelector[daemonsetColorLabel]}, 252 }, 253 }, 254 }, 255 }, 256 }, 257 }, 258 } 259 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) 260 framework.ExpectNoError(err) 261 262 ginkgo.By("Initially, daemon pods should not be running on any nodes.") 263 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds)) 264 framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes") 265 266 ginkgo.By("Change node label to blue, check that daemon pod is launched.") 267 node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet) 268 framework.ExpectNoError(err) 269 newNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector) 270 framework.ExpectNoError(err, "error setting labels on node") 271 daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels) 272 framework.ExpectEqual(len(daemonSetLabels), 1) 273 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name})) 274 framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes") 275 err = checkDaemonStatus(f, dsName) 276 framework.ExpectNoError(err) 277 278 ginkgo.By("Remove the node label and wait for daemons to be unscheduled") 279 _, err = setDaemonSetNodeLabels(c, node.Name, map[string]string{}) 280 framework.ExpectNoError(err, "error removing labels on node") 281 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds)) 282 framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes") 283 }) 284 285 /* 286 Release: v1.10 287 Testname: DaemonSet-FailedPodCreation 288 Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail. 
289 */ 290 framework.ConformanceIt("should retry creating failed daemon pods", func() { 291 label := map[string]string{daemonsetNameLabel: dsName} 292 293 ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName)) 294 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSet(dsName, image, label), metav1.CreateOptions{}) 295 framework.ExpectNoError(err) 296 297 ginkgo.By("Check that daemon pods launch on every node of the cluster.") 298 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 299 framework.ExpectNoError(err, "error waiting for daemon pod to start") 300 err = checkDaemonStatus(f, dsName) 301 framework.ExpectNoError(err) 302 303 ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.") 304 podList := listDaemonPods(c, ns, label) 305 pod := podList.Items[0] 306 pod.ResourceVersion = "" 307 pod.Status.Phase = v1.PodFailed 308 _, err = c.CoreV1().Pods(ns).UpdateStatus(context.TODO(), &pod, metav1.UpdateOptions{}) 309 framework.ExpectNoError(err, "error failing a daemon pod") 310 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 311 framework.ExpectNoError(err, "error waiting for daemon pod to revive") 312 313 ginkgo.By("Wait for the failed daemon pod to be completely deleted.") 314 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, waitFailedDaemonPodDeleted(c, &pod)) 315 framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted") 316 }) 317 318 // This test should not be added to conformance. We will consider deprecating OnDelete when the 319 // extensions/v1beta1 and apps/v1beta1 are removed. 320 ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func() { 321 label := map[string]string{daemonsetNameLabel: dsName} 322 323 framework.Logf("Creating simple daemon set %s", dsName) 324 ds := newDaemonSet(dsName, image, label) 325 ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType} 326 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) 327 framework.ExpectNoError(err) 328 329 ginkgo.By("Check that daemon pods launch on every node of the cluster.") 330 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 331 framework.ExpectNoError(err, "error waiting for daemon pod to start") 332 333 // Check history and labels 334 ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) 335 framework.ExpectNoError(err) 336 waitForHistoryCreated(c, ns, label, 1) 337 first := curHistory(listDaemonHistories(c, ns, label), ds) 338 firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] 339 framework.ExpectEqual(first.Revision, int64(1)) 340 checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash) 341 342 ginkgo.By("Update daemon pods image.") 343 patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage) 344 ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) 345 framework.ExpectNoError(err) 346 347 ginkgo.By("Check that daemon pods images aren't updated.") 348 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, ds, image, 0)) 349 framework.ExpectNoError(err) 350 351 ginkgo.By("Check that daemon pods are still running on every node of the cluster.") 352 err = wait.PollImmediate(dsRetryPeriod, 
dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 353 framework.ExpectNoError(err, "error waiting for daemon pod to start") 354 355 // Check history and labels 356 ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) 357 framework.ExpectNoError(err) 358 waitForHistoryCreated(c, ns, label, 2) 359 cur := curHistory(listDaemonHistories(c, ns, label), ds) 360 framework.ExpectEqual(cur.Revision, int64(2)) 361 framework.ExpectNotEqual(cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey], firstHash) 362 checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash) 363 }) 364 365 /* 366 Release: v1.10 367 Testname: DaemonSet-RollingUpdate 368 Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates. 369 */ 370 framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func() { 371 label := map[string]string{daemonsetNameLabel: dsName} 372 373 framework.Logf("Creating simple daemon set %s", dsName) 374 ds := newDaemonSet(dsName, image, label) 375 ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType} 376 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) 377 framework.ExpectNoError(err) 378 379 ginkgo.By("Check that daemon pods launch on every node of the cluster.") 380 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 381 framework.ExpectNoError(err, "error waiting for daemon pod to start") 382 383 // Check history and labels 384 ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) 385 framework.ExpectNoError(err) 386 waitForHistoryCreated(c, ns, label, 1) 387 cur := curHistory(listDaemonHistories(c, ns, label), ds) 388 hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] 389 framework.ExpectEqual(cur.Revision, int64(1)) 390 checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) 391 392 ginkgo.By("Update daemon pods image.") 393 patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage) 394 ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) 395 framework.ExpectNoError(err) 396 397 // Time to complete the rolling upgrade is proportional to the number of nodes in the cluster. 398 // Get the number of nodes, and set the timeout appropriately. 
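		// For example, with dsRetryTimeout at 5 minutes, a 3-node cluster gets 5m + 3*30s = 6m30s to finish the rollout.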
399 nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) 400 framework.ExpectNoError(err) 401 nodeCount := len(nodes.Items) 402 retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second 403 404 ginkgo.By("Check that daemon pods images are updated.") 405 err = wait.PollImmediate(dsRetryPeriod, retryTimeout, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1)) 406 framework.ExpectNoError(err) 407 408 ginkgo.By("Check that daemon pods are still running on every node of the cluster.") 409 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 410 framework.ExpectNoError(err, "error waiting for daemon pod to start") 411 412 // Check history and labels 413 ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) 414 framework.ExpectNoError(err) 415 waitForHistoryCreated(c, ns, label, 2) 416 cur = curHistory(listDaemonHistories(c, ns, label), ds) 417 hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] 418 framework.ExpectEqual(cur.Revision, int64(2)) 419 checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) 420 }) 421 422 /* 423 Release: v1.10 424 Testname: DaemonSet-Rollback 425 Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive 426 rollback of updates to a DaemonSet. 427 */ 428 framework.ConformanceIt("should rollback without unnecessary restarts", func() { 429 schedulableNodes, err := e2enode.GetReadySchedulableNodes(c) 430 framework.ExpectNoError(err) 431 gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.") 432 framework.Logf("Create a RollingUpdate DaemonSet") 433 label := map[string]string{daemonsetNameLabel: dsName} 434 ds := newDaemonSet(dsName, image, label) 435 ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType} 436 ds, err = c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) 437 framework.ExpectNoError(err) 438 439 framework.Logf("Check that daemon pods launch on every node of the cluster") 440 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 441 framework.ExpectNoError(err, "error waiting for daemon pod to start") 442 443 framework.Logf("Update the DaemonSet to trigger a rollout") 444 // We use a nonexistent image here, so that we make sure it won't finish 445 newImage := "foo:non-existent" 446 newDS, err := updateDaemonSetWithRetries(c, ns, ds.Name, func(update *appsv1.DaemonSet) { 447 update.Spec.Template.Spec.Containers[0].Image = newImage 448 }) 449 framework.ExpectNoError(err) 450 451 // Make sure we're in the middle of a rollout 452 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkAtLeastOneNewPod(c, ns, label, newImage)) 453 framework.ExpectNoError(err) 454 455 pods := listDaemonPods(c, ns, label) 456 var existingPods, newPods []*v1.Pod 457 for i := range pods.Items { 458 pod := pods.Items[i] 459 image := pod.Spec.Containers[0].Image 460 switch image { 461 case ds.Spec.Template.Spec.Containers[0].Image: 462 existingPods = append(existingPods, &pod) 463 case newDS.Spec.Template.Spec.Containers[0].Image: 464 newPods = append(newPods, &pod) 465 default: 466 framework.Failf("unexpected pod found, image = %s", image) 467 } 468 } 469 schedulableNodes, err = e2enode.GetReadySchedulableNodes(c) 470 framework.ExpectNoError(err) 471 if len(schedulableNodes.Items) < 2 { 472 framework.ExpectEqual(len(existingPods), 
0) 473 } else { 474 framework.ExpectNotEqual(len(existingPods), 0) 475 } 476 framework.ExpectNotEqual(len(newPods), 0) 477 478 framework.Logf("Roll back the DaemonSet before rollout is complete") 479 rollbackDS, err := updateDaemonSetWithRetries(c, ns, ds.Name, func(update *appsv1.DaemonSet) { 480 update.Spec.Template.Spec.Containers[0].Image = image 481 }) 482 framework.ExpectNoError(err) 483 484 framework.Logf("Make sure DaemonSet rollback is complete") 485 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1)) 486 framework.ExpectNoError(err) 487 488 // After rollback is done, compare current pods with previous old pods during rollout, to make sure they're not restarted 489 pods = listDaemonPods(c, ns, label) 490 rollbackPods := map[string]bool{} 491 for _, pod := range pods.Items { 492 rollbackPods[pod.Name] = true 493 } 494 for _, pod := range existingPods { 495 framework.ExpectEqual(rollbackPods[pod.Name], true, fmt.Sprintf("unexpected pod %s be restarted", pod.Name)) 496 } 497 }) 498 499 // TODO: This test is expected to be promoted to conformance after the feature is promoted 500 ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate", func() { 501 label := map[string]string{daemonsetNameLabel: dsName} 502 503 framework.Logf("Creating surge daemon set %s", dsName) 504 maxSurgeOverlap := 60 * time.Second 505 maxSurge := 1 506 surgePercent := intstr.FromString("20%") 507 zero := intstr.FromInt(0) 508 oldVersion := "1" 509 ds := newDaemonSet(dsName, image, label) 510 ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{ 511 {Name: "VERSION", Value: oldVersion}, 512 } 513 // delay shutdown by 15s to allow containers to overlap in time 514 ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{ 515 PreStop: &v1.Handler{ 516 Exec: &v1.ExecAction{ 517 Command: []string{"/bin/sh", "-c", "sleep 15"}, 518 }, 519 }, 520 } 521 // use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready) 522 ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{ 523 Handler: v1.Handler{ 524 Exec: &v1.ExecAction{ 525 Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`}, 526 }, 527 }, 528 InitialDelaySeconds: 7, 529 PeriodSeconds: 3, 530 SuccessThreshold: 1, 531 FailureThreshold: 1, 532 } 533 // use a simple surge strategy 534 ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{ 535 Type: appsv1.RollingUpdateDaemonSetStrategyType, 536 RollingUpdate: &appsv1.RollingUpdateDaemonSet{ 537 MaxUnavailable: &zero, 538 MaxSurge: &surgePercent, 539 }, 540 } 541 // The pod must be ready for at least 10s before we delete the old pod 542 ds.Spec.MinReadySeconds = 10 543 544 ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) 545 framework.ExpectNoError(err) 546 547 ginkgo.By("Check that daemon pods launch on every node of the cluster.") 548 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) 549 framework.ExpectNoError(err, "error waiting for daemon pod to start") 550 551 // Check history and labels 552 ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) 553 framework.ExpectNoError(err) 554 waitForHistoryCreated(c, ns, label, 1) 555 cur := curHistory(listDaemonHistories(c, ns, label), ds) 556 hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] 557 framework.ExpectEqual(cur.Revision, int64(1)) 558 
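		// Every active daemon pod created for revision 1 should carry that revision's controller hash label.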
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) 559 560 newVersion := "2" 561 ginkgo.By("Update daemon pods environment var") 562 patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion) 563 ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) 564 framework.ExpectNoError(err) 565 566 // Time to complete the rolling upgrade is proportional to the number of nodes in the cluster. 567 // Get the number of nodes, and set the timeout appropriately. 568 nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) 569 framework.ExpectNoError(err) 570 nodeCount := len(nodes.Items) 571 retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second 572 573 ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout") 574 ageOfOldPod := make(map[string]time.Time) 575 deliberatelyDeletedPods := sets.NewString() 576 err = wait.PollImmediate(dsRetryPeriod, retryTimeout, func() (bool, error) { 577 podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) 578 if err != nil { 579 return false, err 580 } 581 pods := podList.Items 582 583 var buf bytes.Buffer 584 pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0) 585 fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n") 586 587 now := time.Now() 588 podUIDs := sets.NewString() 589 deletedPodUIDs := sets.NewString() 590 nodes := sets.NewString() 591 versions := sets.NewString() 592 nodesToVersions := make(map[string]map[string]int) 593 nodesToDeletedVersions := make(map[string]map[string]int) 594 var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int 595 for _, pod := range pods { 596 if !metav1.IsControlledBy(&pod, ds) { 597 continue 598 } 599 nodeName := pod.Spec.NodeName 600 nodes.Insert(nodeName) 601 podVersion := pod.Spec.Containers[0].Env[0].Value 602 if pod.DeletionTimestamp != nil { 603 if !deliberatelyDeletedPods.Has(string(pod.UID)) { 604 versions := nodesToDeletedVersions[nodeName] 605 if versions == nil { 606 versions = make(map[string]int) 607 nodesToDeletedVersions[nodeName] = versions 608 } 609 versions[podVersion]++ 610 } 611 } else { 612 versions := nodesToVersions[nodeName] 613 if versions == nil { 614 versions = make(map[string]int) 615 nodesToVersions[nodeName] = versions 616 } 617 versions[podVersion]++ 618 } 619 620 ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) 621 if podVersion == newVersion { 622 surgeCount++ 623 if !ready || pod.DeletionTimestamp != nil { 624 if deliberatelyDeletedPods.Has(string(pod.UID)) { 625 newDeliberatelyDeletedCount++ 626 } 627 newUnavailableCount++ 628 } 629 } else { 630 if !ready || pod.DeletionTimestamp != nil { 631 oldUnavailableCount++ 632 } 633 } 634 fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready) 635 } 636 637 // print a stable sorted list of pods by node for debugging 638 pw.Flush() 639 lines := strings.Split(buf.String(), "\n") 640 lines = lines[:len(lines)-1] 641 sort.Strings(lines[1:]) 642 for _, line := range lines { 643 framework.Logf("%s", line) 644 } 645 646 // if there is an old and new pod at the same time, record a timestamp 647 deletedPerNode := make(map[string]int) 648 for _, pod := range pods { 649 if 
!metav1.IsControlledBy(&pod, ds) { 650 continue 651 } 652 // ignore deleted pods 653 if pod.DeletionTimestamp != nil { 654 deletedPodUIDs.Insert(string(pod.UID)) 655 if !deliberatelyDeletedPods.Has(string(pod.UID)) { 656 deletedPerNode[pod.Spec.NodeName]++ 657 } 658 continue 659 } 660 podUIDs.Insert(string(pod.UID)) 661 podVersion := pod.Spec.Containers[0].Env[0].Value 662 if podVersion == newVersion { 663 continue 664 } 665 // if this is a pod in an older version AND there is a new version of this pod, record when 666 // we started seeing this, otherwise delete the record (perhaps the node was drained) 667 if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 { 668 if _, ok := ageOfOldPod[string(pod.UID)]; !ok { 669 ageOfOldPod[string(pod.UID)] = now 670 } 671 } else { 672 delete(ageOfOldPod, string(pod.UID)) 673 } 674 } 675 // purge the old pods list of any deleted pods 676 for uid := range ageOfOldPod { 677 if !podUIDs.Has(uid) { 678 delete(ageOfOldPod, uid) 679 } 680 } 681 deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs) 682 683 for _, versions := range nodesToVersions { 684 if versions[oldVersion] == 0 { 685 nodesWithoutOldVersion++ 686 } 687 } 688 689 var errs []string 690 691 // invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving 692 for node, count := range deletedPerNode { 693 if count > 1 { 694 errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count)) 695 } 696 } 697 698 // invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period) 699 for uid, firstSeen := range ageOfOldPod { 700 if now.Sub(firstSeen) > maxSurgeOverlap { 701 errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap)) 702 } 703 } 704 705 // invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or 706 // if we deliberately deleted one of the new pods 707 if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) { 708 errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion)) 709 } 710 // invariant: the total number of versions created should be 2 711 if versions.Len() > 2 { 712 errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len())) 713 } 714 for _, node := range nodes.List() { 715 // ignore pods that haven't been scheduled yet 716 if len(node) == 0 { 717 continue 718 } 719 versionCount := make(map[string]int) 720 // invariant: surge should never have more than one instance of a pod per node running 721 for version, count := range nodesToVersions[node] { 722 if count > 1 { 723 errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version)) 724 } 725 versionCount[version] += count 726 } 727 // invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted) 728 for version, count := 
range nodesToDeletedVersions[node] {
				if count > 2 {
					errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 2", node, count, version))
				}
				versionCount[version] += count
			}
			// invariant: on any node, we should never have more than two instances of a version (if we are getting evicted)
			for version, count := range versionCount {
				if count > 2 {
					errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
				}
			}
		}

		if len(errs) > 0 {
			sort.Strings(errs)
			return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
		}

		// Make sure every daemon pod on the node has been updated
		nodeNames := schedulableNodes(c, ds)
		for _, node := range nodeNames {
			switch {
			case
				// if we don't have the new version yet
				nodesToVersions[node][newVersion] == 0,
				// if there are more than one version on a node
				len(nodesToVersions[node]) > 1,
				// if there are still any deleted pods
				len(nodesToDeletedVersions[node]) > 0,
				// if any of the new pods are unavailable
				newUnavailableCount > 0:

				// inject a failure randomly to ensure the controller recovers
				switch rand.Intn(25) {
				// cause a random old pod to go unready
				case 0:
					// select a not-deleted pod of the old version
					if pod := randomPod(pods, func(pod *v1.Pod) bool {
						return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
					}); pod != nil {
						// write a non-empty value into /var/tmp/ready, which will cause the readiness probe to fail
						if _, err := framework.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
							framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
						} else {
							framework.Logf("Marked old pod %s as unready", pod.Name)
						}
					}
				case 1:
					// delete a random pod
					if pod := randomPod(pods, func(pod *v1.Pod) bool {
						return pod.DeletionTimestamp == nil
					}); pod != nil {
						if err := c.CoreV1().Pods(ds.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
							framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
						} else {
							framework.Logf("Deleted pod %s prematurely", pod.Name)
							deliberatelyDeletedPods.Insert(string(pod.UID))
						}
					}
				}

				// then wait
				return false, nil
			}
		}
		return true, nil
	})
	framework.ExpectNoError(err)

	ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
	err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
	framework.ExpectNoError(err, "error waiting for daemon pod to start")

	// Check history and labels
	ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
	framework.ExpectNoError(err)
	waitForHistoryCreated(c, ns, label, 2)
	cur = curHistory(listDaemonHistories(c, ns, label), ds)
	hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
	framework.ExpectEqual(cur.Revision, int64(2))
	checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
	})

	/*
	   Release: v1.22
	   Testname: DaemonSet, list and delete a collection of DaemonSets
	   Description: When a DaemonSet is created it MUST succeed.
	   It MUST succeed when listing DaemonSets via a label selector. It
	   MUST succeed when deleting the DaemonSet via deleteCollection.
	*/
	framework.ConformanceIt("should list and delete a collection of DaemonSets", func() {
		label := map[string]string{daemonsetNameLabel: dsName}
		labelSelector := labels.SelectorFromSet(label).String()

		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
		cs := f.ClientSet
		one := int64(1)

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, testDaemonset))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = checkDaemonStatus(f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("listing all DaemonSets")
		dsList, err := cs.AppsV1().DaemonSets("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to list DaemonSets")
		framework.ExpectEqual(len(dsList.Items), 1, "filtered list wasn't found")

		ginkgo.By("DeleteCollection of the DaemonSets")
		err = dsClient.DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &one}, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to delete DaemonSets")

		ginkgo.By("Verify that DaemonSets have been deleted")
		dsList, err = c.AppsV1().DaemonSets("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to list DaemonSets")
		framework.ExpectEqual(len(dsList.Items), 0, "filtered list should have no daemonset")
	})

	/*
	   Release: v1.22
	   Testname: DaemonSet, status sub-resource
	   Description: When a DaemonSet is created it MUST succeed.
	   Attempt to read, update and patch its status sub-resource; all
	   mutating sub-resource operations MUST be visible to subsequent reads.
857 */ 858 framework.ConformanceIt("should verify changes to a daemon set status", func() { 859 label := map[string]string{daemonsetNameLabel: dsName} 860 labelSelector := labels.SelectorFromSet(label).String() 861 862 dsClient := f.ClientSet.AppsV1().DaemonSets(ns) 863 cs := f.ClientSet 864 865 w := &cache.ListWatch{ 866 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { 867 options.LabelSelector = labelSelector 868 return dsClient.Watch(context.TODO(), options) 869 }, 870 } 871 872 dsList, err := cs.AppsV1().DaemonSets("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) 873 framework.ExpectNoError(err, "failed to list Daemon Sets") 874 875 ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName)) 876 testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{}) 877 framework.ExpectNoError(err) 878 879 ginkgo.By("Check that daemon pods launch on every node of the cluster.") 880 err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, testDaemonset)) 881 framework.ExpectNoError(err, "error waiting for daemon pod to start") 882 err = checkDaemonStatus(f, dsName) 883 framework.ExpectNoError(err) 884 885 ginkgo.By("Getting /status") 886 dsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"} 887 dsStatusUnstructured, err := f.DynamicClient.Resource(dsResource).Namespace(ns).Get(context.TODO(), dsName, metav1.GetOptions{}, "status") 888 framework.ExpectNoError(err, "Failed to fetch the status of daemon set %s in namespace %s", dsName, ns) 889 dsStatusBytes, err := json.Marshal(dsStatusUnstructured) 890 framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err) 891 892 var dsStatus appsv1.DaemonSet 893 err = json.Unmarshal(dsStatusBytes, &dsStatus) 894 framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a daemon set object type") 895 framework.Logf("Daemon Set %s has Conditions: %v", dsName, dsStatus.Status.Conditions) 896 897 ginkgo.By("updating the DaemonSet Status") 898 var statusToUpdate, updatedStatus *appsv1.DaemonSet 899 900 err = retry.RetryOnConflict(retry.DefaultRetry, func() error { 901 statusToUpdate, err = dsClient.Get(context.TODO(), dsName, metav1.GetOptions{}) 902 framework.ExpectNoError(err, "Unable to retrieve daemon set %s", dsName) 903 904 statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, appsv1.DaemonSetCondition{ 905 Type: "StatusUpdate", 906 Status: "True", 907 Reason: "E2E", 908 Message: "Set from e2e test", 909 }) 910 911 updatedStatus, err = dsClient.UpdateStatus(context.TODO(), statusToUpdate, metav1.UpdateOptions{}) 912 return err 913 }) 914 framework.ExpectNoError(err, "Failed to update status. 
%v", err) 915 framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions) 916 917 ginkgo.By("watching for the daemon set status to be updated") 918 ctx, cancel := context.WithTimeout(context.Background(), dsRetryTimeout) 919 defer cancel() 920 _, err = watchtools.Until(ctx, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) { 921 if ds, ok := event.Object.(*appsv1.DaemonSet); ok { 922 found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name && 923 ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace && 924 ds.Labels[daemonsetNameLabel] == dsName 925 if !found { 926 framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) 927 return false, nil 928 } 929 for _, cond := range ds.Status.Conditions { 930 if cond.Type == "StatusUpdate" && 931 cond.Reason == "E2E" && 932 cond.Message == "Set from e2e test" { 933 framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions) 934 return found, nil 935 } 936 framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) 937 } 938 } 939 object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0] 940 framework.Logf("Observed %v event: %+v", object, event.Type) 941 return false, nil 942 }) 943 framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns) 944 framework.Logf("Daemon set %s has an updated status", dsName) 945 946 ginkgo.By("patching the DaemonSet Status") 947 daemonSetStatusPatch := appsv1.DaemonSet{ 948 Status: appsv1.DaemonSetStatus{ 949 Conditions: []appsv1.DaemonSetCondition{ 950 { 951 Type: "StatusPatched", 952 Status: "True", 953 }, 954 }, 955 }, 956 } 957 958 payload, err := json.Marshal(daemonSetStatusPatch) 959 framework.ExpectNoError(err, "Failed to marshal JSON. 
%v", err) 960 _, err = dsClient.Patch(context.TODO(), dsName, types.MergePatchType, payload, metav1.PatchOptions{}, "status") 961 framework.ExpectNoError(err, "Failed to patch daemon set status", err) 962 963 ginkgo.By("watching for the daemon set status to be patched") 964 ctx, cancel = context.WithTimeout(context.Background(), dsRetryTimeout) 965 defer cancel() 966 _, err = watchtools.Until(ctx, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) { 967 if ds, ok := event.Object.(*appsv1.DaemonSet); ok { 968 found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name && 969 ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace && 970 ds.Labels[daemonsetNameLabel] == dsName 971 if !found { 972 framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) 973 return false, nil 974 } 975 for _, cond := range ds.Status.Conditions { 976 if cond.Type == "StatusPatched" { 977 framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions) 978 return found, nil 979 } 980 framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) 981 } 982 } 983 object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0] 984 framework.Logf("Observed %v event: %v", object, event.Type) 985 return false, nil 986 }) 987 framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns) 988 framework.Logf("Daemon set %s has a patched status", dsName) 989 }) 990}) 991 992// randomPod selects a random pod within pods that causes fn to return true, or nil 993// if no pod can be found matching the criteria. 
994func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod { 995 podCount := len(pods) 996 for offset, i := rand.Intn(podCount), 0; i < (podCount - 1); i++ { 997 pod := &pods[(offset+i)%podCount] 998 if fn(pod) { 999 return pod 1000 } 1001 } 1002 return nil 1003} 1004 1005// getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image 1006func getDaemonSetImagePatch(containerName, containerImage string) string { 1007 return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage) 1008} 1009 1010func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet { 1011 return &appsv1.DaemonSet{ 1012 ObjectMeta: metav1.ObjectMeta{ 1013 Name: dsName, 1014 }, 1015 Spec: appsv1.DaemonSetSpec{ 1016 Selector: &metav1.LabelSelector{ 1017 MatchLabels: label, 1018 }, 1019 Template: v1.PodTemplateSpec{ 1020 ObjectMeta: metav1.ObjectMeta{ 1021 Labels: label, 1022 }, 1023 Spec: v1.PodSpec{ 1024 Containers: []v1.Container{ 1025 { 1026 Name: "app", 1027 Image: image, 1028 Ports: []v1.ContainerPort{{ContainerPort: 9376}}, 1029 }, 1030 }, 1031 }, 1032 }, 1033 }, 1034 } 1035} 1036 1037func newDaemonSetWithLabel(dsName, image string, label map[string]string) *appsv1.DaemonSet { 1038 return &appsv1.DaemonSet{ 1039 ObjectMeta: metav1.ObjectMeta{ 1040 Name: dsName, 1041 Labels: label, 1042 }, 1043 Spec: appsv1.DaemonSetSpec{ 1044 Selector: &metav1.LabelSelector{ 1045 MatchLabels: label, 1046 }, 1047 Template: v1.PodTemplateSpec{ 1048 ObjectMeta: metav1.ObjectMeta{ 1049 Labels: label, 1050 }, 1051 Spec: v1.PodSpec{ 1052 Containers: []v1.Container{ 1053 { 1054 Name: "app", 1055 Image: image, 1056 Ports: []v1.ContainerPort{{ContainerPort: 9376}}, 1057 }, 1058 }, 1059 }, 1060 }, 1061 }, 1062 } 1063} 1064 1065func listDaemonPods(c clientset.Interface, ns string, label map[string]string) *v1.PodList { 1066 selector := labels.Set(label).AsSelector() 1067 options := metav1.ListOptions{LabelSelector: selector.String()} 1068 podList, err := c.CoreV1().Pods(ns).List(context.TODO(), options) 1069 framework.ExpectNoError(err) 1070 gomega.Expect(len(podList.Items)).To(gomega.BeNumerically(">", 0)) 1071 return podList 1072} 1073 1074func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) { 1075 daemonSetLabels := map[string]string{} 1076 otherLabels := map[string]string{} 1077 for k, v := range labels { 1078 if strings.HasPrefix(k, daemonsetLabelPrefix) { 1079 daemonSetLabels[k] = v 1080 } else { 1081 otherLabels[k] = v 1082 } 1083 } 1084 return daemonSetLabels, otherLabels 1085} 1086 1087func clearDaemonSetNodeLabels(c clientset.Interface) error { 1088 nodeList, err := e2enode.GetReadySchedulableNodes(c) 1089 if err != nil { 1090 return err 1091 } 1092 for _, node := range nodeList.Items { 1093 _, err := setDaemonSetNodeLabels(c, node.Name, map[string]string{}) 1094 if err != nil { 1095 return err 1096 } 1097 } 1098 return nil 1099} 1100 1101// patchNamespaceAnnotations sets node selectors related annotations on tests namespaces to empty 1102func patchNamespaceAnnotations(c clientset.Interface, nsName string) (*v1.Namespace, error) { 1103 nsClient := c.CoreV1().Namespaces() 1104 1105 annotations := make(map[string]string) 1106 for _, n := range NamespaceNodeSelectors { 1107 annotations[n] = "" 1108 } 1109 nsPatch, err := json.Marshal(map[string]interface{}{ 1110 "metadata": map[string]interface{}{ 1111 "annotations": annotations, 1112 }, 1113 }) 1114 if err != nil { 1115 
return nil, err 1116 } 1117 1118 return nsClient.Patch(context.TODO(), nsName, types.StrategicMergePatchType, nsPatch, metav1.PatchOptions{}) 1119} 1120 1121func setDaemonSetNodeLabels(c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) { 1122 nodeClient := c.CoreV1().Nodes() 1123 var newNode *v1.Node 1124 var newLabels map[string]string 1125 err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, func() (bool, error) { 1126 node, err := nodeClient.Get(context.TODO(), nodeName, metav1.GetOptions{}) 1127 if err != nil { 1128 return false, err 1129 } 1130 1131 // remove all labels this test is creating 1132 daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels) 1133 if reflect.DeepEqual(daemonSetLabels, labels) { 1134 newNode = node 1135 return true, nil 1136 } 1137 node.Labels = otherLabels 1138 for k, v := range labels { 1139 node.Labels[k] = v 1140 } 1141 newNode, err = nodeClient.Update(context.TODO(), node, metav1.UpdateOptions{}) 1142 if err == nil { 1143 newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels) 1144 return true, err 1145 } 1146 if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict { 1147 framework.Logf("failed to update node due to resource version conflict") 1148 return false, nil 1149 } 1150 return false, err 1151 }) 1152 if err != nil { 1153 return nil, err 1154 } else if len(newLabels) != len(labels) { 1155 return nil, fmt.Errorf("could not set daemon set test labels as expected") 1156 } 1157 1158 return newNode, nil 1159} 1160 1161func checkDaemonPodOnNodes(f *framework.Framework, ds *appsv1.DaemonSet, nodeNames []string) func() (bool, error) { 1162 return func() (bool, error) { 1163 podList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}) 1164 if err != nil { 1165 framework.Logf("could not get the pod list: %v", err) 1166 return false, nil 1167 } 1168 pods := podList.Items 1169 1170 nodesToPodCount := make(map[string]int) 1171 for _, pod := range pods { 1172 if !metav1.IsControlledBy(&pod, ds) { 1173 continue 1174 } 1175 if pod.DeletionTimestamp != nil { 1176 continue 1177 } 1178 if podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) { 1179 nodesToPodCount[pod.Spec.NodeName]++ 1180 } 1181 } 1182 framework.Logf("Number of nodes with available pods: %d", len(nodesToPodCount)) 1183 1184 // Ensure that exactly 1 pod is running on all nodes in nodeNames. 1185 for _, nodeName := range nodeNames { 1186 if nodesToPodCount[nodeName] != 1 { 1187 framework.Logf("Node %s is running more than one daemon pod", nodeName) 1188 return false, nil 1189 } 1190 } 1191 1192 framework.Logf("Number of running nodes: %d, number of available pods: %d", len(nodeNames), len(nodesToPodCount)) 1193 // Ensure that sizes of the lists are the same. We've verified that every element of nodeNames is in 1194 // nodesToPodCount, so verifying the lengths are equal ensures that there aren't pods running on any 1195 // other nodes. 
1196 return len(nodesToPodCount) == len(nodeNames), nil 1197 } 1198} 1199 1200func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) { 1201 return func() (bool, error) { 1202 nodeNames := schedulableNodes(f.ClientSet, ds) 1203 return checkDaemonPodOnNodes(f, ds, nodeNames)() 1204 } 1205} 1206 1207func schedulableNodes(c clientset.Interface, ds *appsv1.DaemonSet) []string { 1208 nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) 1209 framework.ExpectNoError(err) 1210 nodeNames := make([]string, 0) 1211 for _, node := range nodeList.Items { 1212 if !canScheduleOnNode(node, ds) { 1213 framework.Logf("DaemonSet pods can't tolerate node %s with taints %+v, skip checking this node", node.Name, node.Spec.Taints) 1214 continue 1215 } 1216 nodeNames = append(nodeNames, node.Name) 1217 } 1218 return nodeNames 1219} 1220 1221func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func() (bool, error) { 1222 return func() (bool, error) { 1223 pods := listDaemonPods(c, ns, label) 1224 for _, pod := range pods.Items { 1225 if pod.Spec.Containers[0].Image == newImage { 1226 return true, nil 1227 } 1228 } 1229 return false, nil 1230 } 1231} 1232 1233// canScheduleOnNode checks if a given DaemonSet can schedule pods on the given node 1234func canScheduleOnNode(node v1.Node, ds *appsv1.DaemonSet) bool { 1235 newPod := daemon.NewPod(ds, node.Name) 1236 fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.Predicates(newPod, &node, node.Spec.Taints) 1237 return fitsNodeName && fitsNodeAffinity && fitsTaints 1238} 1239 1240func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) { 1241 return checkDaemonPodOnNodes(f, ds, make([]string, 0)) 1242} 1243 1244func checkDaemonStatus(f *framework.Framework, dsName string) error { 1245 ds, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Get(context.TODO(), dsName, metav1.GetOptions{}) 1246 if err != nil { 1247 return fmt.Errorf("could not get daemon set from v1") 1248 } 1249 desired, scheduled, ready := ds.Status.DesiredNumberScheduled, ds.Status.CurrentNumberScheduled, ds.Status.NumberReady 1250 if desired != scheduled && desired != ready { 1251 return fmt.Errorf("error in daemon status. DesiredScheduled: %d, CurrentScheduled: %d, Ready: %d", desired, scheduled, ready) 1252 } 1253 return nil 1254} 1255 1256func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func() (bool, error) { 1257 return func() (bool, error) { 1258 podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) 1259 if err != nil { 1260 return false, err 1261 } 1262 pods := podList.Items 1263 1264 unavailablePods := 0 1265 nodesToUpdatedPodCount := make(map[string]int) 1266 for _, pod := range pods { 1267 // Ignore the pod on the node that is supposed to be deleted 1268 if pod.DeletionTimestamp != nil { 1269 continue 1270 } 1271 if !metav1.IsControlledBy(&pod, ds) { 1272 continue 1273 } 1274 podImage := pod.Spec.Containers[0].Image 1275 if podImage != image { 1276 framework.Logf("Wrong image for pod: %s. 
Expected: %s, got: %s.", pod.Name, image, podImage) 1277 } else { 1278 nodesToUpdatedPodCount[pod.Spec.NodeName]++ 1279 } 1280 if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) { 1281 framework.Logf("Pod %s is not available", pod.Name) 1282 unavailablePods++ 1283 } 1284 } 1285 if unavailablePods > maxUnavailable { 1286 return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable) 1287 } 1288 // Make sure every daemon pod on the node has been updated 1289 nodeNames := schedulableNodes(c, ds) 1290 for _, node := range nodeNames { 1291 if nodesToUpdatedPodCount[node] == 0 { 1292 return false, nil 1293 } 1294 } 1295 return true, nil 1296 } 1297} 1298 1299func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) { 1300 for _, pod := range podList.Items { 1301 // Ignore all the DS pods that will be deleted 1302 if pod.DeletionTimestamp != nil { 1303 continue 1304 } 1305 podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] 1306 gomega.Expect(len(podHash)).To(gomega.BeNumerically(">", 0)) 1307 if len(hash) > 0 { 1308 framework.ExpectEqual(podHash, hash, "unexpected hash for pod %s", pod.Name) 1309 } 1310 } 1311} 1312 1313func waitForHistoryCreated(c clientset.Interface, ns string, label map[string]string, numHistory int) { 1314 listHistoryFn := func() (bool, error) { 1315 selector := labels.Set(label).AsSelector() 1316 options := metav1.ListOptions{LabelSelector: selector.String()} 1317 historyList, err := c.AppsV1().ControllerRevisions(ns).List(context.TODO(), options) 1318 if err != nil { 1319 return false, err 1320 } 1321 if len(historyList.Items) == numHistory { 1322 return true, nil 1323 } 1324 framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory) 1325 return false, nil 1326 } 1327 err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, listHistoryFn) 1328 framework.ExpectNoError(err, "error waiting for controllerrevisions to be created") 1329} 1330 1331func listDaemonHistories(c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList { 1332 selector := labels.Set(label).AsSelector() 1333 options := metav1.ListOptions{LabelSelector: selector.String()} 1334 historyList, err := c.AppsV1().ControllerRevisions(ns).List(context.TODO(), options) 1335 framework.ExpectNoError(err) 1336 gomega.Expect(len(historyList.Items)).To(gomega.BeNumerically(">", 0)) 1337 return historyList 1338} 1339 1340func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision { 1341 var curHistory *appsv1.ControllerRevision 1342 foundCurHistories := 0 1343 for i := range historyList.Items { 1344 history := &historyList.Items[i] 1345 // Every history should have the hash label 1346 gomega.Expect(len(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey])).To(gomega.BeNumerically(">", 0)) 1347 match, err := daemon.Match(ds, history) 1348 framework.ExpectNoError(err) 1349 if match { 1350 curHistory = history 1351 foundCurHistories++ 1352 } 1353 } 1354 framework.ExpectEqual(foundCurHistories, 1) 1355 gomega.Expect(curHistory).NotTo(gomega.BeNil()) 1356 return curHistory 1357} 1358 1359func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func() (bool, error) { 1360 return func() (bool, error) { 1361 if _, err := c.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil { 1362 if apierrors.IsNotFound(err) { 1363 return true, nil 1364 } 1365 return false, 
fmt.Errorf("failed to get failed daemon pod %q: %v", pod.Name, err) 1366 } 1367 return false, nil 1368 } 1369} 1370