/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package benchmark

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"math"
	"os"
	"path"
	"sort"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/dynamic"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/testutil"
	"k8s.io/klog/v2"
	"k8s.io/kube-scheduler/config/v1beta1"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
	"k8s.io/kubernetes/test/integration/util"
	testutils "k8s.io/kubernetes/test/utils"
)

const (
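	// dateFormat is a Go reference-time layout used to timestamp generated
	// data item file names.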
	dateFormat                = "2006-01-02T15:04:05Z"
	testNamespace             = "sched-test"
	setupNamespace            = "sched-setup"
	throughputSampleFrequency = time.Second
)

var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")

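// newDefaultComponentConfig builds a KubeSchedulerConfiguration with defaults
// applied: decoding nil data as the versioned v1beta1 kind runs the scheme's
// registered defaulting and converts the result into the internal type.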
func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
	gvk := v1beta1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
	cfg := config.KubeSchedulerConfiguration{}
	_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode(nil, &gvk, &cfg)
	if err != nil {
		return nil, err
	}
	return &cfg, nil
}

// mustSetupScheduler starts the following components:
// - k8s api server
// - scheduler
// It returns regular and dynamic clients, and a shutdown function that should
// be called to clean up the started components once the test is finished.
// Notes on rate limiter:
//   - client rate limit is set to 5000.
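//
// A minimal usage sketch (illustrative, not an actual test):
//
//	shutdown, podInformer, client, _ := mustSetupScheduler(nil)
//	defer shutdown()
//	// Create nodes and pods with client, then observe scheduled pods
//	// through podInformer, e.g. via getScheduledPods.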
func mustSetupScheduler(config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) {
	apiURL, apiShutdown := util.StartApiserver()
	var err error

	// TODO: client connection settings such as QPS and Burst are configurable
	// in theory and could be derived from `config`; support this once a
	// testcase depends on such configuration.
	cfg := &restclient.Config{
		Host:          apiURL,
		ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}},
		QPS:           5000.0,
		Burst:         5000,
	}

	// Use the default component config if the provided config is nil.
	if config == nil {
		config, err = newDefaultComponentConfig()
		if err != nil {
			klog.Fatalf("Error creating default component config: %v", err)
		}
	}

	client := clientset.NewForConfigOrDie(cfg)
	dynClient := dynamic.NewForConfigOrDie(cfg)

	// Not all config options take effect; only those most relevant to
	// scheduler performance are applied when starting the scheduler, and
	// most of them are defined in `scheduler.schedulerOptions`.
	_, podInformer, schedulerShutdown := util.StartScheduler(client, cfg, config)
	fakePVControllerShutdown := util.StartFakePVController(client)

	shutdownFunc := func() {
		fakePVControllerShutdown()
		schedulerShutdown()
		apiShutdown()
	}

	return shutdownFunc, podInformer, client, dynClient
}

// getScheduledPods returns the list of scheduled pods in the specified
// namespaces. If no namespaces are specified, pods in all namespaces match.
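// For example, getScheduledPods(podInformer) matches pods in every namespace,
// while getScheduledPods(podInformer, testNamespace) only returns pods
// scheduled into testNamespace.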
func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, error) {
	pods, err := podInformer.Lister().List(labels.Everything())
	if err != nil {
		return nil, err
	}

	s := sets.NewString(namespaces...)
	scheduled := make([]*v1.Pod, 0, len(pods))
	for i := range pods {
		pod := pods[i]
		if len(pod.Spec.NodeName) > 0 && (len(s) == 0 || s.Has(pod.Namespace)) {
			scheduled = append(scheduled, pod)
		}
	}
	return scheduled, nil
}

// DataItem is the data point.
type DataItem struct {
	// Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice
	// that all data items with the same label combination should have the same buckets.
	Data map[string]float64 `json:"data"`
	// Unit is the data unit. Notice that all data items with the same label combination
	// should have the same unit.
	Unit string `json:"unit"`
	// Labels are the labels of the data item.
	Labels map[string]string `json:"labels,omitempty"`
}

// DataItems is the data point set. It is the struct that the perf dashboard expects.
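// An example of the serialized form (values illustrative):
//
//	{"version":"v1","dataItems":[{"data":{"Average":12.5,"Perc90":23.5},"unit":"ms","labels":{"Metric":"scheduling_duration"}}]}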
type DataItems struct {
	Version   string     `json:"version"`
	DataItems []DataItem `json:"dataItems"`
}

// makeBasePod creates a Pod object to be used as a template.
func makeBasePod() *v1.Pod {
	basePod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pod-",
		},
		Spec: testutils.MakePodSpec(),
	}
	return basePod
}

func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
	b, err := json.Marshal(dataItems)
	if err != nil {
		return err
	}

	destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
	if *dataItemsDir != "" {
		// Ensure the "dataItemsDir" path exists, creating it if necessary.
		if err := os.MkdirAll(*dataItemsDir, 0750); err != nil {
			return fmt.Errorf("dataItemsDir path %v does not exist and cannot be created: %v", *dataItemsDir, err)
		}
		destFile = path.Join(*dataItemsDir, destFile)
	}

	return ioutil.WriteFile(destFile, b, 0644)
}
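
// A minimal usage sketch for dataItems2JSONFile (values illustrative):
//
//	items := DataItems{
//		Version:   "v1",
//		DataItems: []DataItem{{Data: map[string]float64{"Average": 12.5}, Unit: "ms"}},
//	}
//	if err := dataItems2JSONFile(items, "BenchmarkPerfScheduling"); err != nil {
//		klog.Fatalf("%v", err)
//	}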

// metricsCollectorConfig is the config, decoded from the YAML config file,
// that names the metrics to collect.
type metricsCollectorConfig struct {
	Metrics []string
}
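
// An example of the corresponding YAML (metric names illustrative):
//
//	metrics:
//	  - scheduler_framework_extension_point_duration_seconds
//	  - scheduler_scheduling_algorithm_duration_seconds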

// metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather().
// Currently only Histogram metrics are supported.
type metricsCollector struct {
	*metricsCollectorConfig
	labels map[string]string
}

func newMetricsCollector(config *metricsCollectorConfig, labels map[string]string) *metricsCollector {
	return &metricsCollector{
		metricsCollectorConfig: config,
		labels:                 labels,
	}
}

func (*metricsCollector) run(ctx context.Context) {
	// metricsCollector doesn't need to start before the tests, so there is nothing to do here.
}

func (pc *metricsCollector) collect() []DataItem {
	var dataItems []DataItem
	for _, metric := range pc.Metrics {
		dataItem := collectHistogram(metric, pc.labels)
		if dataItem != nil {
			dataItems = append(dataItems, *dataItem)
		}
	}
	return dataItems
}

func collectHistogram(metric string, labels map[string]string) *DataItem {
	hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, metric)
	if err != nil {
		klog.Error(err)
		return nil
	}
	if hist.Histogram == nil {
		klog.Errorf("metric %q is not a Histogram metric", metric)
		return nil
	}
	if err := hist.Validate(); err != nil {
		klog.Error(err)
		return nil
	}

	q50 := hist.Quantile(0.50)
	q90 := hist.Quantile(0.90)
	q95 := hist.Quantile(0.95)
	q99 := hist.Quantile(0.99)
	avg := hist.Average()

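	// The collected histograms record durations in seconds; msFactor scales
	// the quantiles and average below to milliseconds, matching the "ms" unit.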
	msFactor := float64(time.Second) / float64(time.Millisecond)

	// Copy labels and add "Metric" label for this metric.
	labelMap := map[string]string{"Metric": metric}
	for k, v := range labels {
		labelMap[k] = v
	}
	return &DataItem{
		Labels: labelMap,
		Data: map[string]float64{
			"Perc50":  q50 * msFactor,
			"Perc90":  q90 * msFactor,
			"Perc95":  q95 * msFactor,
			"Perc99":  q99 * msFactor,
			"Average": avg * msFactor,
		},
		Unit: "ms",
	}
}

type throughputCollector struct {
	podInformer           coreinformers.PodInformer
	schedulingThroughputs []float64
	labels                map[string]string
	namespaces            []string
}

func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector {
	return &throughputCollector{
		podInformer: podInformer,
		labels:      labels,
		namespaces:  namespaces,
	}
}

func (tc *throughputCollector) run(ctx context.Context) {
	podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...)
	if err != nil {
		klog.Fatalf("%v", err)
	}
	lastScheduledCount := len(podsScheduled)
	ticker := time.NewTicker(throughputSampleFrequency)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...)
			if err != nil {
				klog.Fatalf("%v", err)
			}

			scheduled := len(podsScheduled)
			// Only sample if the number of scheduled pods is greater than zero.
			if scheduled > 0 {
				samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second)
				throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds
				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
				lastScheduledCount = scheduled
				klog.Infof("%d pods scheduled", lastScheduledCount)
			}
		}
	}
}

func (tc *throughputCollector) collect() []DataItem {
	throughputSummary := DataItem{Labels: tc.labels}
	if length := len(tc.schedulingThroughputs); length > 0 {
		sort.Float64s(tc.schedulingThroughputs)
		sum := 0.0
		for i := range tc.schedulingThroughputs {
			sum += tc.schedulingThroughputs[i]
		}

		throughputSummary.Labels["Metric"] = "SchedulingThroughput"
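		// Percentiles use the nearest-rank method over the sorted samples,
		// e.g. with length == 10, Perc90 reads index ceil(10*90/100)-1 == 8.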
		throughputSummary.Data = map[string]float64{
			"Average": sum / float64(length),
			"Perc50":  tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
			"Perc90":  tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
			"Perc95":  tc.schedulingThroughputs[int(math.Ceil(float64(length*95)/100))-1],
			"Perc99":  tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
		}
		throughputSummary.Unit = "pods/s"
	}

	return []DataItem{throughputSummary}
}