1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package apps
18
19import (
20	"context"
21	"encoding/json"
22	"fmt"
23	"io/ioutil"
24	"net/http"
25	"path/filepath"
26	"strconv"
27	"time"
28
29	"github.com/onsi/ginkgo"
30
31	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32	"k8s.io/apimachinery/pkg/util/version"
33	"k8s.io/apimachinery/pkg/util/wait"
34	"k8s.io/kubernetes/test/e2e/framework"
35	e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
36	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
37	"k8s.io/kubernetes/test/e2e/upgrades"
38)
39
40const mysqlManifestPath = "test/e2e/testing-manifests/statefulset/mysql-upgrade"
41
42// MySQLUpgradeTest implements an upgrade test harness that polls a replicated sql database.
43type MySQLUpgradeTest struct {
44	ip               string
45	successfulWrites int
46	nextWrite        int
47}
48
49// Name returns the tracking name of the test.
50func (MySQLUpgradeTest) Name() string { return "mysql-upgrade" }
51
52// Skip returns true when this test can be skipped.
53func (MySQLUpgradeTest) Skip(upgCtx upgrades.UpgradeContext) bool {
54	minVersion := version.MustParseSemantic("1.5.0")
55
56	for _, vCtx := range upgCtx.Versions {
57		if vCtx.Version.LessThan(minVersion) {
58			return true
59		}
60	}
61	return false
62}
63
64func mysqlKubectlCreate(ns, file string) {
65	data, err := e2etestfiles.Read(filepath.Join(mysqlManifestPath, file))
66	if err != nil {
67		framework.Fail(err.Error())
68	}
69	input := string(data)
70	framework.RunKubectlOrDieInput(ns, input, "create", "-f", "-")
71}
72
73func (t *MySQLUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string {
74	svc, err := f.ClientSet.CoreV1().Services(ns).Get(context.TODO(), svcName, metav1.GetOptions{})
75	framework.ExpectNoError(err)
76	ingress := svc.Status.LoadBalancer.Ingress
77	if len(ingress) == 0 {
78		return ""
79	}
80	return ingress[0].IP
81}
82
83// Setup creates a StatefulSet, HeadlessService, a Service to write to the db, and a Service to read
84// from the db. It then connects to the db with the write Service and populates the db with a table
85// and a few entries. Finally, it connects to the db with the read Service, and confirms the data is
86// available. The db connections are left open to be used later in the test.
87func (t *MySQLUpgradeTest) Setup(f *framework.Framework) {
88	ns := f.Namespace.Name
89	statefulsetPoll := 30 * time.Second
90	statefulsetTimeout := 10 * time.Minute
91
92	ginkgo.By("Creating a configmap")
93	mysqlKubectlCreate(ns, "configmap.yaml")
94
95	ginkgo.By("Creating a mysql StatefulSet")
96	e2estatefulset.CreateStatefulSet(f.ClientSet, mysqlManifestPath, ns)
97
98	ginkgo.By("Creating a mysql-test-server deployment")
99	mysqlKubectlCreate(ns, "tester.yaml")
100
101	ginkgo.By("Getting the ingress IPs from the test-service")
102	err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) {
103		if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" {
104			return false, nil
105		}
106		if _, err := t.countNames(); err != nil {
107			framework.Logf("Service endpoint is up but isn't responding")
108			return false, nil
109		}
110		return true, nil
111	})
112	framework.ExpectNoError(err)
113	framework.Logf("Service endpoint is up")
114
115	ginkgo.By("Adding 2 names to the database")
116	err = t.addName(strconv.Itoa(t.nextWrite))
117	framework.ExpectNoError(err)
118	err = t.addName(strconv.Itoa(t.nextWrite))
119	framework.ExpectNoError(err)
120
121	ginkgo.By("Verifying that the 2 names have been inserted")
122	count, err := t.countNames()
123	framework.ExpectNoError(err)
124	framework.ExpectEqual(count, 2)
125}
126
127// Test continually polls the db using the read and write connections, inserting data, and checking
128// that all the data is readable.
129func (t *MySQLUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
130	var writeSuccess, readSuccess, writeFailure, readFailure int
131	ginkgo.By("Continuously polling the database during upgrade.")
132	go wait.Until(func() {
133		_, err := t.countNames()
134		if err != nil {
135			framework.Logf("Error while trying to read data: %v", err)
136			readFailure++
137		} else {
138			readSuccess++
139		}
140	}, framework.Poll, done)
141
142	wait.Until(func() {
143		err := t.addName(strconv.Itoa(t.nextWrite))
144		if err != nil {
145			framework.Logf("Error while trying to write data: %v", err)
146			writeFailure++
147		} else {
148			writeSuccess++
149		}
150	}, framework.Poll, done)
151
152	t.successfulWrites = writeSuccess
153	framework.Logf("Successful reads: %d", readSuccess)
154	framework.Logf("Successful writes: %d", writeSuccess)
155	framework.Logf("Failed reads: %d", readFailure)
156	framework.Logf("Failed writes: %d", writeFailure)
157
158	// TODO: Not sure what the ratio defining a successful test run should be. At time of writing the
159	// test, failures only seem to happen when a race condition occurs (read/write starts, doesn't
160	// finish before upgrade interferes).
161
162	readRatio := float64(readSuccess) / float64(readSuccess+readFailure)
163	writeRatio := float64(writeSuccess) / float64(writeSuccess+writeFailure)
164	if readRatio < 0.75 {
165		framework.Failf("Too many failures reading data. Success ratio: %f", readRatio)
166	}
167	if writeRatio < 0.75 {
168		framework.Failf("Too many failures writing data. Success ratio: %f", writeRatio)
169	}
170}
171
172// Teardown performs one final check of the data's availability.
173func (t *MySQLUpgradeTest) Teardown(f *framework.Framework) {
174	count, err := t.countNames()
175	framework.ExpectNoError(err)
176	framework.ExpectEqual(count >= t.successfulWrites, true)
177}
178
179// addName adds a new value to the db.
180func (t *MySQLUpgradeTest) addName(name string) error {
181	val := map[string][]string{"name": {name}}
182	t.nextWrite++
183	r, err := http.PostForm(fmt.Sprintf("http://%s:8080/addName", t.ip), val)
184	if err != nil {
185		return err
186	}
187	defer r.Body.Close()
188	if r.StatusCode != http.StatusOK {
189		b, err := ioutil.ReadAll(r.Body)
190		if err != nil {
191			return err
192		}
193		return fmt.Errorf(string(b))
194	}
195	return nil
196}
197
198// countNames checks to make sure the values in testing.users are available, and returns
199// the count of them.
200func (t *MySQLUpgradeTest) countNames() (int, error) {
201	r, err := http.Get(fmt.Sprintf("http://%s:8080/countNames", t.ip))
202	if err != nil {
203		return 0, err
204	}
205	defer r.Body.Close()
206	if r.StatusCode != http.StatusOK {
207		b, err := ioutil.ReadAll(r.Body)
208		if err != nil {
209			return 0, err
210		}
211		return 0, fmt.Errorf(string(b))
212	}
213	var count int
214	if err := json.NewDecoder(r.Body).Decode(&count); err != nil {
215		return 0, err
216	}
217	return count, nil
218}
219