1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package options 18 19import ( 20 "fmt" 21 "net/http" 22 "strconv" 23 "strings" 24 "time" 25 26 "github.com/spf13/pflag" 27 28 "k8s.io/apimachinery/pkg/runtime/schema" 29 "k8s.io/apimachinery/pkg/util/sets" 30 "k8s.io/apiserver/pkg/registry/generic" 31 genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" 32 "k8s.io/apiserver/pkg/server" 33 "k8s.io/apiserver/pkg/server/healthz" 34 "k8s.io/apiserver/pkg/server/options/encryptionconfig" 35 serverstorage "k8s.io/apiserver/pkg/server/storage" 36 "k8s.io/apiserver/pkg/storage/storagebackend" 37 storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" 38 "k8s.io/apiserver/pkg/storage/value" 39 "k8s.io/klog/v2" 40) 41 42type EtcdOptions struct { 43 // The value of Paging on StorageConfig will be overridden by the 44 // calculated feature gate value. 45 StorageConfig storagebackend.Config 46 EncryptionProviderConfigFilepath string 47 48 EtcdServersOverrides []string 49 50 // To enable protobuf as storage format, it is enough 51 // to set it to "application/vnd.kubernetes.protobuf". 52 DefaultStorageMediaType string 53 DeleteCollectionWorkers int 54 EnableGarbageCollection bool 55 56 // Set EnableWatchCache to false to disable all watch caches 57 EnableWatchCache bool 58 // Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set 59 DefaultWatchCacheSize int 60 // WatchCacheSizes represents override to a given resource 61 WatchCacheSizes []string 62} 63 64var storageTypes = sets.NewString( 65 storagebackend.StorageTypeETCD3, 66) 67 68func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions { 69 options := &EtcdOptions{ 70 StorageConfig: *backendConfig, 71 DefaultStorageMediaType: "application/json", 72 DeleteCollectionWorkers: 1, 73 EnableGarbageCollection: true, 74 EnableWatchCache: true, 75 DefaultWatchCacheSize: 100, 76 } 77 options.StorageConfig.CountMetricPollPeriod = time.Minute 78 return options 79} 80 81func (s *EtcdOptions) Validate() []error { 82 if s == nil { 83 return nil 84 } 85 86 allErrors := []error{} 87 if len(s.StorageConfig.Transport.ServerList) == 0 { 88 allErrors = append(allErrors, fmt.Errorf("--etcd-servers must be specified")) 89 } 90 91 if s.StorageConfig.Type != storagebackend.StorageTypeUnset && !storageTypes.Has(s.StorageConfig.Type) { 92 allErrors = append(allErrors, fmt.Errorf("--storage-backend invalid, allowed values: %s. If not specified, it will default to 'etcd3'", strings.Join(storageTypes.List(), ", "))) 93 } 94 95 for _, override := range s.EtcdServersOverrides { 96 tokens := strings.Split(override, "#") 97 if len(tokens) != 2 { 98 allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated")) 99 continue 100 } 101 102 apiresource := strings.Split(tokens[0], "/") 103 if len(apiresource) != 2 { 104 allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated")) 105 continue 106 } 107 108 } 109 110 return allErrors 111} 112 113// AddEtcdFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet 114func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { 115 if s == nil { 116 return 117 } 118 119 fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, ""+ 120 "Per-resource etcd servers overrides, comma separated. The individual override "+ 121 "format: group/resource#servers, where servers are URLs, semicolon separated. "+ 122 "Note that this applies only to resources compiled into this server binary. ") 123 124 fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, ""+ 125 "The media type to use to store objects in storage. "+ 126 "Some resources or storage backends may only support a specific media type and will ignore this setting.") 127 fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers, 128 "Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.") 129 130 fs.BoolVar(&s.EnableGarbageCollection, "enable-garbage-collector", s.EnableGarbageCollection, ""+ 131 "Enables the generic garbage collector. MUST be synced with the corresponding flag "+ 132 "of the kube-controller-manager.") 133 134 fs.BoolVar(&s.EnableWatchCache, "watch-cache", s.EnableWatchCache, 135 "Enable watch caching in the apiserver") 136 137 fs.IntVar(&s.DefaultWatchCacheSize, "default-watch-cache-size", s.DefaultWatchCacheSize, 138 "Default watch cache size. If zero, watch cache will be disabled for resources that do not have a default watch size set.") 139 140 fs.StringSliceVar(&s.WatchCacheSizes, "watch-cache-sizes", s.WatchCacheSizes, ""+ 141 "Watch cache size settings for some resources (pods, nodes, etc.), comma separated. "+ 142 "The individual setting format: resource[.group]#size, where resource is lowercase plural (no version), "+ 143 "group is omitted for resources of apiVersion v1 (the legacy core API) and included for others, "+ 144 "and size is a number. It takes effect when watch-cache is enabled. "+ 145 "Some resources (replicationcontrollers, endpoints, nodes, pods, services, apiservices.apiregistration.k8s.io) "+ 146 "have system defaults set by heuristics, others default to default-watch-cache-size") 147 148 fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type, 149 "The storage backend for persistence. Options: 'etcd3' (default).") 150 151 dummyCacheSize := 0 152 fs.IntVar(&dummyCacheSize, "deserialization-cache-size", 0, "Number of deserialized json objects to cache in memory.") 153 fs.MarkDeprecated("deserialization-cache-size", "the deserialization cache was dropped in 1.13 with support for etcd2") 154 155 fs.StringSliceVar(&s.StorageConfig.Transport.ServerList, "etcd-servers", s.StorageConfig.Transport.ServerList, 156 "List of etcd servers to connect with (scheme://ip:port), comma separated.") 157 158 fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, 159 "The prefix to prepend to all resource paths in etcd.") 160 161 fs.StringVar(&s.StorageConfig.Transport.KeyFile, "etcd-keyfile", s.StorageConfig.Transport.KeyFile, 162 "SSL key file used to secure etcd communication.") 163 164 fs.StringVar(&s.StorageConfig.Transport.CertFile, "etcd-certfile", s.StorageConfig.Transport.CertFile, 165 "SSL certification file used to secure etcd communication.") 166 167 fs.StringVar(&s.StorageConfig.Transport.TrustedCAFile, "etcd-cafile", s.StorageConfig.Transport.TrustedCAFile, 168 "SSL Certificate Authority file used to secure etcd communication.") 169 170 fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath, 171 "The file containing configuration for encryption providers to be used for storing secrets in etcd") 172 fs.MarkDeprecated("experimental-encryption-provider-config", "use --encryption-provider-config.") 173 174 fs.StringVar(&s.EncryptionProviderConfigFilepath, "encryption-provider-config", s.EncryptionProviderConfigFilepath, 175 "The file containing configuration for encryption providers to be used for storing secrets in etcd") 176 177 fs.DurationVar(&s.StorageConfig.CompactionInterval, "etcd-compaction-interval", s.StorageConfig.CompactionInterval, 178 "The interval of compaction requests. If 0, the compaction request from apiserver is disabled.") 179 180 fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+ 181 "Frequency of polling etcd for number of resources per type. 0 disables the metric collection.") 182 183 fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval, 184 "The interval of requests to poll etcd and update metric. 0 disables the metric collection") 185 186 fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout, 187 "The timeout to use when checking etcd health.") 188 189 fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, 190 "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") 191} 192 193func (s *EtcdOptions) ApplyTo(c *server.Config) error { 194 if s == nil { 195 return nil 196 } 197 if err := s.addEtcdHealthEndpoint(c); err != nil { 198 return err 199 } 200 transformerOverrides := make(map[schema.GroupResource]value.Transformer) 201 if len(s.EncryptionProviderConfigFilepath) > 0 { 202 var err error 203 transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath) 204 if err != nil { 205 return err 206 } 207 } 208 209 c.RESTOptionsGetter = &SimpleRestOptionsFactory{ 210 Options: *s, 211 TransformerOverrides: transformerOverrides, 212 } 213 return nil 214} 215 216func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error { 217 if err := s.addEtcdHealthEndpoint(c); err != nil { 218 return err 219 } 220 c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory} 221 return nil 222} 223 224func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { 225 healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig) 226 if err != nil { 227 return err 228 } 229 c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error { 230 return healthCheck() 231 })) 232 233 if s.EncryptionProviderConfigFilepath != "" { 234 kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath) 235 if err != nil { 236 return err 237 } 238 c.AddHealthChecks(kmsPluginHealthzChecks...) 239 } 240 241 return nil 242} 243 244type SimpleRestOptionsFactory struct { 245 Options EtcdOptions 246 TransformerOverrides map[schema.GroupResource]value.Transformer 247} 248 249func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { 250 ret := generic.RESTOptions{ 251 StorageConfig: &f.Options.StorageConfig, 252 Decorator: generic.UndecoratedStorage, 253 EnableGarbageCollection: f.Options.EnableGarbageCollection, 254 DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, 255 ResourcePrefix: resource.Group + "/" + resource.Resource, 256 CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, 257 } 258 if f.TransformerOverrides != nil { 259 if transformer, ok := f.TransformerOverrides[resource]; ok { 260 ret.StorageConfig.Transformer = transformer 261 } 262 } 263 if f.Options.EnableWatchCache { 264 sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) 265 if err != nil { 266 return generic.RESTOptions{}, err 267 } 268 size, ok := sizes[resource] 269 if ok && size > 0 { 270 klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource) 271 } 272 if ok && size <= 0 { 273 ret.Decorator = generic.UndecoratedStorage 274 } else { 275 ret.Decorator = genericregistry.StorageWithCacher() 276 } 277 } 278 return ret, nil 279} 280 281type StorageFactoryRestOptionsFactory struct { 282 Options EtcdOptions 283 StorageFactory serverstorage.StorageFactory 284} 285 286func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { 287 storageConfig, err := f.StorageFactory.NewConfig(resource) 288 if err != nil { 289 return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error()) 290 } 291 292 ret := generic.RESTOptions{ 293 StorageConfig: storageConfig, 294 Decorator: generic.UndecoratedStorage, 295 DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers, 296 EnableGarbageCollection: f.Options.EnableGarbageCollection, 297 ResourcePrefix: f.StorageFactory.ResourcePrefix(resource), 298 CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod, 299 } 300 if f.Options.EnableWatchCache { 301 sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes) 302 if err != nil { 303 return generic.RESTOptions{}, err 304 } 305 size, ok := sizes[resource] 306 if ok && size > 0 { 307 klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource) 308 } 309 if ok && size <= 0 { 310 ret.Decorator = generic.UndecoratedStorage 311 } else { 312 ret.Decorator = genericregistry.StorageWithCacher() 313 } 314 } 315 316 return ret, nil 317} 318 319// ParseWatchCacheSizes turns a list of cache size values into a map of group resources 320// to requested sizes. 321func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) { 322 watchCacheSizes := make(map[schema.GroupResource]int) 323 for _, c := range cacheSizes { 324 tokens := strings.Split(c, "#") 325 if len(tokens) != 2 { 326 return nil, fmt.Errorf("invalid value of watch cache size: %s", c) 327 } 328 329 size, err := strconv.Atoi(tokens[1]) 330 if err != nil { 331 return nil, fmt.Errorf("invalid size of watch cache size: %s", c) 332 } 333 if size < 0 { 334 return nil, fmt.Errorf("watch cache size cannot be negative: %s", c) 335 } 336 watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size 337 } 338 return watchCacheSizes, nil 339} 340 341// WriteWatchCacheSizes turns a map of cache size values into a list of string specifications. 342func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) { 343 var cacheSizes []string 344 345 for resource, size := range watchCacheSizes { 346 if size < 0 { 347 return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource) 348 } 349 cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size)) 350 } 351 return cacheSizes, nil 352} 353