1package integrationtests
2
3import (
4	"context"
5	"fmt"
6	"io/ioutil"
7	"net"
8	"time"
9
10	. "github.com/onsi/ginkgo"
11	. "github.com/onsi/gomega"
12
13	corev1 "k8s.io/api/core/v1"
14	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15	"k8s.io/apimachinery/pkg/runtime/schema"
16	"k8s.io/client-go/kubernetes/scheme"
17	"k8s.io/client-go/rest"
18	"sigs.k8s.io/controller-runtime/pkg/internal/testing/integration"
19)
20
21var _ = Describe("The Testing Framework", func() {
22	var controlPlane *integration.ControlPlane
23	ctx := context.TODO()
24
25	AfterEach(func() {
26		Expect(controlPlane.Stop()).To(Succeed())
27	})
28
29	It("Successfully manages the control plane lifecycle", func() {
30		var err error
31
32		controlPlane = &integration.ControlPlane{}
33
34		By("Starting all the control plane processes")
35		err = controlPlane.Start()
36		Expect(err).NotTo(HaveOccurred(), "Expected controlPlane to start successfully")
37
38		apiServerURL := controlPlane.APIURL()
39		etcdClientURL := controlPlane.APIServer.EtcdURL
40
41		isEtcdListeningForClients := isSomethingListeningOnPort(etcdClientURL.Host)
42		isAPIServerListening := isSomethingListeningOnPort(apiServerURL.Host)
43
44		By("Ensuring Etcd is listening")
45		Expect(isEtcdListeningForClients()).To(BeTrue(),
46			fmt.Sprintf("Expected Etcd to listen for clients on %s,", etcdClientURL.Host))
47
48		By("Ensuring APIServer is listening")
49		c, err := controlPlane.RESTClientConfig()
50		Expect(err).NotTo(HaveOccurred())
51		CheckAPIServerIsReady(c)
52
53		By("getting a kubeclient & run it against the control plane")
54		c.APIPath = "/api"
55		c.ContentConfig.GroupVersion = &schema.GroupVersion{Version: "v1"}
56		kubeClient, err := rest.RESTClientFor(c)
57		Expect(err).NotTo(HaveOccurred())
58		result := &corev1.PodList{}
59		err = kubeClient.Get().
60			Namespace("default").
61			Resource("pods").
62			VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
63			Do(ctx).
64			Into(result)
65		Expect(err).NotTo(HaveOccurred())
66		Expect(result.Items).To(BeEmpty())
67
68		By("getting a kubectl & run it against the control plane")
69		kubeCtl := controlPlane.KubeCtl()
70		stdout, stderr, err := kubeCtl.Run("get", "pods")
71		Expect(err).NotTo(HaveOccurred())
72		bytes, err := ioutil.ReadAll(stdout)
73		Expect(err).NotTo(HaveOccurred())
74		Expect(bytes).To(BeEmpty())
75		Expect(stderr).To(ContainSubstring("No resources found"))
76
77		By("Stopping all the control plane processes")
78		err = controlPlane.Stop()
79		Expect(err).NotTo(HaveOccurred(), "Expected controlPlane to stop successfully")
80
81		By("Ensuring Etcd is not listening anymore")
82		Expect(isEtcdListeningForClients()).To(BeFalse(), "Expected Etcd not to listen for clients anymore")
83
84		By("Ensuring APIServer is not listening anymore")
85		Expect(isAPIServerListening()).To(BeFalse(), "Expected APIServer not to listen anymore")
86
87		By("Not erroring when stopping a stopped ControlPlane")
88		Expect(func() {
89			Expect(controlPlane.Stop()).To(Succeed())
90		}).NotTo(Panic())
91	})
92
93	Context("when Stop() is called on the control plane", func() {
94		Context("but the control plane is not started yet", func() {
95			It("does not error", func() {
96				controlPlane = &integration.ControlPlane{}
97
98				stoppingTheControlPlane := func() {
99					Expect(controlPlane.Stop()).To(Succeed())
100				}
101
102				Expect(stoppingTheControlPlane).NotTo(Panic())
103			})
104		})
105	})
106
107	Context("when the control plane is configured with its components", func() {
108		It("it does not default them", func() {
109			myEtcd, myAPIServer :=
110				&integration.Etcd{StartTimeout: 15 * time.Second},
111				&integration.APIServer{StopTimeout: 16 * time.Second}
112
113			controlPlane = &integration.ControlPlane{
114				Etcd:      myEtcd,
115				APIServer: myAPIServer,
116			}
117
118			Expect(controlPlane.Start()).To(Succeed())
119			Expect(controlPlane.Etcd).To(BeIdenticalTo(myEtcd))
120			Expect(controlPlane.APIServer).To(BeIdenticalTo(myAPIServer))
121			Expect(controlPlane.Etcd.StartTimeout).To(Equal(15 * time.Second))
122			Expect(controlPlane.APIServer.StopTimeout).To(Equal(16 * time.Second))
123		})
124	})
125
126	Context("when etcd already started", func() {
127		It("starts the control plane successfully", func() {
128			myEtcd := &integration.Etcd{}
129			Expect(myEtcd.Start()).To(Succeed())
130
131			controlPlane = &integration.ControlPlane{
132				Etcd: myEtcd,
133			}
134
135			Expect(controlPlane.Start()).To(Succeed())
136		})
137	})
138
139	Context("when control plane is already started", func() {
140		It("can attempt to start again without errors", func() {
141			controlPlane = &integration.ControlPlane{}
142			Expect(controlPlane.Start()).To(Succeed())
143			Expect(controlPlane.Start()).To(Succeed())
144		})
145	})
146
147	Context("when control plane starts and stops", func() {
148		It("can attempt to start again without errors", func() {
149			controlPlane = &integration.ControlPlane{}
150			Expect(controlPlane.Start()).To(Succeed())
151			Expect(controlPlane.Stop()).To(Succeed())
152			Expect(controlPlane.Start()).To(Succeed())
153		})
154	})
155
156	Measure("It should be fast to bring up and tear down the control plane", func(b Benchmarker) {
157		b.Time("lifecycle", func() {
158			controlPlane = &integration.ControlPlane{}
159
160			Expect(controlPlane.Start()).To(Succeed())
161			Expect(controlPlane.Stop()).To(Succeed())
162		})
163	}, 10)
164})
165
// portChecker reports whether something currently accepts TCP connections on
// the address it was created for. It can be called repeatedly.
type portChecker func() bool

// isSomethingListeningOnPort returns a portChecker for hostAndPort (in the
// "host:port" form accepted by net.Dial). Each call of the returned function
// attempts a fresh TCP connection with a one second timeout and returns true
// only if the connection could be established.
func isSomethingListeningOnPort(hostAndPort string) portChecker {
	return func() bool {
		conn, err := net.DialTimeout("tcp", hostAndPort, 1*time.Second)
		if err != nil {
			return false
		}

		// The successful dial already answers the question; a failure while
		// closing this throw-away connection does not change the result.
		_ = conn.Close()
		return true
	}
}
179
180// CheckAPIServerIsReady checks if the APIServer is really ready and not only
181// listening.
182//
183// While porting some tests in k/k
184// (https://github.com/hoegaarden/kubernetes/blob/287fdef1bd98646bc521f4433c1009936d5cf7a2/hack/make-rules/test-cmd-util.sh#L1524-L1535)
185// we found, that the APIServer was
186// listening but not serving certain APIs yet.
187//
188// We changed the readiness detection in the PR at
189// https://github.com/kubernetes-sigs/testing_frameworks/pull/48. To confirm
190// this changed behaviour does what it should do, we used the same test as in
191// k/k's test-cmd (see link above) and test if certain well-known known APIs
192// are actually available.
193func CheckAPIServerIsReady(c *rest.Config) {
194	ctx := context.TODO()
195	// check pods, replicationcontrollers and services
196	c.APIPath = "/api"
197	c.ContentConfig.GroupVersion = &schema.GroupVersion{Version: "v1"}
198	kubeClient, err := rest.RESTClientFor(c)
199	Expect(err).NotTo(HaveOccurred())
200
201	_, err = kubeClient.Get().
202		Namespace("default").
203		Resource("pods").
204		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
205		Do(ctx).
206		Get()
207	Expect(err).NotTo(HaveOccurred())
208
209	_, err = kubeClient.Get().
210		Namespace("default").
211		Resource("replicationcontrollers").
212		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
213		Do(ctx).
214		Get()
215	Expect(err).NotTo(HaveOccurred())
216
217	_, err = kubeClient.Get().
218		Namespace("default").
219		Resource("services").
220		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
221		Do(ctx).
222		Get()
223	Expect(err).NotTo(HaveOccurred())
224
225	// check daemonsets, deployments, replicasets and statefulsets,
226	c.APIPath = "/apis"
227	c.ContentConfig.GroupVersion = &schema.GroupVersion{Group: "apps", Version: "v1"}
228	kubeClient, err = rest.RESTClientFor(c)
229	Expect(err).NotTo(HaveOccurred())
230
231	_, err = kubeClient.Get().
232		Namespace("default").
233		Resource("daemonsets").
234		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
235		Do(ctx).
236		Get()
237	Expect(err).NotTo(HaveOccurred())
238
239	_, err = kubeClient.Get().
240		Namespace("default").
241		Resource("deployments").
242		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
243		Do(ctx).
244		Get()
245	Expect(err).NotTo(HaveOccurred())
246
247	_, err = kubeClient.Get().
248		Namespace("default").
249		Resource("replicasets").
250		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
251		Do(ctx).
252		Get()
253	Expect(err).NotTo(HaveOccurred())
254
255	_, err = kubeClient.Get().
256		Namespace("default").
257		Resource("statefulsets").
258		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
259		Do(ctx).
260		Get()
261	Expect(err).NotTo(HaveOccurred())
262
263	// check horizontalpodautoscalers
264	c.ContentConfig.GroupVersion = &schema.GroupVersion{Group: "autoscaling", Version: "v1"}
265	kubeClient, err = rest.RESTClientFor(c)
266	Expect(err).NotTo(HaveOccurred())
267
268	_, err = kubeClient.Get().
269		Namespace("default").
270		Resource("horizontalpodautoscalers").
271		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
272		Do(ctx).
273		Get()
274	Expect(err).NotTo(HaveOccurred())
275
276	// check jobs
277	c.ContentConfig.GroupVersion = &schema.GroupVersion{Group: "batch", Version: "v1"}
278	kubeClient, err = rest.RESTClientFor(c)
279	Expect(err).NotTo(HaveOccurred())
280
281	_, err = kubeClient.Get().
282		Namespace("default").
283		Resource("jobs").
284		VersionedParams(&metav1.ListOptions{}, scheme.ParameterCodec).
285		Do(ctx).
286		Get()
287	Expect(err).NotTo(HaveOccurred())
288}
289