1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package apps 18 19import ( 20 "context" 21 "encoding/json" 22 "fmt" 23 "io/ioutil" 24 "net/http" 25 "path/filepath" 26 "strconv" 27 "time" 28 29 "github.com/onsi/ginkgo" 30 31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 32 "k8s.io/apimachinery/pkg/util/version" 33 "k8s.io/apimachinery/pkg/util/wait" 34 "k8s.io/kubernetes/test/e2e/framework" 35 e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset" 36 e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" 37 "k8s.io/kubernetes/test/e2e/upgrades" 38) 39 40const mysqlManifestPath = "test/e2e/testing-manifests/statefulset/mysql-upgrade" 41 42// MySQLUpgradeTest implements an upgrade test harness that polls a replicated sql database. 43type MySQLUpgradeTest struct { 44 ip string 45 successfulWrites int 46 nextWrite int 47} 48 49// Name returns the tracking name of the test. 50func (MySQLUpgradeTest) Name() string { return "mysql-upgrade" } 51 52// Skip returns true when this test can be skipped. 53func (MySQLUpgradeTest) Skip(upgCtx upgrades.UpgradeContext) bool { 54 minVersion := version.MustParseSemantic("1.5.0") 55 56 for _, vCtx := range upgCtx.Versions { 57 if vCtx.Version.LessThan(minVersion) { 58 return true 59 } 60 } 61 return false 62} 63 64func mysqlKubectlCreate(ns, file string) { 65 data, err := e2etestfiles.Read(filepath.Join(mysqlManifestPath, file)) 66 if err != nil { 67 framework.Fail(err.Error()) 68 } 69 input := string(data) 70 framework.RunKubectlOrDieInput(ns, input, "create", "-f", "-") 71} 72 73func (t *MySQLUpgradeTest) getServiceIP(f *framework.Framework, ns, svcName string) string { 74 svc, err := f.ClientSet.CoreV1().Services(ns).Get(context.TODO(), svcName, metav1.GetOptions{}) 75 framework.ExpectNoError(err) 76 ingress := svc.Status.LoadBalancer.Ingress 77 if len(ingress) == 0 { 78 return "" 79 } 80 return ingress[0].IP 81} 82 83// Setup creates a StatefulSet, HeadlessService, a Service to write to the db, and a Service to read 84// from the db. It then connects to the db with the write Service and populates the db with a table 85// and a few entries. Finally, it connects to the db with the read Service, and confirms the data is 86// available. The db connections are left open to be used later in the test. 87func (t *MySQLUpgradeTest) Setup(f *framework.Framework) { 88 ns := f.Namespace.Name 89 statefulsetPoll := 30 * time.Second 90 statefulsetTimeout := 10 * time.Minute 91 92 ginkgo.By("Creating a configmap") 93 mysqlKubectlCreate(ns, "configmap.yaml") 94 95 ginkgo.By("Creating a mysql StatefulSet") 96 e2estatefulset.CreateStatefulSet(f.ClientSet, mysqlManifestPath, ns) 97 98 ginkgo.By("Creating a mysql-test-server deployment") 99 mysqlKubectlCreate(ns, "tester.yaml") 100 101 ginkgo.By("Getting the ingress IPs from the test-service") 102 err := wait.PollImmediate(statefulsetPoll, statefulsetTimeout, func() (bool, error) { 103 if t.ip = t.getServiceIP(f, ns, "test-server"); t.ip == "" { 104 return false, nil 105 } 106 if _, err := t.countNames(); err != nil { 107 framework.Logf("Service endpoint is up but isn't responding") 108 return false, nil 109 } 110 return true, nil 111 }) 112 framework.ExpectNoError(err) 113 framework.Logf("Service endpoint is up") 114 115 ginkgo.By("Adding 2 names to the database") 116 err = t.addName(strconv.Itoa(t.nextWrite)) 117 framework.ExpectNoError(err) 118 err = t.addName(strconv.Itoa(t.nextWrite)) 119 framework.ExpectNoError(err) 120 121 ginkgo.By("Verifying that the 2 names have been inserted") 122 count, err := t.countNames() 123 framework.ExpectNoError(err) 124 framework.ExpectEqual(count, 2) 125} 126 127// Test continually polls the db using the read and write connections, inserting data, and checking 128// that all the data is readable. 129func (t *MySQLUpgradeTest) Test(f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) { 130 var writeSuccess, readSuccess, writeFailure, readFailure int 131 ginkgo.By("Continuously polling the database during upgrade.") 132 go wait.Until(func() { 133 _, err := t.countNames() 134 if err != nil { 135 framework.Logf("Error while trying to read data: %v", err) 136 readFailure++ 137 } else { 138 readSuccess++ 139 } 140 }, framework.Poll, done) 141 142 wait.Until(func() { 143 err := t.addName(strconv.Itoa(t.nextWrite)) 144 if err != nil { 145 framework.Logf("Error while trying to write data: %v", err) 146 writeFailure++ 147 } else { 148 writeSuccess++ 149 } 150 }, framework.Poll, done) 151 152 t.successfulWrites = writeSuccess 153 framework.Logf("Successful reads: %d", readSuccess) 154 framework.Logf("Successful writes: %d", writeSuccess) 155 framework.Logf("Failed reads: %d", readFailure) 156 framework.Logf("Failed writes: %d", writeFailure) 157 158 // TODO: Not sure what the ratio defining a successful test run should be. At time of writing the 159 // test, failures only seem to happen when a race condition occurs (read/write starts, doesn't 160 // finish before upgrade interferes). 161 162 readRatio := float64(readSuccess) / float64(readSuccess+readFailure) 163 writeRatio := float64(writeSuccess) / float64(writeSuccess+writeFailure) 164 if readRatio < 0.75 { 165 framework.Failf("Too many failures reading data. Success ratio: %f", readRatio) 166 } 167 if writeRatio < 0.75 { 168 framework.Failf("Too many failures writing data. Success ratio: %f", writeRatio) 169 } 170} 171 172// Teardown performs one final check of the data's availability. 173func (t *MySQLUpgradeTest) Teardown(f *framework.Framework) { 174 count, err := t.countNames() 175 framework.ExpectNoError(err) 176 framework.ExpectEqual(count >= t.successfulWrites, true) 177} 178 179// addName adds a new value to the db. 180func (t *MySQLUpgradeTest) addName(name string) error { 181 val := map[string][]string{"name": {name}} 182 t.nextWrite++ 183 r, err := http.PostForm(fmt.Sprintf("http://%s:8080/addName", t.ip), val) 184 if err != nil { 185 return err 186 } 187 defer r.Body.Close() 188 if r.StatusCode != http.StatusOK { 189 b, err := ioutil.ReadAll(r.Body) 190 if err != nil { 191 return err 192 } 193 return fmt.Errorf(string(b)) 194 } 195 return nil 196} 197 198// countNames checks to make sure the values in testing.users are available, and returns 199// the count of them. 200func (t *MySQLUpgradeTest) countNames() (int, error) { 201 r, err := http.Get(fmt.Sprintf("http://%s:8080/countNames", t.ip)) 202 if err != nil { 203 return 0, err 204 } 205 defer r.Body.Close() 206 if r.StatusCode != http.StatusOK { 207 b, err := ioutil.ReadAll(r.Body) 208 if err != nil { 209 return 0, err 210 } 211 return 0, fmt.Errorf(string(b)) 212 } 213 var count int 214 if err := json.NewDecoder(r.Body).Decode(&count); err != nil { 215 return 0, err 216 } 217 return count, nil 218} 219