1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package command
16
17import (
18	"context"
19	"encoding/binary"
20	"fmt"
21	"math"
22	"math/rand"
23	"os"
24	"sync"
25	"time"
26
27	v3 "github.com/coreos/etcd/clientv3"
28	"github.com/coreos/etcd/pkg/report"
29
30	"github.com/spf13/cobra"
31	"golang.org/x/time/rate"
32	"gopkg.in/cheggaaa/pb.v1"
33)
34
// checkPerfLoad holds the value of the "check perf" --load flag: the name or
// alias of the workload model to run.
var checkPerfLoad string

// checkPerfPrefix holds the value of the "check perf" --prefix flag: the key
// prefix under which the benchmark writes (and afterwards deletes) its keys.
var checkPerfPrefix string
39
// checkPerfCfg describes one "check perf" workload model.
type checkPerfCfg struct {
	limit    int // target write rate, in requests per second
	clients  int // number of concurrent etcd clients issuing the writes
	duration int // how long the workload runs, in seconds
}

// checkPerfCfgMap maps each canonical --load model name ("s", "m", "l", "xl")
// to its workload parameters. Every model currently runs for 60 seconds.
var checkPerfCfgMap = map[string]checkPerfCfg{
	// TODO: support read limit
	"s":  {limit: 150, clients: 50, duration: 60},
	"m":  {limit: 1000, clients: 200, duration: 60},
	"l":  {limit: 8000, clients: 500, duration: 60},
	"xl": {limit: 15000, clients: 1000, duration: 60},
}
69
70// NewCheckCommand returns the cobra command for "check".
71func NewCheckCommand() *cobra.Command {
72	cc := &cobra.Command{
73		Use:   "check <subcommand>",
74		Short: "commands for checking properties of the etcd cluster",
75	}
76
77	cc.AddCommand(NewCheckPerfCommand())
78
79	return cc
80}
81
82// NewCheckPerfCommand returns the cobra command for "check perf".
83func NewCheckPerfCommand() *cobra.Command {
84	cmd := &cobra.Command{
85		Use:   "perf [options]",
86		Short: "Check the performance of the etcd cluster",
87		Run:   newCheckPerfCommand,
88	}
89
90	// TODO: support customized configuration
91	cmd.Flags().StringVar(&checkPerfLoad, "load", "s", "The performance check's workload model. Accepted workloads: s(small), m(medium), l(large), xl(xLarge)")
92	cmd.Flags().StringVar(&checkPerfPrefix, "prefix", "/etcdctl-check-perf/", "The prefix for writing the performance check's keys.")
93
94	return cmd
95}
96
97// newCheckPerfCommand executes the "check perf" command.
98func newCheckPerfCommand(cmd *cobra.Command, args []string) {
99	var checkPerfAlias = map[string]string{
100		"s": "s", "small": "s",
101		"m": "m", "medium": "m",
102		"l": "l", "large": "l",
103		"xl": "xl", "xLarge": "xl",
104	}
105
106	model, ok := checkPerfAlias[checkPerfLoad]
107	if !ok {
108		ExitWithError(ExitBadFeature, fmt.Errorf("unknown load option %v", checkPerfLoad))
109	}
110	cfg := checkPerfCfgMap[model]
111
112	requests := make(chan v3.Op, cfg.clients)
113	limit := rate.NewLimiter(rate.Limit(cfg.limit), 1)
114
115	cc := clientConfigFromCmd(cmd)
116	clients := make([]*v3.Client, cfg.clients)
117	for i := 0; i < cfg.clients; i++ {
118		clients[i] = cc.mustClient()
119	}
120
121	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second)
122	resp, err := clients[0].Get(ctx, checkPerfPrefix, v3.WithPrefix(), v3.WithLimit(1))
123	cancel()
124	if err != nil {
125		ExitWithError(ExitError, err)
126	}
127	if len(resp.Kvs) > 0 {
128		ExitWithError(ExitInvalidInput, fmt.Errorf("prefix %q has keys. Delete with etcdctl del --prefix %s first.", checkPerfPrefix, checkPerfPrefix))
129	}
130
131	ksize, vsize := 256, 1024
132	k, v := make([]byte, ksize), string(make([]byte, vsize))
133
134	bar := pb.New(cfg.duration)
135	bar.Format("Bom !")
136	bar.Start()
137
138	r := report.NewReport("%4.4f")
139	var wg sync.WaitGroup
140
141	wg.Add(len(clients))
142	for i := range clients {
143		go func(c *v3.Client) {
144			defer wg.Done()
145			for op := range requests {
146				st := time.Now()
147				_, derr := c.Do(context.Background(), op)
148				r.Results() <- report.Result{Err: derr, Start: st, End: time.Now()}
149			}
150		}(clients[i])
151	}
152
153	go func() {
154		cctx, ccancel := context.WithTimeout(context.Background(), time.Duration(cfg.duration)*time.Second)
155		defer ccancel()
156		for limit.Wait(cctx) == nil {
157			binary.PutVarint(k, int64(rand.Int63n(math.MaxInt64)))
158			requests <- v3.OpPut(checkPerfPrefix+string(k), v)
159		}
160		close(requests)
161	}()
162
163	go func() {
164		for i := 0; i < cfg.duration; i++ {
165			time.Sleep(time.Second)
166			bar.Add(1)
167		}
168		bar.Finish()
169	}()
170
171	sc := r.Stats()
172	wg.Wait()
173	close(r.Results())
174
175	s := <-sc
176
177	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
178	_, err = clients[0].Delete(ctx, checkPerfPrefix, v3.WithPrefix())
179	cancel()
180	if err != nil {
181		ExitWithError(ExitError, err)
182	}
183
184	ok = true
185	if len(s.ErrorDist) != 0 {
186		fmt.Println("FAIL: too many errors")
187		for k, v := range s.ErrorDist {
188			fmt.Printf("FAIL: ERROR(%v) -> %d\n", k, v)
189		}
190		ok = false
191	}
192
193	if s.RPS/float64(cfg.limit) <= 0.9 {
194		fmt.Printf("FAIL: Throughput too low: %d writes/s\n", int(s.RPS)+1)
195		ok = false
196	} else {
197		fmt.Printf("PASS: Throughput is %d writes/s\n", int(s.RPS)+1)
198	}
199	if s.Slowest > 0.5 { // slowest request > 500ms
200		fmt.Printf("Slowest request took too long: %fs\n", s.Slowest)
201		ok = false
202	} else {
203		fmt.Printf("PASS: Slowest request took %fs\n", s.Slowest)
204	}
205	if s.Stddev > 0.1 { // stddev > 100ms
206		fmt.Printf("Stddev too high: %fs\n", s.Stddev)
207		ok = false
208	} else {
209		fmt.Printf("PASS: Stddev is %fs\n", s.Stddev)
210	}
211
212	if ok {
213		fmt.Println("PASS")
214	} else {
215		fmt.Println("FAIL")
216		os.Exit(ExitError)
217	}
218}
219