1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package command 16 17import ( 18 "context" 19 "encoding/binary" 20 "fmt" 21 "math" 22 "math/rand" 23 "os" 24 "sync" 25 "time" 26 27 v3 "github.com/coreos/etcd/clientv3" 28 "github.com/coreos/etcd/pkg/report" 29 30 "github.com/spf13/cobra" 31 "golang.org/x/time/rate" 32 "gopkg.in/cheggaaa/pb.v1" 33) 34 35var ( 36 checkPerfLoad string 37 checkPerfPrefix string 38) 39 40type checkPerfCfg struct { 41 limit int 42 clients int 43 duration int 44} 45 46var checkPerfCfgMap = map[string]checkPerfCfg{ 47 // TODO: support read limit 48 "s": { 49 limit: 150, 50 clients: 50, 51 duration: 60, 52 }, 53 "m": { 54 limit: 1000, 55 clients: 200, 56 duration: 60, 57 }, 58 "l": { 59 limit: 8000, 60 clients: 500, 61 duration: 60, 62 }, 63 "xl": { 64 limit: 15000, 65 clients: 1000, 66 duration: 60, 67 }, 68} 69 70// NewCheckCommand returns the cobra command for "check". 71func NewCheckCommand() *cobra.Command { 72 cc := &cobra.Command{ 73 Use: "check <subcommand>", 74 Short: "commands for checking properties of the etcd cluster", 75 } 76 77 cc.AddCommand(NewCheckPerfCommand()) 78 79 return cc 80} 81 82// NewCheckPerfCommand returns the cobra command for "check perf". 83func NewCheckPerfCommand() *cobra.Command { 84 cmd := &cobra.Command{ 85 Use: "perf [options]", 86 Short: "Check the performance of the etcd cluster", 87 Run: newCheckPerfCommand, 88 } 89 90 // TODO: support customized configuration 91 cmd.Flags().StringVar(&checkPerfLoad, "load", "s", "The performance check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)") 92 cmd.Flags().StringVar(&checkPerfPrefix, "prefix", "/etcdctl-check-perf/", "The prefix for writing the performance check's keys.") 93 94 return cmd 95} 96 97// newCheckPerfCommand executes the "check perf" command. 98func newCheckPerfCommand(cmd *cobra.Command, args []string) { 99 var checkPerfAlias = map[string]string{ 100 "s": "s", "small": "s", 101 "m": "m", "medium": "m", 102 "l": "l", "large": "l", 103 "xl": "xl", "xLarge": "xl", 104 } 105 106 model, ok := checkPerfAlias[checkPerfLoad] 107 if !ok { 108 ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkPerfLoad)) 109 } 110 cfg := checkPerfCfgMap[model] 111 112 requests := make(chan v3.Op, cfg.clients) 113 limit := rate.NewLimiter(rate.Limit(cfg.limit), 1) 114 115 cc := clientConfigFromCmd(cmd) 116 clients := make([]*v3.Client, cfg.clients) 117 for i := 0; i < cfg.clients; i++ { 118 clients[i] = cc.mustClient() 119 } 120 121 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second) 122 resp, err := clients[0].Get(ctx, checkPerfPrefix, v3.WithPrefix(), v3.WithLimit(1)) 123 cancel() 124 if err != nil { 125 ExitWithError(ExitError, err) 126 } 127 if len(resp.Kvs) > 0 { 128 ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkPerfPrefix, checkPerfPrefix)) 129 } 130 131 ksize, vsize := 256, 1024 132 k, v := make([]byte, ksize), string(make([]byte, vsize)) 133 134 bar := pb.New(cfg.duration) 135 bar.Format("Bom !") 136 bar.Start() 137 138 r := report.NewReport("%4.4f") 139 var wg sync.WaitGroup 140 141 wg.Add(len(clients)) 142 for i := range clients { 143 go func(c *v3.Client) { 144 defer wg.Done() 145 for op := range requests { 146 st := time.Now() 147 _, derr := c.Do(context.Background(), op) 148 r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()} 149 } 150 }(clients[i]) 151 } 152 153 go func() { 154 cctx, ccancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second) 155 defer ccancel() 156 for limit.Wait(cctx) == nil { 157 binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64))) 158 requests <- v3.OpPut(checkPerfPrefix+string(k), v) 159 } 160 close(requests) 161 }() 162 163 go func() { 164 for i := 0; i < cfg.duration; i++ { 165 time.Sleep(time.Second) 166 bar.Add(1) 167 } 168 bar.Finish() 169 }() 170 171 sc := r.Stats() 172 wg.Wait() 173 close(r.Results()) 174 175 s := <-sc 176 177 ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) 178 _, err = clients[0].Delete(ctx, checkPerfPrefix, v3.WithPrefix()) 179 cancel() 180 if err != nil { 181 ExitWithError(ExitError, err) 182 } 183 184 ok = true 185 if len(s.ErrorDist) != 0 { 186 fmt.Println("FAIL: too many errors") 187 for k, v := range s.ErrorDist { 188 fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v) 189 } 190 ok = false 191 } 192 193 if s.RPS/float64(cfg.limit) <= 0.9 { 194 fmt.Printf("FAIL: Throughput too low: %d writes/s\n", int(s.RPS)+1) 195 ok = false 196 } else { 197 fmt.Printf("PASS: Throughput is %d writes/s\n", int(s.RPS)+1) 198 } 199 if s.Slowest > 0.5 { // slowest request > 500ms 200 fmt.Printf("Slowest request took too long: %fs\n", s.Slowest) 201 ok = false 202 } else { 203 fmt.Printf("PASS: Slowest request took %fs\n", s.Slowest) 204 } 205 if s.Stddev > 0.1 { // stddev > 100ms 206 fmt.Printf("Stddev too high: %fs\n", s.Stddev) 207 ok = false 208 } else { 209 fmt.Printf("PASS: Stddev is %fs\n", s.Stddev) 210 } 211 212 if ok { 213 fmt.Println("PASS") 214 } else { 215 fmt.Println("FAIL") 216 os.Exit(ExitError) 217 } 218} 219