1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package benchmark 18 19import ( 20 "context" 21 "encoding/json" 22 "flag" 23 "fmt" 24 "io/ioutil" 25 "math" 26 "os" 27 "path" 28 "sort" 29 "time" 30 31 v1 "k8s.io/api/core/v1" 32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 33 "k8s.io/apimachinery/pkg/labels" 34 "k8s.io/apimachinery/pkg/runtime/schema" 35 "k8s.io/apimachinery/pkg/util/sets" 36 "k8s.io/client-go/dynamic" 37 coreinformers "k8s.io/client-go/informers/core/v1" 38 clientset "k8s.io/client-go/kubernetes" 39 restclient "k8s.io/client-go/rest" 40 "k8s.io/component-base/metrics/legacyregistry" 41 "k8s.io/component-base/metrics/testutil" 42 "k8s.io/klog/v2" 43 "k8s.io/kube-scheduler/config/v1beta1" 44 "k8s.io/kubernetes/pkg/scheduler/apis/config" 45 kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" 46 "k8s.io/kubernetes/test/integration/util" 47 testutils "k8s.io/kubernetes/test/utils" 48) 49 50const ( 51 dateFormat = "2006-01-02T15:04:05Z" 52 testNamespace = "sched-test" 53 setupNamespace = "sched-setup" 54 throughputSampleFrequency = time.Second 55) 56 57var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard") 58 59func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) { 60 gvk := v1beta1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration") 61 cfg := config.KubeSchedulerConfiguration{} 62 _, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode(nil, &gvk, &cfg) 63 if err != nil { 64 return nil, err 65 } 66 return &cfg, nil 67} 68 69// mustSetupScheduler starts the following components: 70// - k8s api server 71// - scheduler 72// It returns regular and dynamic clients, and destroyFunc which should be used to 73// remove resources after finished. 74// Notes on rate limiter: 75// - client rate limit is set to 5000. 76func mustSetupScheduler(config *config.KubeSchedulerConfiguration) (util.ShutdownFunc, coreinformers.PodInformer, clientset.Interface, dynamic.Interface) { 77 apiURL, apiShutdown := util.StartApiserver() 78 var err error 79 80 // TODO: client connection configuration, such as QPS or Burst is configurable in theory, this could be derived from the `config`, need to 81 // support this when there is any testcase that depends on such configuration. 82 cfg := &restclient.Config{ 83 Host: apiURL, 84 ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, 85 QPS: 5000.0, 86 Burst: 5000, 87 } 88 89 // use default component config if config here is nil 90 if config == nil { 91 config, err = newDefaultComponentConfig() 92 if err != nil { 93 klog.Fatalf("Error creating default component config: %v", err) 94 } 95 } 96 97 client := clientset.NewForConfigOrDie(cfg) 98 dynClient := dynamic.NewForConfigOrDie(cfg) 99 100 // Not all config options will be effective but only those mostly related with scheduler performance will 101 // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`. 102 _, podInformer, schedulerShutdown := util.StartScheduler(client, cfg, config) 103 fakePVControllerShutdown := util.StartFakePVController(client) 104 105 shutdownFunc := func() { 106 fakePVControllerShutdown() 107 schedulerShutdown() 108 apiShutdown() 109 } 110 111 return shutdownFunc, podInformer, client, dynClient 112} 113 114// Returns the list of scheduled pods in the specified namespaces. 115// Note that no namespaces specified matches all namespaces. 116func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, error) { 117 pods, err := podInformer.Lister().List(labels.Everything()) 118 if err != nil { 119 return nil, err 120 } 121 122 s := sets.NewString(namespaces...) 123 scheduled := make([]*v1.Pod, 0, len(pods)) 124 for i := range pods { 125 pod := pods[i] 126 if len(pod.Spec.NodeName) > 0 && (len(s) == 0 || s.Has(pod.Namespace)) { 127 scheduled = append(scheduled, pod) 128 } 129 } 130 return scheduled, nil 131} 132 133// DataItem is the data point. 134type DataItem struct { 135 // Data is a map from bucket to real data point (e.g. "Perc90" -> 23.5). Notice 136 // that all data items with the same label combination should have the same buckets. 137 Data map[string]float64 `json:"data"` 138 // Unit is the data unit. Notice that all data items with the same label combination 139 // should have the same unit. 140 Unit string `json:"unit"` 141 // Labels is the labels of the data item. 142 Labels map[string]string `json:"labels,omitempty"` 143} 144 145// DataItems is the data point set. It is the struct that perf dashboard expects. 146type DataItems struct { 147 Version string `json:"version"` 148 DataItems []DataItem `json:"dataItems"` 149} 150 151// makeBasePod creates a Pod object to be used as a template. 152func makeBasePod() *v1.Pod { 153 basePod := &v1.Pod{ 154 ObjectMeta: metav1.ObjectMeta{ 155 GenerateName: "pod-", 156 }, 157 Spec: testutils.MakePodSpec(), 158 } 159 return basePod 160} 161 162func dataItems2JSONFile(dataItems DataItems, namePrefix string) error { 163 b, err := json.Marshal(dataItems) 164 if err != nil { 165 return err 166 } 167 168 destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat)) 169 if *dataItemsDir != "" { 170 // Ensure the "dataItemsDir" path to be valid. 171 if err := os.MkdirAll(*dataItemsDir, 0750); err != nil { 172 return fmt.Errorf("dataItemsDir path %v does not exist and cannot be created: %v", *dataItemsDir, err) 173 } 174 destFile = path.Join(*dataItemsDir, destFile) 175 } 176 177 return ioutil.WriteFile(destFile, b, 0644) 178} 179 180// metricsCollectorConfig is the config to be marshalled to YAML config file. 181type metricsCollectorConfig struct { 182 Metrics []string 183} 184 185// metricsCollector collects metrics from legacyregistry.DefaultGatherer.Gather() endpoint. 186// Currently only Histrogram metrics are supported. 187type metricsCollector struct { 188 *metricsCollectorConfig 189 labels map[string]string 190} 191 192func newMetricsCollector(config *metricsCollectorConfig, labels map[string]string) *metricsCollector { 193 return &metricsCollector{ 194 metricsCollectorConfig: config, 195 labels: labels, 196 } 197} 198 199func (*metricsCollector) run(ctx context.Context) { 200 // metricCollector doesn't need to start before the tests, so nothing to do here. 201} 202 203func (pc *metricsCollector) collect() []DataItem { 204 var dataItems []DataItem 205 for _, metric := range pc.Metrics { 206 dataItem := collectHistogram(metric, pc.labels) 207 if dataItem != nil { 208 dataItems = append(dataItems, *dataItem) 209 } 210 } 211 return dataItems 212} 213 214func collectHistogram(metric string, labels map[string]string) *DataItem { 215 hist, err := testutil.GetHistogramFromGatherer(legacyregistry.DefaultGatherer, metric) 216 if err != nil { 217 klog.Error(err) 218 return nil 219 } 220 if hist.Histogram == nil { 221 klog.Errorf("metric %q is not a Histogram metric", metric) 222 return nil 223 } 224 if err := hist.Validate(); err != nil { 225 klog.Error(err) 226 return nil 227 } 228 229 q50 := hist.Quantile(0.50) 230 q90 := hist.Quantile(0.90) 231 q95 := hist.Quantile(0.95) 232 q99 := hist.Quantile(0.99) 233 avg := hist.Average() 234 235 msFactor := float64(time.Second) / float64(time.Millisecond) 236 237 // Copy labels and add "Metric" label for this metric. 238 labelMap := map[string]string{"Metric": metric} 239 for k, v := range labels { 240 labelMap[k] = v 241 } 242 return &DataItem{ 243 Labels: labelMap, 244 Data: map[string]float64{ 245 "Perc50": q50 * msFactor, 246 "Perc90": q90 * msFactor, 247 "Perc95": q95 * msFactor, 248 "Perc99": q99 * msFactor, 249 "Average": avg * msFactor, 250 }, 251 Unit: "ms", 252 } 253} 254 255type throughputCollector struct { 256 podInformer coreinformers.PodInformer 257 schedulingThroughputs []float64 258 labels map[string]string 259 namespaces []string 260} 261 262func newThroughputCollector(podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string) *throughputCollector { 263 return &throughputCollector{ 264 podInformer: podInformer, 265 labels: labels, 266 namespaces: namespaces, 267 } 268} 269 270func (tc *throughputCollector) run(ctx context.Context) { 271 podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...) 272 if err != nil { 273 klog.Fatalf("%v", err) 274 } 275 lastScheduledCount := len(podsScheduled) 276 ticker := time.NewTicker(throughputSampleFrequency) 277 defer ticker.Stop() 278 for { 279 select { 280 case <-ctx.Done(): 281 return 282 case <-ticker.C: 283 podsScheduled, err := getScheduledPods(tc.podInformer, tc.namespaces...) 284 if err != nil { 285 klog.Fatalf("%v", err) 286 } 287 288 scheduled := len(podsScheduled) 289 // Only do sampling if number of scheduled pods is greater than zero 290 if scheduled > 0 { 291 samplingRatioSeconds := float64(throughputSampleFrequency) / float64(time.Second) 292 throughput := float64(scheduled-lastScheduledCount) / samplingRatioSeconds 293 tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput) 294 lastScheduledCount = scheduled 295 klog.Infof("%d pods scheduled", lastScheduledCount) 296 } 297 298 } 299 } 300} 301 302func (tc *throughputCollector) collect() []DataItem { 303 throughputSummary := DataItem{Labels: tc.labels} 304 if length := len(tc.schedulingThroughputs); length > 0 { 305 sort.Float64s(tc.schedulingThroughputs) 306 sum := 0.0 307 for i := range tc.schedulingThroughputs { 308 sum += tc.schedulingThroughputs[i] 309 } 310 311 throughputSummary.Labels["Metric"] = "SchedulingThroughput" 312 throughputSummary.Data = map[string]float64{ 313 "Average": sum / float64(length), 314 "Perc50": tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1], 315 "Perc90": tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1], 316 "Perc95": tc.schedulingThroughputs[int(math.Ceil(float64(length*95)/100))-1], 317 "Perc99": tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1], 318 } 319 throughputSummary.Unit = "pods/s" 320 } 321 322 return []DataItem{throughputSummary} 323} 324