1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package options
18
19import (
20	"fmt"
21	"net/http"
22	"strconv"
23	"strings"
24	"time"
25
26	"github.com/spf13/pflag"
27
28	"k8s.io/apimachinery/pkg/runtime/schema"
29	"k8s.io/apimachinery/pkg/util/sets"
30	"k8s.io/apiserver/pkg/registry/generic"
31	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
32	"k8s.io/apiserver/pkg/server"
33	"k8s.io/apiserver/pkg/server/healthz"
34	"k8s.io/apiserver/pkg/server/options/encryptionconfig"
35	serverstorage "k8s.io/apiserver/pkg/server/storage"
36	"k8s.io/apiserver/pkg/storage/storagebackend"
37	storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
38	"k8s.io/apiserver/pkg/storage/value"
39	"k8s.io/klog/v2"
40)
41
42type EtcdOptions struct {
43	// The value of Paging on StorageConfig will be overridden by the
44	// calculated feature gate value.
45	StorageConfig                    storagebackend.Config
46	EncryptionProviderConfigFilepath string
47
48	EtcdServersOverrides []string
49
50	// To enable protobuf as storage format, it is enough
51	// to set it to "application/vnd.kubernetes.protobuf".
52	DefaultStorageMediaType string
53	DeleteCollectionWorkers int
54	EnableGarbageCollection bool
55
56	// Set EnableWatchCache to false to disable all watch caches
57	EnableWatchCache bool
58	// Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set
59	DefaultWatchCacheSize int
60	// WatchCacheSizes represents override to a given resource
61	WatchCacheSizes []string
62}
63
64var storageTypes = sets.NewString(
65	storagebackend.StorageTypeETCD3,
66)
67
68func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions {
69	options := &EtcdOptions{
70		StorageConfig:           *backendConfig,
71		DefaultStorageMediaType: "application/json",
72		DeleteCollectionWorkers: 1,
73		EnableGarbageCollection: true,
74		EnableWatchCache:        true,
75		DefaultWatchCacheSize:   100,
76	}
77	options.StorageConfig.CountMetricPollPeriod = time.Minute
78	return options
79}
80
81func (s *EtcdOptions) Validate() []error {
82	if s == nil {
83		return nil
84	}
85
86	allErrors := []error{}
87	if len(s.StorageConfig.Transport.ServerList) == 0 {
88		allErrors = append(allErrors, fmt.Errorf("--etcd-servers must be specified"))
89	}
90
91	if s.StorageConfig.Type != storagebackend.StorageTypeUnset && !storageTypes.Has(s.StorageConfig.Type) {
92		allErrors = append(allErrors, fmt.Errorf("--storage-backend invalid, allowed values: %s. If not specified, it will default to 'etcd3'", strings.Join(storageTypes.List(), ", ")))
93	}
94
95	for _, override := range s.EtcdServersOverrides {
96		tokens := strings.Split(override, "#")
97		if len(tokens) != 2 {
98			allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated"))
99			continue
100		}
101
102		apiresource := strings.Split(tokens[0], "/")
103		if len(apiresource) != 2 {
104			allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated"))
105			continue
106		}
107
108	}
109
110	return allErrors
111}
112
113// AddEtcdFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet
114func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
115	if s == nil {
116		return
117	}
118
119	fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, ""+
120		"Per-resource etcd servers overrides, comma separated. The individual override "+
121		"format: group/resource#servers, where servers are URLs, semicolon separated. "+
122		"Note that this applies only to resources compiled into this server binary. ")
123
124	fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, ""+
125		"The media type to use to store objects in storage. "+
126		"Some resources or storage backends may only support a specific media type and will ignore this setting.")
127	fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers,
128		"Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.")
129
130	fs.BoolVar(&s.EnableGarbageCollection, "enable-garbage-collector", s.EnableGarbageCollection, ""+
131		"Enables the generic garbage collector. MUST be synced with the corresponding flag "+
132		"of the kube-controller-manager.")
133
134	fs.BoolVar(&s.EnableWatchCache, "watch-cache", s.EnableWatchCache,
135		"Enable watch caching in the apiserver")
136
137	fs.IntVar(&s.DefaultWatchCacheSize, "default-watch-cache-size", s.DefaultWatchCacheSize,
138		"Default watch cache size. If zero, watch cache will be disabled for resources that do not have a default watch size set.")
139
140	fs.StringSliceVar(&s.WatchCacheSizes, "watch-cache-sizes", s.WatchCacheSizes, ""+
141		"Watch cache size settings for some resources (pods, nodes, etc.), comma separated. "+
142		"The individual setting format: resource[.group]#size, where resource is lowercase plural (no version), "+
143		"group is omitted for resources of apiVersion v1 (the legacy core API) and included for others, "+
144		"and size is a number. It takes effect when watch-cache is enabled. "+
145		"Some resources (replicationcontrollers, endpoints, nodes, pods, services, apiservices.apiregistration.k8s.io) "+
146		"have system defaults set by heuristics, others default to default-watch-cache-size")
147
148	fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type,
149		"The storage backend for persistence. Options: 'etcd3' (default).")
150
151	dummyCacheSize := 0
152	fs.IntVar(&dummyCacheSize, "deserialization-cache-size", 0, "Number of deserialized json objects to cache in memory.")
153	fs.MarkDeprecated("deserialization-cache-size", "the deserialization cache was dropped in 1.13 with support for etcd2")
154
155	fs.StringSliceVar(&s.StorageConfig.Transport.ServerList, "etcd-servers", s.StorageConfig.Transport.ServerList,
156		"List of etcd servers to connect with (scheme://ip:port), comma separated.")
157
158	fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix,
159		"The prefix to prepend to all resource paths in etcd.")
160
161	fs.StringVar(&s.StorageConfig.Transport.KeyFile, "etcd-keyfile", s.StorageConfig.Transport.KeyFile,
162		"SSL key file used to secure etcd communication.")
163
164	fs.StringVar(&s.StorageConfig.Transport.CertFile, "etcd-certfile", s.StorageConfig.Transport.CertFile,
165		"SSL certification file used to secure etcd communication.")
166
167	fs.StringVar(&s.StorageConfig.Transport.TrustedCAFile, "etcd-cafile", s.StorageConfig.Transport.TrustedCAFile,
168		"SSL Certificate Authority file used to secure etcd communication.")
169
170	fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath,
171		"The file containing configuration for encryption providers to be used for storing secrets in etcd")
172	fs.MarkDeprecated("experimental-encryption-provider-config", "use --encryption-provider-config.")
173
174	fs.StringVar(&s.EncryptionProviderConfigFilepath, "encryption-provider-config", s.EncryptionProviderConfigFilepath,
175		"The file containing configuration for encryption providers to be used for storing secrets in etcd")
176
177	fs.DurationVar(&s.StorageConfig.CompactionInterval, "etcd-compaction-interval", s.StorageConfig.CompactionInterval,
178		"The interval of compaction requests. If 0, the compaction request from apiserver is disabled.")
179
180	fs.DurationVar(&s.StorageConfig.CountMetricPollPeriod, "etcd-count-metric-poll-period", s.StorageConfig.CountMetricPollPeriod, ""+
181		"Frequency of polling etcd for number of resources per type. 0 disables the metric collection.")
182
183	fs.DurationVar(&s.StorageConfig.DBMetricPollInterval, "etcd-db-metric-poll-interval", s.StorageConfig.DBMetricPollInterval,
184		"The interval of requests to poll etcd and update metric. 0 disables the metric collection")
185
186	fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout,
187		"The timeout to use when checking etcd health.")
188
189	fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds,
190		"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
191}
192
193func (s *EtcdOptions) ApplyTo(c *server.Config) error {
194	if s == nil {
195		return nil
196	}
197	if err := s.addEtcdHealthEndpoint(c); err != nil {
198		return err
199	}
200	transformerOverrides := make(map[schema.GroupResource]value.Transformer)
201	if len(s.EncryptionProviderConfigFilepath) > 0 {
202		var err error
203		transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath)
204		if err != nil {
205			return err
206		}
207	}
208
209	c.RESTOptionsGetter = &SimpleRestOptionsFactory{
210		Options:              *s,
211		TransformerOverrides: transformerOverrides,
212	}
213	return nil
214}
215
216func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
217	if err := s.addEtcdHealthEndpoint(c); err != nil {
218		return err
219	}
220	c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
221	return nil
222}
223
224func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
225	healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
226	if err != nil {
227		return err
228	}
229	c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error {
230		return healthCheck()
231	}))
232
233	if s.EncryptionProviderConfigFilepath != "" {
234		kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath)
235		if err != nil {
236			return err
237		}
238		c.AddHealthChecks(kmsPluginHealthzChecks...)
239	}
240
241	return nil
242}
243
244type SimpleRestOptionsFactory struct {
245	Options              EtcdOptions
246	TransformerOverrides map[schema.GroupResource]value.Transformer
247}
248
249func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
250	ret := generic.RESTOptions{
251		StorageConfig:           &f.Options.StorageConfig,
252		Decorator:               generic.UndecoratedStorage,
253		EnableGarbageCollection: f.Options.EnableGarbageCollection,
254		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
255		ResourcePrefix:          resource.Group + "/" + resource.Resource,
256		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
257	}
258	if f.TransformerOverrides != nil {
259		if transformer, ok := f.TransformerOverrides[resource]; ok {
260			ret.StorageConfig.Transformer = transformer
261		}
262	}
263	if f.Options.EnableWatchCache {
264		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
265		if err != nil {
266			return generic.RESTOptions{}, err
267		}
268		size, ok := sizes[resource]
269		if ok && size > 0 {
270			klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
271		}
272		if ok && size <= 0 {
273			ret.Decorator = generic.UndecoratedStorage
274		} else {
275			ret.Decorator = genericregistry.StorageWithCacher()
276		}
277	}
278	return ret, nil
279}
280
281type StorageFactoryRestOptionsFactory struct {
282	Options        EtcdOptions
283	StorageFactory serverstorage.StorageFactory
284}
285
286func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
287	storageConfig, err := f.StorageFactory.NewConfig(resource)
288	if err != nil {
289		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
290	}
291
292	ret := generic.RESTOptions{
293		StorageConfig:           storageConfig,
294		Decorator:               generic.UndecoratedStorage,
295		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
296		EnableGarbageCollection: f.Options.EnableGarbageCollection,
297		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
298		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
299	}
300	if f.Options.EnableWatchCache {
301		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
302		if err != nil {
303			return generic.RESTOptions{}, err
304		}
305		size, ok := sizes[resource]
306		if ok && size > 0 {
307			klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
308		}
309		if ok && size <= 0 {
310			ret.Decorator = generic.UndecoratedStorage
311		} else {
312			ret.Decorator = genericregistry.StorageWithCacher()
313		}
314	}
315
316	return ret, nil
317}
318
319// ParseWatchCacheSizes turns a list of cache size values into a map of group resources
320// to requested sizes.
321func ParseWatchCacheSizes(cacheSizes []string) (map[schema.GroupResource]int, error) {
322	watchCacheSizes := make(map[schema.GroupResource]int)
323	for _, c := range cacheSizes {
324		tokens := strings.Split(c, "#")
325		if len(tokens) != 2 {
326			return nil, fmt.Errorf("invalid value of watch cache size: %s", c)
327		}
328
329		size, err := strconv.Atoi(tokens[1])
330		if err != nil {
331			return nil, fmt.Errorf("invalid size of watch cache size: %s", c)
332		}
333		if size < 0 {
334			return nil, fmt.Errorf("watch cache size cannot be negative: %s", c)
335		}
336		watchCacheSizes[schema.ParseGroupResource(tokens[0])] = size
337	}
338	return watchCacheSizes, nil
339}
340
341// WriteWatchCacheSizes turns a map of cache size values into a list of string specifications.
342func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]string, error) {
343	var cacheSizes []string
344
345	for resource, size := range watchCacheSizes {
346		if size < 0 {
347			return nil, fmt.Errorf("watch cache size cannot be negative for resource %s", resource)
348		}
349		cacheSizes = append(cacheSizes, fmt.Sprintf("%s#%d", resource.String(), size))
350	}
351	return cacheSizes, nil
352}
353