1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17// Package app implements a server that runs a set of active 18// components. This includes replication controllers, service endpoints and 19// nodes. 20// 21package app 22 23import ( 24 "errors" 25 "fmt" 26 "net" 27 "net/http" 28 "strings" 29 "time" 30 31 "k8s.io/klog/v2" 32 33 v1 "k8s.io/api/core/v1" 34 "k8s.io/apimachinery/pkg/runtime/schema" 35 "k8s.io/apiserver/pkg/quota/v1/generic" 36 utilfeature "k8s.io/apiserver/pkg/util/feature" 37 clientset "k8s.io/client-go/kubernetes" 38 "k8s.io/client-go/metadata" 39 restclient "k8s.io/client-go/rest" 40 cloudnodelifecyclecontroller "k8s.io/cloud-provider/controllers/nodelifecycle" 41 routecontroller "k8s.io/cloud-provider/controllers/route" 42 servicecontroller "k8s.io/cloud-provider/controllers/service" 43 "k8s.io/component-base/metrics/prometheus/ratelimiter" 44 csitrans "k8s.io/csi-translation-lib" 45 "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" 46 "k8s.io/kubernetes/pkg/controller" 47 endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" 48 "k8s.io/kubernetes/pkg/controller/garbagecollector" 49 namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" 50 nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam" 51 nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config" 52 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam" 53 lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle" 54 "k8s.io/kubernetes/pkg/controller/podgc" 55 replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" 56 resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" 57 serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" 58 "k8s.io/kubernetes/pkg/controller/storageversiongc" 59 ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl" 60 "k8s.io/kubernetes/pkg/controller/ttlafterfinished" 61 "k8s.io/kubernetes/pkg/controller/volume/attachdetach" 62 "k8s.io/kubernetes/pkg/controller/volume/ephemeral" 63 "k8s.io/kubernetes/pkg/controller/volume/expand" 64 persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" 65 "k8s.io/kubernetes/pkg/controller/volume/pvcprotection" 66 "k8s.io/kubernetes/pkg/controller/volume/pvprotection" 67 "k8s.io/kubernetes/pkg/features" 68 quotainstall "k8s.io/kubernetes/pkg/quota/v1/install" 69 "k8s.io/kubernetes/pkg/volume/csimigration" 70 netutils "k8s.io/utils/net" 71) 72 73const ( 74 // defaultNodeMaskCIDRIPv4 is default mask size for IPv4 node cidr 75 defaultNodeMaskCIDRIPv4 = 24 76 // defaultNodeMaskCIDRIPv6 is default mask size for IPv6 node cidr 77 defaultNodeMaskCIDRIPv6 = 64 78) 79 80func startServiceController(ctx ControllerContext) (http.Handler, bool, error) { 81 serviceController, err := servicecontroller.New( 82 ctx.Cloud, 83 ctx.ClientBuilder.ClientOrDie("service-controller"), 84 ctx.InformerFactory.Core().V1().Services(), 85 ctx.InformerFactory.Core().V1().Nodes(), 86 ctx.ComponentConfig.KubeCloudShared.ClusterName, 87 utilfeature.DefaultFeatureGate, 88 ) 89 if err != nil { 90 // This error shouldn't fail. It lives like this as a legacy. 91 klog.Errorf("Failed to start service controller: %v", err) 92 return nil, false, nil 93 } 94 go serviceController.Run(ctx.Stop, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs)) 95 return nil, true, nil 96} 97 98func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error) { 99 var serviceCIDR *net.IPNet 100 var secondaryServiceCIDR *net.IPNet 101 102 // should we start nodeIPAM 103 if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs { 104 return nil, false, nil 105 } 106 107 // failure: bad cidrs in config 108 clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) 109 if err != nil { 110 return nil, false, err 111 } 112 113 // failure: more than one cidr and dual stack is not enabled 114 if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { 115 return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack or EndpointSlice feature is not enabled", len(clusterCIDRs)) 116 } 117 118 // failure: more than one cidr but they are not configured as dual stack 119 if len(clusterCIDRs) > 1 && !dualStack { 120 return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily)", len(clusterCIDRs)) 121 } 122 123 // failure: more than cidrs is not allowed even with dual stack 124 if len(clusterCIDRs) > 2 { 125 return nil, false, fmt.Errorf("len of clusters is:%v > more than max allowed of 2", len(clusterCIDRs)) 126 } 127 128 // service cidr processing 129 if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 { 130 _, serviceCIDR, err = net.ParseCIDR(ctx.ComponentConfig.NodeIPAMController.ServiceCIDR) 131 if err != nil { 132 klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.ServiceCIDR, err) 133 } 134 } 135 136 if len(strings.TrimSpace(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 { 137 _, secondaryServiceCIDR, err = net.ParseCIDR(ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR) 138 if err != nil { 139 klog.Warningf("Unsuccessful parsing of service CIDR %v: %v", ctx.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, err) 140 } 141 } 142 143 // the following checks are triggered if both serviceCIDR and secondaryServiceCIDR are provided 144 if serviceCIDR != nil && secondaryServiceCIDR != nil { 145 // should have dual stack flag enabled 146 if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { 147 return nil, false, fmt.Errorf("secondary service cidr is provided and IPv6DualStack feature is not enabled") 148 } 149 150 // should be dual stack (from different IPFamilies) 151 dualstackServiceCIDR, err := netutils.IsDualStackCIDRs([]*net.IPNet{serviceCIDR, secondaryServiceCIDR}) 152 if err != nil { 153 return nil, false, fmt.Errorf("failed to perform dualstack check on serviceCIDR and secondaryServiceCIDR error:%v", err) 154 } 155 if !dualstackServiceCIDR { 156 return nil, false, fmt.Errorf("serviceCIDR and secondaryServiceCIDR are not dualstack (from different IPfamiles)") 157 } 158 } 159 160 var nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6 int 161 if dualStack { 162 // only --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 supported with dual stack clusters. 163 // --node-cidr-mask-size flag is incompatible with dual stack clusters. 164 nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6, err = setNodeCIDRMaskSizesDualStack(ctx.ComponentConfig.NodeIPAMController) 165 } else { 166 // only --node-cidr-mask-size supported with single stack clusters. 167 // --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 flags are incompatible with single stack clusters. 168 nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6, err = setNodeCIDRMaskSizes(ctx.ComponentConfig.NodeIPAMController) 169 } 170 171 if err != nil { 172 return nil, false, err 173 } 174 175 // get list of node cidr mask sizes 176 nodeCIDRMaskSizes := getNodeCIDRMaskSizes(clusterCIDRs, nodeCIDRMaskSizeIPv4, nodeCIDRMaskSizeIPv6) 177 178 nodeIpamController, err := nodeipamcontroller.NewNodeIpamController( 179 ctx.InformerFactory.Core().V1().Nodes(), 180 ctx.Cloud, 181 ctx.ClientBuilder.ClientOrDie("node-controller"), 182 clusterCIDRs, 183 serviceCIDR, 184 secondaryServiceCIDR, 185 nodeCIDRMaskSizes, 186 ipam.CIDRAllocatorType(ctx.ComponentConfig.KubeCloudShared.CIDRAllocatorType), 187 ) 188 if err != nil { 189 return nil, true, err 190 } 191 go nodeIpamController.Run(ctx.Stop) 192 return nil, true, nil 193} 194 195func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) { 196 lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController( 197 ctx.InformerFactory.Coordination().V1().Leases(), 198 ctx.InformerFactory.Core().V1().Pods(), 199 ctx.InformerFactory.Core().V1().Nodes(), 200 ctx.InformerFactory.Apps().V1().DaemonSets(), 201 // node lifecycle controller uses existing cluster role from node-controller 202 ctx.ClientBuilder.ClientOrDie("node-controller"), 203 ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, 204 ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration, 205 ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration, 206 ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration, 207 ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate, 208 ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate, 209 ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold, 210 ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold, 211 ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager, 212 ) 213 if err != nil { 214 return nil, true, err 215 } 216 go lifecycleController.Run(ctx.Stop) 217 return nil, true, nil 218} 219 220func startCloudNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) { 221 cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController( 222 ctx.InformerFactory.Core().V1().Nodes(), 223 // cloud node lifecycle controller uses existing cluster role from node-controller 224 ctx.ClientBuilder.ClientOrDie("node-controller"), 225 ctx.Cloud, 226 ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration, 227 ) 228 if err != nil { 229 // the controller manager should continue to run if the "Instances" interface is not 230 // supported, though it's unlikely for a cloud provider to not support it 231 klog.Errorf("failed to start cloud node lifecycle controller: %v", err) 232 return nil, false, nil 233 } 234 235 go cloudNodeLifecycleController.Run(ctx.Stop) 236 return nil, true, nil 237} 238 239func startRouteController(ctx ControllerContext) (http.Handler, bool, error) { 240 if !ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes { 241 klog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", ctx.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, ctx.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes) 242 return nil, false, nil 243 } 244 if ctx.Cloud == nil { 245 klog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.") 246 return nil, false, nil 247 } 248 routes, ok := ctx.Cloud.Routes() 249 if !ok { 250 klog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.") 251 return nil, false, nil 252 } 253 254 // failure: bad cidrs in config 255 clusterCIDRs, dualStack, err := processCIDRs(ctx.ComponentConfig.KubeCloudShared.ClusterCIDR) 256 if err != nil { 257 return nil, false, err 258 } 259 260 // failure: more than one cidr and dual stack is not enabled 261 if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { 262 return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack feature is not enabled", len(clusterCIDRs)) 263 } 264 265 // failure: more than one cidr but they are not configured as dual stack 266 if len(clusterCIDRs) > 1 && !dualStack { 267 return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs)) 268 } 269 270 // failure: more than cidrs is not allowed even with dual stack 271 if len(clusterCIDRs) > 2 { 272 return nil, false, fmt.Errorf("length of clusterCIDRs is:%v more than max allowed of 2", len(clusterCIDRs)) 273 } 274 275 routeController := routecontroller.New(routes, 276 ctx.ClientBuilder.ClientOrDie("route-controller"), 277 ctx.InformerFactory.Core().V1().Nodes(), 278 ctx.ComponentConfig.KubeCloudShared.ClusterName, 279 clusterCIDRs) 280 go routeController.Run(ctx.Stop, ctx.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration) 281 return nil, true, nil 282} 283 284func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) { 285 plugins, err := ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) 286 if err != nil { 287 return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err) 288 } 289 filteredDialOptions, err := options.ParseVolumeHostFilters( 290 ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, 291 ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) 292 if err != nil { 293 return nil, true, err 294 } 295 params := persistentvolumecontroller.ControllerParameters{ 296 KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"), 297 SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration, 298 VolumePlugins: plugins, 299 Cloud: ctx.Cloud, 300 ClusterName: ctx.ComponentConfig.KubeCloudShared.ClusterName, 301 VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(), 302 ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), 303 ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(), 304 PodInformer: ctx.InformerFactory.Core().V1().Pods(), 305 NodeInformer: ctx.InformerFactory.Core().V1().Nodes(), 306 EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning, 307 FilteredDialOptions: filteredDialOptions, 308 } 309 volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params) 310 if volumeControllerErr != nil { 311 return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr) 312 } 313 go volumeController.Run(ctx.Stop) 314 return nil, true, nil 315} 316 317func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, error) { 318 if ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration < time.Second { 319 return nil, true, fmt.Errorf("duration time must be greater than one second as set via command line option reconcile-sync-loop-period") 320 } 321 322 csiNodeInformer := ctx.InformerFactory.Storage().V1().CSINodes() 323 csiDriverInformer := ctx.InformerFactory.Storage().V1().CSIDrivers() 324 325 plugins, err := ProbeAttachableVolumePlugins() 326 if err != nil { 327 return nil, true, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %v", err) 328 } 329 330 filteredDialOptions, err := options.ParseVolumeHostFilters( 331 ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, 332 ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) 333 if err != nil { 334 return nil, true, err 335 } 336 337 attachDetachController, attachDetachControllerErr := 338 attachdetach.NewAttachDetachController( 339 ctx.ClientBuilder.ClientOrDie("attachdetach-controller"), 340 ctx.InformerFactory.Core().V1().Pods(), 341 ctx.InformerFactory.Core().V1().Nodes(), 342 ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), 343 ctx.InformerFactory.Core().V1().PersistentVolumes(), 344 csiNodeInformer, 345 csiDriverInformer, 346 ctx.InformerFactory.Storage().V1().VolumeAttachments(), 347 ctx.Cloud, 348 plugins, 349 GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration), 350 ctx.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync, 351 ctx.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration, 352 attachdetach.DefaultTimerConfig, 353 filteredDialOptions, 354 ) 355 if attachDetachControllerErr != nil { 356 return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr) 357 } 358 go attachDetachController.Run(ctx.Stop) 359 return nil, true, nil 360} 361 362func startVolumeExpandController(ctx ControllerContext) (http.Handler, bool, error) { 363 if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) { 364 plugins, err := ProbeExpandableVolumePlugins(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration) 365 if err != nil { 366 return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err) 367 } 368 csiTranslator := csitrans.New() 369 filteredDialOptions, err := options.ParseVolumeHostFilters( 370 ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostCIDRDenylist, 371 ctx.ComponentConfig.PersistentVolumeBinderController.VolumeHostAllowLocalLoopback) 372 if err != nil { 373 return nil, true, err 374 } 375 expandController, expandControllerErr := expand.NewExpandController( 376 ctx.ClientBuilder.ClientOrDie("expand-controller"), 377 ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), 378 ctx.InformerFactory.Core().V1().PersistentVolumes(), 379 ctx.Cloud, 380 plugins, 381 csiTranslator, 382 csimigration.NewPluginManager(csiTranslator, utilfeature.DefaultFeatureGate), 383 filteredDialOptions, 384 ) 385 386 if expandControllerErr != nil { 387 return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr) 388 } 389 go expandController.Run(ctx.Stop) 390 return nil, true, nil 391 } 392 return nil, false, nil 393} 394 395func startEphemeralVolumeController(ctx ControllerContext) (http.Handler, bool, error) { 396 if utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume) { 397 ephemeralController, err := ephemeral.NewController( 398 ctx.ClientBuilder.ClientOrDie("ephemeral-volume-controller"), 399 ctx.InformerFactory.Core().V1().Pods(), 400 ctx.InformerFactory.Core().V1().PersistentVolumeClaims()) 401 if err != nil { 402 return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err) 403 } 404 // TODO (before beta at the latest): make this configurable similar to the EndpointController 405 go ephemeralController.Run(1 /* int(ctx.ComponentConfig.EphemeralController.ConcurrentEphemeralVolumeSyncs) */, ctx.Stop) 406 return nil, true, nil 407 } 408 return nil, false, nil 409} 410 411func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) { 412 go endpointcontroller.NewEndpointController( 413 ctx.InformerFactory.Core().V1().Pods(), 414 ctx.InformerFactory.Core().V1().Services(), 415 ctx.InformerFactory.Core().V1().Endpoints(), 416 ctx.ClientBuilder.ClientOrDie("endpoint-controller"), 417 ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration, 418 ).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop) 419 return nil, true, nil 420} 421 422func startReplicationController(ctx ControllerContext) (http.Handler, bool, error) { 423 go replicationcontroller.NewReplicationManager( 424 ctx.InformerFactory.Core().V1().Pods(), 425 ctx.InformerFactory.Core().V1().ReplicationControllers(), 426 ctx.ClientBuilder.ClientOrDie("replication-controller"), 427 replicationcontroller.BurstReplicas, 428 ).Run(int(ctx.ComponentConfig.ReplicationController.ConcurrentRCSyncs), ctx.Stop) 429 return nil, true, nil 430} 431 432func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) { 433 go podgc.NewPodGC( 434 ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), 435 ctx.InformerFactory.Core().V1().Pods(), 436 ctx.InformerFactory.Core().V1().Nodes(), 437 int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold), 438 ).Run(ctx.Stop) 439 return nil, true, nil 440} 441 442func startResourceQuotaController(ctx ControllerContext) (http.Handler, bool, error) { 443 resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") 444 resourceQuotaControllerDiscoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller") 445 discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources 446 listerFuncForResource := generic.ListerFuncForResourceFunc(ctx.InformerFactory.ForResource) 447 quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource) 448 449 resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{ 450 QuotaClient: resourceQuotaControllerClient.CoreV1(), 451 ResourceQuotaInformer: ctx.InformerFactory.Core().V1().ResourceQuotas(), 452 ResyncPeriod: controller.StaticResyncPeriodFunc(ctx.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration), 453 InformerFactory: ctx.ObjectOrMetadataInformerFactory, 454 ReplenishmentResyncPeriod: ctx.ResyncPeriod, 455 DiscoveryFunc: discoveryFunc, 456 IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, 457 InformersStarted: ctx.InformersStarted, 458 Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), 459 } 460 if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { 461 if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { 462 return nil, true, err 463 } 464 } 465 466 resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions) 467 if err != nil { 468 return nil, false, err 469 } 470 go resourceQuotaController.Run(int(ctx.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs), ctx.Stop) 471 472 // Periodically the quota controller to detect new resource types 473 go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Stop) 474 475 return nil, true, nil 476} 477 478func startNamespaceController(ctx ControllerContext) (http.Handler, bool, error) { 479 // the namespace cleanup controller is very chatty. It makes lots of discovery calls and then it makes lots of delete calls 480 // the ratelimiter negatively affects its speed. Deleting 100 total items in a namespace (that's only a few of each resource 481 // including events), takes ~10 seconds by default. 482 nsKubeconfig := ctx.ClientBuilder.ConfigOrDie("namespace-controller") 483 nsKubeconfig.QPS *= 20 484 nsKubeconfig.Burst *= 100 485 namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig) 486 return startModifiedNamespaceController(ctx, namespaceKubeClient, nsKubeconfig) 487} 488 489func startModifiedNamespaceController(ctx ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (http.Handler, bool, error) { 490 491 metadataClient, err := metadata.NewForConfig(nsKubeconfig) 492 if err != nil { 493 return nil, true, err 494 } 495 496 discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources 497 498 namespaceController := namespacecontroller.NewNamespaceController( 499 namespaceKubeClient, 500 metadataClient, 501 discoverResourcesFn, 502 ctx.InformerFactory.Core().V1().Namespaces(), 503 ctx.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration, 504 v1.FinalizerKubernetes, 505 ) 506 go namespaceController.Run(int(ctx.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs), ctx.Stop) 507 508 return nil, true, nil 509} 510 511func startServiceAccountController(ctx ControllerContext) (http.Handler, bool, error) { 512 sac, err := serviceaccountcontroller.NewServiceAccountsController( 513 ctx.InformerFactory.Core().V1().ServiceAccounts(), 514 ctx.InformerFactory.Core().V1().Namespaces(), 515 ctx.ClientBuilder.ClientOrDie("service-account-controller"), 516 serviceaccountcontroller.DefaultServiceAccountsControllerOptions(), 517 ) 518 if err != nil { 519 return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err) 520 } 521 go sac.Run(1, ctx.Stop) 522 return nil, true, nil 523} 524 525func startTTLController(ctx ControllerContext) (http.Handler, bool, error) { 526 go ttlcontroller.NewTTLController( 527 ctx.InformerFactory.Core().V1().Nodes(), 528 ctx.ClientBuilder.ClientOrDie("ttl-controller"), 529 ).Run(5, ctx.Stop) 530 return nil, true, nil 531} 532 533func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) { 534 if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector { 535 return nil, false, nil 536 } 537 538 gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector") 539 discoveryClient := ctx.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector") 540 541 config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") 542 metadataClient, err := metadata.NewForConfig(config) 543 if err != nil { 544 return nil, true, err 545 } 546 547 ignoredResources := make(map[schema.GroupResource]struct{}) 548 for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources { 549 ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{} 550 } 551 garbageCollector, err := garbagecollector.NewGarbageCollector( 552 gcClientset, 553 metadataClient, 554 ctx.RESTMapper, 555 ignoredResources, 556 ctx.ObjectOrMetadataInformerFactory, 557 ctx.InformersStarted, 558 ) 559 if err != nil { 560 return nil, true, fmt.Errorf("failed to start the generic garbage collector: %v", err) 561 } 562 563 // Start the garbage collector. 564 workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs) 565 go garbageCollector.Run(workers, ctx.Stop) 566 567 // Periodically refresh the RESTMapper with new discovery information and sync 568 // the garbage collector. 569 go garbageCollector.Sync(discoveryClient, 30*time.Second, ctx.Stop) 570 571 return garbagecollector.NewDebugHandler(garbageCollector), true, nil 572} 573 574func startPVCProtectionController(ctx ControllerContext) (http.Handler, bool, error) { 575 pvcProtectionController, err := pvcprotection.NewPVCProtectionController( 576 ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), 577 ctx.InformerFactory.Core().V1().Pods(), 578 ctx.ClientBuilder.ClientOrDie("pvc-protection-controller"), 579 utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), 580 utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), 581 ) 582 if err != nil { 583 return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err) 584 } 585 go pvcProtectionController.Run(1, ctx.Stop) 586 return nil, true, nil 587} 588 589func startPVProtectionController(ctx ControllerContext) (http.Handler, bool, error) { 590 go pvprotection.NewPVProtectionController( 591 ctx.InformerFactory.Core().V1().PersistentVolumes(), 592 ctx.ClientBuilder.ClientOrDie("pv-protection-controller"), 593 utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection), 594 ).Run(1, ctx.Stop) 595 return nil, true, nil 596} 597 598func startTTLAfterFinishedController(ctx ControllerContext) (http.Handler, bool, error) { 599 if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) { 600 return nil, false, nil 601 } 602 go ttlafterfinished.New( 603 ctx.InformerFactory.Batch().V1().Jobs(), 604 ctx.ClientBuilder.ClientOrDie("ttl-after-finished-controller"), 605 ).Run(int(ctx.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs), ctx.Stop) 606 return nil, true, nil 607} 608 609// processCIDRs is a helper function that works on a comma separated cidrs and returns 610// a list of typed cidrs 611// a flag if cidrs represents a dual stack 612// error if failed to parse any of the cidrs 613func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) { 614 cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",") 615 616 cidrs, err := netutils.ParseCIDRs(cidrsSplit) 617 if err != nil { 618 return nil, false, err 619 } 620 621 // if cidrs has an error then the previous call will fail 622 // safe to ignore error checking on next call 623 dualstack, _ := netutils.IsDualStackCIDRs(cidrs) 624 625 return cidrs, dualstack, nil 626} 627 628// setNodeCIDRMaskSizes returns the IPv4 and IPv6 node cidr mask sizes. 629// If --node-cidr-mask-size not set, then it will return default IPv4 and IPv6 cidr mask sizes. 630func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration) (int, int, error) { 631 ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6 632 // NodeCIDRMaskSizeIPv4 and NodeCIDRMaskSizeIPv6 can be used only for dual-stack clusters 633 if cfg.NodeCIDRMaskSizeIPv4 != 0 || cfg.NodeCIDRMaskSizeIPv6 != 0 { 634 return ipv4Mask, ipv6Mask, errors.New("usage of --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 are not allowed with non dual-stack clusters") 635 } 636 if cfg.NodeCIDRMaskSize != 0 { 637 ipv4Mask = int(cfg.NodeCIDRMaskSize) 638 ipv6Mask = int(cfg.NodeCIDRMaskSize) 639 } 640 return ipv4Mask, ipv6Mask, nil 641} 642 643// setNodeCIDRMaskSizesDualStack returns the IPv4 and IPv6 node cidr mask sizes to the value provided 644// for --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 respectively. If value not provided, 645// then it will return default IPv4 and IPv6 cidr mask sizes. 646func setNodeCIDRMaskSizesDualStack(cfg nodeipamconfig.NodeIPAMControllerConfiguration) (int, int, error) { 647 ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6 648 // NodeCIDRMaskSize can be used only for single stack clusters 649 if cfg.NodeCIDRMaskSize != 0 { 650 return ipv4Mask, ipv6Mask, errors.New("usage of --node-cidr-mask-size is not allowed with dual-stack clusters") 651 } 652 if cfg.NodeCIDRMaskSizeIPv4 != 0 { 653 ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4) 654 } 655 if cfg.NodeCIDRMaskSizeIPv6 != 0 { 656 ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6) 657 } 658 return ipv4Mask, ipv6Mask, nil 659} 660 661// getNodeCIDRMaskSizes is a helper function that helps the generate the node cidr mask 662// sizes slice based on the cluster cidr slice 663func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6 int) []int { 664 nodeMaskCIDRs := make([]int, len(clusterCIDRs)) 665 666 for idx, clusterCIDR := range clusterCIDRs { 667 if netutils.IsIPv6CIDR(clusterCIDR) { 668 nodeMaskCIDRs[idx] = maskSizeIPv6 669 } else { 670 nodeMaskCIDRs[idx] = maskSizeIPv4 671 } 672 } 673 return nodeMaskCIDRs 674} 675 676func startStorageVersionGCController(ctx ControllerContext) (http.Handler, bool, error) { 677 go storageversiongc.NewStorageVersionGC( 678 ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), 679 ctx.InformerFactory.Coordination().V1().Leases(), 680 ctx.InformerFactory.Internal().V1alpha1().StorageVersions(), 681 ).Run(ctx.Stop) 682 return nil, true, nil 683} 684