1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package apps
18
19import (
20	"bytes"
21	"context"
22	"encoding/json"
23	"fmt"
24	"math/rand"
25	"reflect"
26	"sort"
27	"strings"
28	"text/tabwriter"
29	"time"
30
31	"k8s.io/client-go/tools/cache"
32
33	"github.com/onsi/ginkgo"
34	"github.com/onsi/gomega"
35	appsv1 "k8s.io/api/apps/v1"
36	v1 "k8s.io/api/core/v1"
37	apierrors "k8s.io/apimachinery/pkg/api/errors"
38	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39	"k8s.io/apimachinery/pkg/labels"
40	"k8s.io/apimachinery/pkg/runtime"
41	"k8s.io/apimachinery/pkg/runtime/schema"
42	"k8s.io/apimachinery/pkg/types"
43	"k8s.io/apimachinery/pkg/util/intstr"
44	"k8s.io/apimachinery/pkg/util/sets"
45	"k8s.io/apimachinery/pkg/util/wait"
46	watch "k8s.io/apimachinery/pkg/watch"
47	clientset "k8s.io/client-go/kubernetes"
48	"k8s.io/client-go/kubernetes/scheme"
49	watchtools "k8s.io/client-go/tools/watch"
50	"k8s.io/client-go/util/retry"
51	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
52	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
53	"k8s.io/kubernetes/pkg/controller/daemon"
54	"k8s.io/kubernetes/test/e2e/framework"
55	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
56	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
57)
58
59const (
60	// this should not be a multiple of 5, because node status updates
61	// every 5 seconds. See https://github.com/kubernetes/kubernetes/pull/14915.
62	dsRetryPeriod  = 1 * time.Second
63	dsRetryTimeout = 5 * time.Minute
64
65	daemonsetLabelPrefix = "daemonset-"
66	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
67	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
68)
69
// NamespaceNodeSelectors holds the annotation key scheduler.alpha.kubernetes.io/node-selector,
// which is used to assign node selector labels to namespaces.
72var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}
73
74type updateDSFunc func(*appsv1.DaemonSet)
75
76// updateDaemonSetWithRetries updates daemonsets with the given applyUpdate func
77// until it succeeds or a timeout expires.
78func updateDaemonSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) {
79	daemonsets := c.AppsV1().DaemonSets(namespace)
80	var updateErr error
81	pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
82		if ds, err = daemonsets.Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
83			return false, err
84		}
85		// Apply the update, then attempt to push it to the apiserver.
86		applyUpdate(ds)
87		if ds, err = daemonsets.Update(context.TODO(), ds, metav1.UpdateOptions{}); err == nil {
88			framework.Logf("Updating DaemonSet %s", name)
89			return true, nil
90		}
91		updateErr = err
92		return false, nil
93	})
94	if pollErr == wait.ErrWaitTimeout {
		pollErr = fmt.Errorf("couldn't apply the provided update to DaemonSet %q: %v", name, updateErr)
96	}
97	return ds, pollErr
98}
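
// Illustrative usage sketch (not called by the tests in this file); the image
// value below is hypothetical:
//
//	ds, err := updateDaemonSetWithRetries(c, ns, "daemon-set", func(update *appsv1.DaemonSet) {
//		update.Spec.Template.Spec.Containers[0].Image = "registry.example/agnhost:new"
//	})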
99
100// This test must be run in serial because it assumes the Daemon Set pods will
101// always get scheduled.  If we run other tests in parallel, this may not
102// happen.  In the future, running in parallel may work if we have an eviction
103// model which lets the DS controller kick out other pods to make room.
104// See http://issues.k8s.io/21767 for more details
105var _ = SIGDescribe("Daemon set [Serial]", func() {
106	var f *framework.Framework
107
108	ginkgo.AfterEach(func() {
109		// Clean up
110		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
111		framework.ExpectNoError(err, "unable to dump DaemonSets")
112		if daemonsets != nil && len(daemonsets.Items) > 0 {
113			for _, ds := range daemonsets.Items {
114				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
115				framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
116				err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, &ds))
117				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
118			}
119		}
120		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}); err == nil {
121			framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
122		} else {
123			framework.Logf("unable to dump daemonsets: %v", err)
124		}
125		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{}); err == nil {
126			framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
127		} else {
128			framework.Logf("unable to dump pods: %v", err)
129		}
130		err = clearDaemonSetNodeLabels(f.ClientSet)
131		framework.ExpectNoError(err)
132	})
133
134	f = framework.NewDefaultFramework("daemonsets")
135
136	image := WebserverImage
137	dsName := "daemon-set"
138
139	var ns string
140	var c clientset.Interface
141
142	ginkgo.BeforeEach(func() {
143		ns = f.Namespace.Name
144
145		c = f.ClientSet
146
147		updatedNS, err := patchNamespaceAnnotations(c, ns)
148		framework.ExpectNoError(err)
149
150		ns = updatedNS.Name
151
152		err = clearDaemonSetNodeLabels(c)
153		framework.ExpectNoError(err)
154	})
155
156	/*
157	  Release: v1.10
158	  Testname: DaemonSet-Creation
159	  Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
160	  Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
161	*/
162	framework.ConformanceIt("should run and stop simple daemon", func() {
163		label := map[string]string{daemonsetNameLabel: dsName}
164
165		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
166		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSet(dsName, image, label), metav1.CreateOptions{})
167		framework.ExpectNoError(err)
168
169		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
170		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
171		framework.ExpectNoError(err, "error waiting for daemon pod to start")
172		err = checkDaemonStatus(f, dsName)
173		framework.ExpectNoError(err)
174
175		ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
176		podList := listDaemonPods(c, ns, label)
177		pod := podList.Items[0]
178		err = c.CoreV1().Pods(ns).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
179		framework.ExpectNoError(err)
180		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
181		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
182	})
183
184	/*
185	  Release: v1.10
186	  Testname: DaemonSet-NodeSelection
187	  Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label
188	  selectors.
189	*/
190	framework.ConformanceIt("should run and stop complex daemon", func() {
191		complexLabel := map[string]string{daemonsetNameLabel: dsName}
192		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
193		framework.Logf("Creating daemon %q with a node selector", dsName)
194		ds := newDaemonSet(dsName, image, complexLabel)
195		ds.Spec.Template.Spec.NodeSelector = nodeSelector
196		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
197		framework.ExpectNoError(err)
198
199		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
200		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
201		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")
202
203		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
204		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
205		framework.ExpectNoError(err)
206		newNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector)
207		framework.ExpectNoError(err, "error setting labels on node")
208		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
209		framework.ExpectEqual(len(daemonSetLabels), 1)
210		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name}))
211		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
212		err = checkDaemonStatus(f, dsName)
213		framework.ExpectNoError(err)
214
215		ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
216		nodeSelector[daemonsetColorLabel] = "green"
217		greenNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error updating labels on node")
219		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
220		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
221
222		ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
223		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
224			daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
225		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
226		framework.ExpectNoError(err, "error patching daemon set")
227		daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
228		framework.ExpectEqual(len(daemonSetLabels), 1)
229		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
230		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
231		err = checkDaemonStatus(f, dsName)
232		framework.ExpectNoError(err)
233	})
234
235	// We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the
236	// default scheduler.
237	ginkgo.It("should run and stop complex daemon with node affinity", func() {
238		complexLabel := map[string]string{daemonsetNameLabel: dsName}
239		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
240		framework.Logf("Creating daemon %q with a node affinity", dsName)
241		ds := newDaemonSet(dsName, image, complexLabel)
242		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
243			NodeAffinity: &v1.NodeAffinity{
244				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
245					NodeSelectorTerms: []v1.NodeSelectorTerm{
246						{
247							MatchExpressions: []v1.NodeSelectorRequirement{
248								{
249									Key:      daemonsetColorLabel,
250									Operator: v1.NodeSelectorOpIn,
251									Values:   []string{nodeSelector[daemonsetColorLabel]},
252								},
253							},
254						},
255					},
256				},
257			},
258		}
259		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
260		framework.ExpectNoError(err)
261
262		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
263		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
264		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")
265
266		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
267		node, err := e2enode.GetRandomReadySchedulableNode(f.ClientSet)
268		framework.ExpectNoError(err)
269		newNode, err := setDaemonSetNodeLabels(c, node.Name, nodeSelector)
270		framework.ExpectNoError(err, "error setting labels on node")
271		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
272		framework.ExpectEqual(len(daemonSetLabels), 1)
273		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodOnNodes(f, ds, []string{newNode.Name}))
274		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
275		err = checkDaemonStatus(f, dsName)
276		framework.ExpectNoError(err)
277
278		ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
279		_, err = setDaemonSetNodeLabels(c, node.Name, map[string]string{})
280		framework.ExpectNoError(err, "error removing labels on node")
281		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnNoNodes(f, ds))
282		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
283	})
284
285	/*
286	  Release: v1.10
287	  Testname: DaemonSet-FailedPodCreation
288	  Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail.
289	*/
290	framework.ConformanceIt("should retry creating failed daemon pods", func() {
291		label := map[string]string{daemonsetNameLabel: dsName}
292
293		ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
294		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSet(dsName, image, label), metav1.CreateOptions{})
295		framework.ExpectNoError(err)
296
297		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
298		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
299		framework.ExpectNoError(err, "error waiting for daemon pod to start")
300		err = checkDaemonStatus(f, dsName)
301		framework.ExpectNoError(err)
302
303		ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
304		podList := listDaemonPods(c, ns, label)
305		pod := podList.Items[0]
306		pod.ResourceVersion = ""
307		pod.Status.Phase = v1.PodFailed
308		_, err = c.CoreV1().Pods(ns).UpdateStatus(context.TODO(), &pod, metav1.UpdateOptions{})
309		framework.ExpectNoError(err, "error failing a daemon pod")
310		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
311		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
312
313		ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
314		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, waitFailedDaemonPodDeleted(c, &pod))
315		framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
316	})
317
	// This test should not be added to conformance. We will consider deprecating OnDelete when the
	// extensions/v1beta1 and apps/v1beta1 APIs are removed.
320	ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func() {
321		label := map[string]string{daemonsetNameLabel: dsName}
322
323		framework.Logf("Creating simple daemon set %s", dsName)
324		ds := newDaemonSet(dsName, image, label)
325		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}
326		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
327		framework.ExpectNoError(err)
328
329		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
330		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
331		framework.ExpectNoError(err, "error waiting for daemon pod to start")
332
333		// Check history and labels
334		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
335		framework.ExpectNoError(err)
336		waitForHistoryCreated(c, ns, label, 1)
337		first := curHistory(listDaemonHistories(c, ns, label), ds)
338		firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
339		framework.ExpectEqual(first.Revision, int64(1))
340		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash)
341
		ginkgo.By("Update daemon pods' image.")
343		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
344		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
345		framework.ExpectNoError(err)
346
		ginkgo.By("Check that daemon pods' images aren't updated.")
348		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
349		framework.ExpectNoError(err)
350
351		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
352		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
353		framework.ExpectNoError(err, "error waiting for daemon pod to start")
354
355		// Check history and labels
356		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
357		framework.ExpectNoError(err)
358		waitForHistoryCreated(c, ns, label, 2)
359		cur := curHistory(listDaemonHistories(c, ns, label), ds)
360		framework.ExpectEqual(cur.Revision, int64(2))
361		framework.ExpectNotEqual(cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey], firstHash)
362		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash)
363	})
364
365	/*
366	  Release: v1.10
367	  Testname: DaemonSet-RollingUpdate
368	  Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates.
369	*/
370	framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func() {
371		label := map[string]string{daemonsetNameLabel: dsName}
372
373		framework.Logf("Creating simple daemon set %s", dsName)
374		ds := newDaemonSet(dsName, image, label)
375		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
376		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
377		framework.ExpectNoError(err)
378
379		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
380		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
381		framework.ExpectNoError(err, "error waiting for daemon pod to start")
382
383		// Check history and labels
384		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
385		framework.ExpectNoError(err)
386		waitForHistoryCreated(c, ns, label, 1)
387		cur := curHistory(listDaemonHistories(c, ns, label), ds)
388		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
389		framework.ExpectEqual(cur.Revision, int64(1))
390		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
391
		ginkgo.By("Update daemon pods' image.")
393		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
394		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
395		framework.ExpectNoError(err)
396
397		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
398		// Get the number of nodes, and set the timeout appropriately.
399		nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
400		framework.ExpectNoError(err)
401		nodeCount := len(nodes.Items)
402		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
403
		ginkgo.By("Check that daemon pods' images are updated.")
405		err = wait.PollImmediate(dsRetryPeriod, retryTimeout, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1))
406		framework.ExpectNoError(err)
407
408		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
409		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
410		framework.ExpectNoError(err, "error waiting for daemon pod to start")
411
412		// Check history and labels
413		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
414		framework.ExpectNoError(err)
415		waitForHistoryCreated(c, ns, label, 2)
416		cur = curHistory(listDaemonHistories(c, ns, label), ds)
417		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
418		framework.ExpectEqual(cur.Revision, int64(2))
419		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
420	})
421
422	/*
423	  Release: v1.10
424	  Testname: DaemonSet-Rollback
425	  Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive
426	  rollback of updates to a DaemonSet.
427	*/
428	framework.ConformanceIt("should rollback without unnecessary restarts", func() {
429		schedulableNodes, err := e2enode.GetReadySchedulableNodes(c)
430		framework.ExpectNoError(err)
431		gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
432		framework.Logf("Create a RollingUpdate DaemonSet")
433		label := map[string]string{daemonsetNameLabel: dsName}
434		ds := newDaemonSet(dsName, image, label)
435		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
436		ds, err = c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
437		framework.ExpectNoError(err)
438
439		framework.Logf("Check that daemon pods launch on every node of the cluster")
440		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
441		framework.ExpectNoError(err, "error waiting for daemon pod to start")
442
443		framework.Logf("Update the DaemonSet to trigger a rollout")
		// Use a non-existent image so that the rollout is guaranteed never to complete.
445		newImage := "foo:non-existent"
446		newDS, err := updateDaemonSetWithRetries(c, ns, ds.Name, func(update *appsv1.DaemonSet) {
447			update.Spec.Template.Spec.Containers[0].Image = newImage
448		})
449		framework.ExpectNoError(err)
450
451		// Make sure we're in the middle of a rollout
452		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkAtLeastOneNewPod(c, ns, label, newImage))
453		framework.ExpectNoError(err)
454
455		pods := listDaemonPods(c, ns, label)
456		var existingPods, newPods []*v1.Pod
457		for i := range pods.Items {
458			pod := pods.Items[i]
459			image := pod.Spec.Containers[0].Image
460			switch image {
461			case ds.Spec.Template.Spec.Containers[0].Image:
462				existingPods = append(existingPods, &pod)
463			case newDS.Spec.Template.Spec.Containers[0].Image:
464				newPods = append(newPods, &pod)
465			default:
466				framework.Failf("unexpected pod found, image = %s", image)
467			}
468		}
469		schedulableNodes, err = e2enode.GetReadySchedulableNodes(c)
470		framework.ExpectNoError(err)
471		if len(schedulableNodes.Items) < 2 {
472			framework.ExpectEqual(len(existingPods), 0)
473		} else {
474			framework.ExpectNotEqual(len(existingPods), 0)
475		}
476		framework.ExpectNotEqual(len(newPods), 0)
477
478		framework.Logf("Roll back the DaemonSet before rollout is complete")
479		rollbackDS, err := updateDaemonSetWithRetries(c, ns, ds.Name, func(update *appsv1.DaemonSet) {
480			update.Spec.Template.Spec.Containers[0].Image = image
481		})
482		framework.ExpectNoError(err)
483
484		framework.Logf("Make sure DaemonSet rollback is complete")
485		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
486		framework.ExpectNoError(err)
487
		// After the rollback is complete, compare the current pods with the old pods recorded during the rollout to make sure they were not restarted.
489		pods = listDaemonPods(c, ns, label)
490		rollbackPods := map[string]bool{}
491		for _, pod := range pods.Items {
492			rollbackPods[pod.Name] = true
493		}
494		for _, pod := range existingPods {
			framework.ExpectEqual(rollbackPods[pod.Name], true, fmt.Sprintf("pod %s was unexpectedly restarted during the rollback", pod.Name))
496		}
497	})
498
499	// TODO: This test is expected to be promoted to conformance after the feature is promoted
500	ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate", func() {
501		label := map[string]string{daemonsetNameLabel: dsName}
502
503		framework.Logf("Creating surge daemon set %s", dsName)
504		maxSurgeOverlap := 60 * time.Second
505		maxSurge := 1
506		surgePercent := intstr.FromString("20%")
507		zero := intstr.FromInt(0)
508		oldVersion := "1"
509		ds := newDaemonSet(dsName, image, label)
510		ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{
511			{Name: "VERSION", Value: oldVersion},
512		}
513		// delay shutdown by 15s to allow containers to overlap in time
514		ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{
515			PreStop: &v1.Handler{
516				Exec: &v1.ExecAction{
517					Command: []string{"/bin/sh", "-c", "sleep 15"},
518				},
519			},
520		}
521		// use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready)
522		ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
523			Handler: v1.Handler{
524				Exec: &v1.ExecAction{
525					Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`},
526				},
527			},
528			InitialDelaySeconds: 7,
529			PeriodSeconds:       3,
530			SuccessThreshold:    1,
531			FailureThreshold:    1,
532		}
533		// use a simple surge strategy
534		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
535			Type: appsv1.RollingUpdateDaemonSetStrategyType,
536			RollingUpdate: &appsv1.RollingUpdateDaemonSet{
537				MaxUnavailable: &zero,
538				MaxSurge:       &surgePercent,
539			},
540		}
541		// The pod must be ready for at least 10s before we delete the old pod
542		ds.Spec.MinReadySeconds = 10
543
544		ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
545		framework.ExpectNoError(err)
546
547		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
548		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
549		framework.ExpectNoError(err, "error waiting for daemon pod to start")
550
551		// Check history and labels
552		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
553		framework.ExpectNoError(err)
554		waitForHistoryCreated(c, ns, label, 1)
555		cur := curHistory(listDaemonHistories(c, ns, label), ds)
556		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
557		framework.ExpectEqual(cur.Revision, int64(1))
558		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
559
560		newVersion := "2"
		ginkgo.By("Update daemon pods' environment var")
562		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion)
563		ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
564		framework.ExpectNoError(err)
565
566		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
567		// Get the number of nodes, and set the timeout appropriately.
568		nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
569		framework.ExpectNoError(err)
570		nodeCount := len(nodes.Items)
571		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
572
573		ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout")
574		ageOfOldPod := make(map[string]time.Time)
575		deliberatelyDeletedPods := sets.NewString()
576		err = wait.PollImmediate(dsRetryPeriod, retryTimeout, func() (bool, error) {
577			podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
578			if err != nil {
579				return false, err
580			}
581			pods := podList.Items
582
583			var buf bytes.Buffer
584			pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
585			fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n")
586
587			now := time.Now()
588			podUIDs := sets.NewString()
589			deletedPodUIDs := sets.NewString()
590			nodes := sets.NewString()
591			versions := sets.NewString()
592			nodesToVersions := make(map[string]map[string]int)
593			nodesToDeletedVersions := make(map[string]map[string]int)
594			var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int
595			for _, pod := range pods {
596				if !metav1.IsControlledBy(&pod, ds) {
597					continue
598				}
599				nodeName := pod.Spec.NodeName
600				nodes.Insert(nodeName)
				podVersion := pod.Spec.Containers[0].Env[0].Value
				// Record every observed version so the "max 2 versions" invariant below has data to check.
				versions.Insert(podVersion)
602				if pod.DeletionTimestamp != nil {
603					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
604						versions := nodesToDeletedVersions[nodeName]
605						if versions == nil {
606							versions = make(map[string]int)
607							nodesToDeletedVersions[nodeName] = versions
608						}
609						versions[podVersion]++
610					}
611				} else {
612					versions := nodesToVersions[nodeName]
613					if versions == nil {
614						versions = make(map[string]int)
615						nodesToVersions[nodeName] = versions
616					}
617					versions[podVersion]++
618				}
619
620				ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
621				if podVersion == newVersion {
622					surgeCount++
623					if !ready || pod.DeletionTimestamp != nil {
624						if deliberatelyDeletedPods.Has(string(pod.UID)) {
625							newDeliberatelyDeletedCount++
626						}
627						newUnavailableCount++
628					}
629				} else {
630					if !ready || pod.DeletionTimestamp != nil {
631						oldUnavailableCount++
632					}
633				}
634				fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready)
635			}
636
637			// print a stable sorted list of pods by node for debugging
638			pw.Flush()
639			lines := strings.Split(buf.String(), "\n")
640			lines = lines[:len(lines)-1]
641			sort.Strings(lines[1:])
642			for _, line := range lines {
643				framework.Logf("%s", line)
644			}
645
646			// if there is an old and new pod at the same time, record a timestamp
647			deletedPerNode := make(map[string]int)
648			for _, pod := range pods {
649				if !metav1.IsControlledBy(&pod, ds) {
650					continue
651				}
652				// ignore deleted pods
653				if pod.DeletionTimestamp != nil {
654					deletedPodUIDs.Insert(string(pod.UID))
655					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
656						deletedPerNode[pod.Spec.NodeName]++
657					}
658					continue
659				}
660				podUIDs.Insert(string(pod.UID))
661				podVersion := pod.Spec.Containers[0].Env[0].Value
662				if podVersion == newVersion {
663					continue
664				}
665				// if this is a pod in an older version AND there is a new version of this pod, record when
666				// we started seeing this, otherwise delete the record (perhaps the node was drained)
667				if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 {
668					if _, ok := ageOfOldPod[string(pod.UID)]; !ok {
669						ageOfOldPod[string(pod.UID)] = now
670					}
671				} else {
672					delete(ageOfOldPod, string(pod.UID))
673				}
674			}
675			// purge the old pods list of any deleted pods
676			for uid := range ageOfOldPod {
677				if !podUIDs.Has(uid) {
678					delete(ageOfOldPod, uid)
679				}
680			}
681			deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs)
682
683			for _, versions := range nodesToVersions {
684				if versions[oldVersion] == 0 {
685					nodesWithoutOldVersion++
686				}
687			}
688
689			var errs []string
690
691			// invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving
692			for node, count := range deletedPerNode {
693				if count > 1 {
694					errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count))
695				}
696			}
697
698			// invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period)
699			for uid, firstSeen := range ageOfOldPod {
700				if now.Sub(firstSeen) > maxSurgeOverlap {
701					errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap))
702				}
703			}
704
705			// invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or
706			//            if we deliberately deleted one of the new pods
707			if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) {
708				errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion))
709			}
710			// invariant: the total number of versions created should be 2
711			if versions.Len() > 2 {
712				errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len()))
713			}
714			for _, node := range nodes.List() {
715				// ignore pods that haven't been scheduled yet
716				if len(node) == 0 {
717					continue
718				}
719				versionCount := make(map[string]int)
				// invariant: the surge must never run more than one instance of a given version on a node at a time
721				for version, count := range nodesToVersions[node] {
722					if count > 1 {
723						errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version))
724					}
725					versionCount[version] += count
726				}
				// invariant: when surging, at most 2 pods per version may be in the process of being deleted on a node (e.g. if pods are being evicted)
728				for version, count := range nodesToDeletedVersions[node] {
729					if count > 2 {
						errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s being deleted simultaneously, must have max 2", node, count, version))
731					}
732					versionCount[version] += count
733				}
734				// invariant: on any node, we should never have more than two instances of a version (if we are getting evicted)
735				for version, count := range versionCount {
736					if count > 2 {
737						errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
738					}
739				}
740			}
741
742			if len(errs) > 0 {
743				sort.Strings(errs)
744				return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
745			}
746
747			// Make sure every daemon pod on the node has been updated
748			nodeNames := schedulableNodes(c, ds)
749			for _, node := range nodeNames {
750				switch {
751				case
752					// if we don't have the new version yet
753					nodesToVersions[node][newVersion] == 0,
754					// if there are more than one version on a node
755					len(nodesToVersions[node]) > 1,
756					// if there are still any deleted pods
757					len(nodesToDeletedVersions[node]) > 0,
758					// if any of the new pods are unavailable
759					newUnavailableCount > 0:
760
761					// inject a failure randomly to ensure the controller recovers
762					switch rand.Intn(25) {
763					// cause a random old pod to go unready
764					case 0:
765						// select a not-deleted pod of the old version
766						if pod := randomPod(pods, func(pod *v1.Pod) bool {
767							return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
768						}); pod != nil {
							// write a non-empty value into /var/tmp/ready, which causes the readiness probe to fail
770							if _, err := framework.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
771								framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
772							} else {
773								framework.Logf("Marked old pod %s as unready", pod.Name)
774							}
775						}
776					case 1:
777						// delete a random pod
778						if pod := randomPod(pods, func(pod *v1.Pod) bool {
779							return pod.DeletionTimestamp == nil
780						}); pod != nil {
781							if err := c.CoreV1().Pods(ds.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
782								framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
783							} else {
784								framework.Logf("Deleted pod %s prematurely", pod.Name)
785								deliberatelyDeletedPods.Insert(string(pod.UID))
786							}
787						}
788					}
789
790					// then wait
791					return false, nil
792				}
793			}
794			return true, nil
795		})
796		framework.ExpectNoError(err)
797
798		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
799		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
800		framework.ExpectNoError(err, "error waiting for daemon pod to start")
801
802		// Check history and labels
803		ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
804		framework.ExpectNoError(err)
805		waitForHistoryCreated(c, ns, label, 2)
806		cur = curHistory(listDaemonHistories(c, ns, label), ds)
807		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
808		framework.ExpectEqual(cur.Revision, int64(2))
809		checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
810	})
811
812	/*
813		Release: v1.22
814		Testname: DaemonSet, list and delete a collection of DaemonSets
815		Description: When a DaemonSet is created it MUST succeed. It
816		MUST succeed when listing DaemonSets via a label selector. It
817		MUST succeed when deleting the DaemonSet via deleteCollection.
818	*/
819	framework.ConformanceIt("should list and delete a collection of DaemonSets", func() {
820		label := map[string]string{daemonsetNameLabel: dsName}
821		labelSelector := labels.SelectorFromSet(label).String()
822
823		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
824		cs := f.ClientSet
825		one := int64(1)
826
827		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
828		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
829		framework.ExpectNoError(err)
830
831		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
832		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, testDaemonset))
833		framework.ExpectNoError(err, "error waiting for daemon pod to start")
834		err = checkDaemonStatus(f, dsName)
835		framework.ExpectNoError(err)
836
		ginkgo.By("listing all DaemonSets")
838		dsList, err := cs.AppsV1().DaemonSets("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
839		framework.ExpectNoError(err, "failed to list Daemon Sets")
		framework.ExpectEqual(len(dsList.Items), 1, "expected exactly one DaemonSet in the filtered list")
841
842		ginkgo.By("DeleteCollection of the DaemonSets")
843		err = dsClient.DeleteCollection(context.TODO(), metav1.DeleteOptions{GracePeriodSeconds: &one}, metav1.ListOptions{LabelSelector: labelSelector})
844		framework.ExpectNoError(err, "failed to delete DaemonSets")
845
		ginkgo.By("Verify that the DaemonSets have been deleted")
847		dsList, err = c.AppsV1().DaemonSets("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
848		framework.ExpectNoError(err, "failed to list DaemonSets")
		framework.ExpectEqual(len(dsList.Items), 0, "filtered list should have no DaemonSets")
850	})
851
	/*
		Release: v1.22
		Testname: DaemonSet, status sub-resource
		Description: When a DaemonSet is created it MUST succeed.
		Attempt to read, update and patch its status sub-resource; all
		mutating sub-resource operations MUST be visible to subsequent reads.
	*/
858	framework.ConformanceIt("should verify changes to a daemon set status", func() {
859		label := map[string]string{daemonsetNameLabel: dsName}
860		labelSelector := labels.SelectorFromSet(label).String()
861
862		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
863		cs := f.ClientSet
864
865		w := &cache.ListWatch{
866			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
867				options.LabelSelector = labelSelector
868				return dsClient.Watch(context.TODO(), options)
869			},
870		}
871
872		dsList, err := cs.AppsV1().DaemonSets("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
873		framework.ExpectNoError(err, "failed to list Daemon Sets")
874
875		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
876		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
877		framework.ExpectNoError(err)
878
879		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
880		err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, testDaemonset))
881		framework.ExpectNoError(err, "error waiting for daemon pod to start")
882		err = checkDaemonStatus(f, dsName)
883		framework.ExpectNoError(err)
884
885		ginkgo.By("Getting /status")
886		dsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}
887		dsStatusUnstructured, err := f.DynamicClient.Resource(dsResource).Namespace(ns).Get(context.TODO(), dsName, metav1.GetOptions{}, "status")
888		framework.ExpectNoError(err, "Failed to fetch the status of daemon set %s in namespace %s", dsName, ns)
889		dsStatusBytes, err := json.Marshal(dsStatusUnstructured)
890		framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err)
891
892		var dsStatus appsv1.DaemonSet
893		err = json.Unmarshal(dsStatusBytes, &dsStatus)
894		framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a daemon set object type")
895		framework.Logf("Daemon Set %s has Conditions: %v", dsName, dsStatus.Status.Conditions)
896
897		ginkgo.By("updating the DaemonSet Status")
898		var statusToUpdate, updatedStatus *appsv1.DaemonSet
899
900		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
901			statusToUpdate, err = dsClient.Get(context.TODO(), dsName, metav1.GetOptions{})
902			framework.ExpectNoError(err, "Unable to retrieve daemon set %s", dsName)
903
904			statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, appsv1.DaemonSetCondition{
905				Type:    "StatusUpdate",
906				Status:  "True",
907				Reason:  "E2E",
908				Message: "Set from e2e test",
909			})
910
911			updatedStatus, err = dsClient.UpdateStatus(context.TODO(), statusToUpdate, metav1.UpdateOptions{})
912			return err
913		})
914		framework.ExpectNoError(err, "Failed to update status. %v", err)
915		framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)
916
917		ginkgo.By("watching for the daemon set status to be updated")
918		ctx, cancel := context.WithTimeout(context.Background(), dsRetryTimeout)
919		defer cancel()
920		_, err = watchtools.Until(ctx, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
921			if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
922				found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
923					ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
924					ds.Labels[daemonsetNameLabel] == dsName
925				if !found {
926					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
927					return false, nil
928				}
929				for _, cond := range ds.Status.Conditions {
930					if cond.Type == "StatusUpdate" &&
931						cond.Reason == "E2E" &&
932						cond.Message == "Set from e2e test" {
933						framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
934						return found, nil
935					}
936					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
937				}
938			}
939			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
940			framework.Logf("Observed %v event: %+v", object, event.Type)
941			return false, nil
942		})
943		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
944		framework.Logf("Daemon set %s has an updated status", dsName)
945
946		ginkgo.By("patching the DaemonSet Status")
947		daemonSetStatusPatch := appsv1.DaemonSet{
948			Status: appsv1.DaemonSetStatus{
949				Conditions: []appsv1.DaemonSetCondition{
950					{
951						Type:   "StatusPatched",
952						Status: "True",
953					},
954				},
955			},
956		}
957
958		payload, err := json.Marshal(daemonSetStatusPatch)
959		framework.ExpectNoError(err, "Failed to marshal JSON. %v", err)
960		_, err = dsClient.Patch(context.TODO(), dsName, types.MergePatchType, payload, metav1.PatchOptions{}, "status")
		framework.ExpectNoError(err, "Failed to patch daemon set status")
962
963		ginkgo.By("watching for the daemon set status to be patched")
964		ctx, cancel = context.WithTimeout(context.Background(), dsRetryTimeout)
965		defer cancel()
966		_, err = watchtools.Until(ctx, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
967			if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
968				found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
969					ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
970					ds.Labels[daemonsetNameLabel] == dsName
971				if !found {
972					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
973					return false, nil
974				}
975				for _, cond := range ds.Status.Conditions {
976					if cond.Type == "StatusPatched" {
977						framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
978						return found, nil
979					}
980					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
981				}
982			}
983			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
984			framework.Logf("Observed %v event: %v", object, event.Type)
985			return false, nil
986		})
987		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
988		framework.Logf("Daemon set %s has a patched status", dsName)
989	})
990})
991
// randomPod returns a randomly chosen pod from pods for which fn returns true, or nil
// if no such pod exists.
func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
	podCount := len(pods)
	if podCount == 0 {
		return nil
	}
	for offset, i := rand.Intn(podCount), 0; i < podCount; i++ {
997		pod := &pods[(offset+i)%podCount]
998		if fn(pod) {
999			return pod
1000		}
1001	}
1002	return nil
1003}
1004
1005// getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image
1006func getDaemonSetImagePatch(containerName, containerImage string) string {
1007	return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
1008}
1009
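// newDaemonSet returns a DaemonSet spec with the given name and image, using
// label for both the selector and the pod template, and exposing container port 9376.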
1010func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet {
1011	return &appsv1.DaemonSet{
1012		ObjectMeta: metav1.ObjectMeta{
1013			Name: dsName,
1014		},
1015		Spec: appsv1.DaemonSetSpec{
1016			Selector: &metav1.LabelSelector{
1017				MatchLabels: label,
1018			},
1019			Template: v1.PodTemplateSpec{
1020				ObjectMeta: metav1.ObjectMeta{
1021					Labels: label,
1022				},
1023				Spec: v1.PodSpec{
1024					Containers: []v1.Container{
1025						{
1026							Name:  "app",
1027							Image: image,
1028							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
1029						},
1030					},
1031				},
1032			},
1033		},
1034	}
1035}
1036
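// newDaemonSetWithLabel is like newDaemonSet but also applies label to the
// DaemonSet object itself, so the DaemonSet can be found via a label selector.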
1037func newDaemonSetWithLabel(dsName, image string, label map[string]string) *appsv1.DaemonSet {
1038	return &appsv1.DaemonSet{
1039		ObjectMeta: metav1.ObjectMeta{
1040			Name:   dsName,
1041			Labels: label,
1042		},
1043		Spec: appsv1.DaemonSetSpec{
1044			Selector: &metav1.LabelSelector{
1045				MatchLabels: label,
1046			},
1047			Template: v1.PodTemplateSpec{
1048				ObjectMeta: metav1.ObjectMeta{
1049					Labels: label,
1050				},
1051				Spec: v1.PodSpec{
1052					Containers: []v1.Container{
1053						{
1054							Name:  "app",
1055							Image: image,
1056							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
1057						},
1058					},
1059				},
1060			},
1061		},
1062	}
1063}
1064
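// listDaemonPods lists the pods in ns matching label and asserts that at least
// one pod is returned.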
1065func listDaemonPods(c clientset.Interface, ns string, label map[string]string) *v1.PodList {
1066	selector := labels.Set(label).AsSelector()
1067	options := metav1.ListOptions{LabelSelector: selector.String()}
1068	podList, err := c.CoreV1().Pods(ns).List(context.TODO(), options)
1069	framework.ExpectNoError(err)
1070	gomega.Expect(len(podList.Items)).To(gomega.BeNumerically(">", 0))
1071	return podList
1072}
1073
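// separateDaemonSetNodeLabels splits node labels into those created by these
// tests (prefixed with daemonset-) and all other labels.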
1074func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
1075	daemonSetLabels := map[string]string{}
1076	otherLabels := map[string]string{}
1077	for k, v := range labels {
1078		if strings.HasPrefix(k, daemonsetLabelPrefix) {
1079			daemonSetLabels[k] = v
1080		} else {
1081			otherLabels[k] = v
1082		}
1083	}
1084	return daemonSetLabels, otherLabels
1085}
1086
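// clearDaemonSetNodeLabels removes the daemonset- prefixed test labels from
// every ready and schedulable node.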
1087func clearDaemonSetNodeLabels(c clientset.Interface) error {
1088	nodeList, err := e2enode.GetReadySchedulableNodes(c)
1089	if err != nil {
1090		return err
1091	}
1092	for _, node := range nodeList.Items {
1093		_, err := setDaemonSetNodeLabels(c, node.Name, map[string]string{})
1094		if err != nil {
1095			return err
1096		}
1097	}
1098	return nil
1099}
1100
// patchNamespaceAnnotations sets the node-selector-related annotations on the
// test namespace to empty values.
1102func patchNamespaceAnnotations(c clientset.Interface, nsName string) (*v1.Namespace, error) {
1103	nsClient := c.CoreV1().Namespaces()
1104
1105	annotations := make(map[string]string)
1106	for _, n := range NamespaceNodeSelectors {
1107		annotations[n] = ""
1108	}
1109	nsPatch, err := json.Marshal(map[string]interface{}{
1110		"metadata": map[string]interface{}{
1111			"annotations": annotations,
1112		},
1113	})
1114	if err != nil {
1115		return nil, err
1116	}
1117
1118	return nsClient.Patch(context.TODO(), nsName, types.StrategicMergePatchType, nsPatch, metav1.PatchOptions{})
1119}
1120
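// setDaemonSetNodeLabels replaces the daemonset- prefixed test labels on the
// given node with labels, leaving all other node labels untouched and retrying
// on resource version conflicts.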
1121func setDaemonSetNodeLabels(c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
1122	nodeClient := c.CoreV1().Nodes()
1123	var newNode *v1.Node
1124	var newLabels map[string]string
1125	err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, func() (bool, error) {
1126		node, err := nodeClient.Get(context.TODO(), nodeName, metav1.GetOptions{})
1127		if err != nil {
1128			return false, err
1129		}
1130
1131		// remove all labels this test is creating
1132		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
		if reflect.DeepEqual(daemonSetLabels, labels) {
			// the test labels are already as desired; record them so the post-poll check passes
			newNode = node
			newLabels = daemonSetLabels
			return true, nil
1136		}
1137		node.Labels = otherLabels
1138		for k, v := range labels {
1139			node.Labels[k] = v
1140		}
1141		newNode, err = nodeClient.Update(context.TODO(), node, metav1.UpdateOptions{})
1142		if err == nil {
1143			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
1144			return true, err
1145		}
1146		if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
1147			framework.Logf("failed to update node due to resource version conflict")
1148			return false, nil
1149		}
1150		return false, err
1151	})
1152	if err != nil {
1153		return nil, err
1154	} else if len(newLabels) != len(labels) {
1155		return nil, fmt.Errorf("could not set daemon set test labels as expected")
1156	}
1157
1158	return newNode, nil
1159}
1160
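// checkDaemonPodOnNodes returns a poll function that succeeds when exactly one
// available daemon pod of ds is running on each node in nodeNames and on no
// other node.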
1161func checkDaemonPodOnNodes(f *framework.Framework, ds *appsv1.DaemonSet, nodeNames []string) func() (bool, error) {
1162	return func() (bool, error) {
1163		podList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
1164		if err != nil {
1165			framework.Logf("could not get the pod list: %v", err)
1166			return false, nil
1167		}
1168		pods := podList.Items
1169
1170		nodesToPodCount := make(map[string]int)
1171		for _, pod := range pods {
1172			if !metav1.IsControlledBy(&pod, ds) {
1173				continue
1174			}
1175			if pod.DeletionTimestamp != nil {
1176				continue
1177			}
1178			if podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
1179				nodesToPodCount[pod.Spec.NodeName]++
1180			}
1181		}
1182		framework.Logf("Number of nodes with available pods: %d", len(nodesToPodCount))
1183
1184		// Ensure that exactly 1 pod is running on all nodes in nodeNames.
1185		for _, nodeName := range nodeNames {
1186			if nodesToPodCount[nodeName] != 1 {
				framework.Logf("Node %s is running %d daemon pods, expected exactly 1", nodeName, nodesToPodCount[nodeName])
1188				return false, nil
1189			}
1190		}
1191
		framework.Logf("Number of nodes expected to run daemon pods: %d, number of nodes with an available daemon pod: %d", len(nodeNames), len(nodesToPodCount))
1193		// Ensure that sizes of the lists are the same. We've verified that every element of nodeNames is in
1194		// nodesToPodCount, so verifying the lengths are equal ensures that there aren't pods running on any
1195		// other nodes.
1196		return len(nodesToPodCount) == len(nodeNames), nil
1197	}
1198}
1199
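// checkRunningOnAllNodes returns a poll function that succeeds when an
// available daemon pod of ds is running on every schedulable node.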
1200func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) {
1201	return func() (bool, error) {
1202		nodeNames := schedulableNodes(f.ClientSet, ds)
1203		return checkDaemonPodOnNodes(f, ds, nodeNames)()
1204	}
1205}
1206
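// schedulableNodes returns the names of the nodes onto which ds can schedule
// pods, based on the daemon controller's node predicates.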
1207func schedulableNodes(c clientset.Interface, ds *appsv1.DaemonSet) []string {
1208	nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
1209	framework.ExpectNoError(err)
1210	nodeNames := make([]string, 0)
1211	for _, node := range nodeList.Items {
1212		if !canScheduleOnNode(node, ds) {
1213			framework.Logf("DaemonSet pods can't tolerate node %s with taints %+v, skip checking this node", node.Name, node.Spec.Taints)
1214			continue
1215		}
1216		nodeNames = append(nodeNames, node.Name)
1217	}
1218	return nodeNames
1219}
1220
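// checkAtLeastOneNewPod returns a poll function that succeeds once at least
// one daemon pod matching label is running newImage.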
1221func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func() (bool, error) {
1222	return func() (bool, error) {
1223		pods := listDaemonPods(c, ns, label)
1224		for _, pod := range pods.Items {
1225			if pod.Spec.Containers[0].Image == newImage {
1226				return true, nil
1227			}
1228		}
1229		return false, nil
1230	}
1231}
1232
1233// canScheduleOnNode checks if a given DaemonSet can schedule pods on the given node
1234func canScheduleOnNode(node v1.Node, ds *appsv1.DaemonSet) bool {
1235	newPod := daemon.NewPod(ds, node.Name)
1236	fitsNodeName, fitsNodeAffinity, fitsTaints := daemon.Predicates(newPod, &node, node.Spec.Taints)
1237	return fitsNodeName && fitsNodeAffinity && fitsTaints
1238}
1239
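// checkRunningOnNoNodes returns a poll function that succeeds when no daemon
// pods of ds are running on any node.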
1240func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func() (bool, error) {
1241	return checkDaemonPodOnNodes(f, ds, make([]string, 0))
1242}
1243
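// checkDaemonStatus returns an error unless the DaemonSet status reports that
// the desired number of daemon pods are both scheduled and ready.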
1244func checkDaemonStatus(f *framework.Framework, dsName string) error {
1245	ds, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Get(context.TODO(), dsName, metav1.GetOptions{})
1246	if err != nil {
		return fmt.Errorf("could not get DaemonSet %q: %v", dsName, err)
1248	}
1249	desired, scheduled, ready := ds.Status.DesiredNumberScheduled, ds.Status.CurrentNumberScheduled, ds.Status.NumberReady
	if desired != scheduled || desired != ready {
1251		return fmt.Errorf("error in daemon status. DesiredScheduled: %d, CurrentScheduled: %d, Ready: %d", desired, scheduled, ready)
1252	}
1253	return nil
1254}
1255
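// checkDaemonPodsImageAndAvailability returns a poll function that succeeds
// when every schedulable node runs a daemon pod with the expected image and no
// more than maxUnavailable daemon pods are unavailable.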
1256func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func() (bool, error) {
1257	return func() (bool, error) {
1258		podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
1259		if err != nil {
1260			return false, err
1261		}
1262		pods := podList.Items
1263
1264		unavailablePods := 0
1265		nodesToUpdatedPodCount := make(map[string]int)
1266		for _, pod := range pods {
			// Ignore pods that are already marked for deletion
1268			if pod.DeletionTimestamp != nil {
1269				continue
1270			}
1271			if !metav1.IsControlledBy(&pod, ds) {
1272				continue
1273			}
1274			podImage := pod.Spec.Containers[0].Image
1275			if podImage != image {
1276				framework.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
1277			} else {
1278				nodesToUpdatedPodCount[pod.Spec.NodeName]++
1279			}
1280			if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
1281				framework.Logf("Pod %s is not available", pod.Name)
1282				unavailablePods++
1283			}
1284		}
1285		if unavailablePods > maxUnavailable {
1286			return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
1287		}
1288		// Make sure every daemon pod on the node has been updated
1289		nodeNames := schedulableNodes(c, ds)
1290		for _, node := range nodeNames {
1291			if nodesToUpdatedPodCount[node] == 0 {
1292				return false, nil
1293			}
1294		}
1295		return true, nil
1296	}
1297}
1298
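// checkDaemonSetPodsLabels asserts that every non-terminating daemon pod in
// podList carries the controller-revision-hash label, and that it matches hash
// when hash is non-empty.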
1299func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
1300	for _, pod := range podList.Items {
1301		// Ignore all the DS pods that will be deleted
1302		if pod.DeletionTimestamp != nil {
1303			continue
1304		}
1305		podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
1306		gomega.Expect(len(podHash)).To(gomega.BeNumerically(">", 0))
1307		if len(hash) > 0 {
1308			framework.ExpectEqual(podHash, hash, "unexpected hash for pod %s", pod.Name)
1309		}
1310	}
1311}
1312
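// waitForHistoryCreated waits until exactly numHistory ControllerRevisions
// matching label exist in ns.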
1313func waitForHistoryCreated(c clientset.Interface, ns string, label map[string]string, numHistory int) {
1314	listHistoryFn := func() (bool, error) {
1315		selector := labels.Set(label).AsSelector()
1316		options := metav1.ListOptions{LabelSelector: selector.String()}
1317		historyList, err := c.AppsV1().ControllerRevisions(ns).List(context.TODO(), options)
1318		if err != nil {
1319			return false, err
1320		}
1321		if len(historyList.Items) == numHistory {
1322			return true, nil
1323		}
1324		framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
1325		return false, nil
1326	}
1327	err := wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, listHistoryFn)
1328	framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
1329}
1330
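// listDaemonHistories lists the ControllerRevisions in ns matching label and
// asserts that at least one is returned.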
1331func listDaemonHistories(c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList {
1332	selector := labels.Set(label).AsSelector()
1333	options := metav1.ListOptions{LabelSelector: selector.String()}
1334	historyList, err := c.AppsV1().ControllerRevisions(ns).List(context.TODO(), options)
1335	framework.ExpectNoError(err)
1336	gomega.Expect(len(historyList.Items)).To(gomega.BeNumerically(">", 0))
1337	return historyList
1338}
1339
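// curHistory returns the single ControllerRevision in historyList that matches
// the current template of ds, asserting that exactly one such revision exists.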
1340func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
1341	var curHistory *appsv1.ControllerRevision
1342	foundCurHistories := 0
1343	for i := range historyList.Items {
1344		history := &historyList.Items[i]
1345		// Every history should have the hash label
1346		gomega.Expect(len(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey])).To(gomega.BeNumerically(">", 0))
1347		match, err := daemon.Match(ds, history)
1348		framework.ExpectNoError(err)
1349		if match {
1350			curHistory = history
1351			foundCurHistories++
1352		}
1353	}
1354	framework.ExpectEqual(foundCurHistories, 1)
1355	gomega.Expect(curHistory).NotTo(gomega.BeNil())
1356	return curHistory
1357}
1358
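// waitFailedDaemonPodDeleted returns a poll function that succeeds once the
// given failed daemon pod no longer exists.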
1359func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func() (bool, error) {
1360	return func() (bool, error) {
1361		if _, err := c.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
1362			if apierrors.IsNotFound(err) {
1363				return true, nil
1364			}
1365			return false, fmt.Errorf("failed to get failed daemon pod %q: %v", pod.Name, err)
1366		}
1367		return false, nil
1368	}
1369}
1370