1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package pluginmanager 18 19import ( 20 "time" 21 22 "k8s.io/apimachinery/pkg/util/runtime" 23 "k8s.io/client-go/tools/record" 24 "k8s.io/klog/v2" 25 "k8s.io/kubernetes/pkg/kubelet/config" 26 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" 27 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/metrics" 28 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor" 29 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher" 30 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler" 31) 32 33// PluginManager runs a set of asynchronous loops that figure out which plugins 34// need to be registered/deregistered and makes it so. 35type PluginManager interface { 36 // Starts the plugin manager and all the asynchronous loops that it controls 37 Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) 38 39 // AddHandler adds the given plugin handler for a specific plugin type, which 40 // will be added to the actual state of world cache so that it can be passed to 41 // the desired state of world cache in order to be used during plugin 42 // registration/deregistration 43 AddHandler(pluginType string, pluginHandler cache.PluginHandler) 44} 45 46const ( 47 // loopSleepDuration is the amount of time the reconciler loop waits 48 // between successive executions 49 loopSleepDuration = 1 * time.Second 50) 51 52// NewPluginManager returns a new concrete instance implementing the 53// PluginManager interface. 54func NewPluginManager( 55 sockDir string, 56 recorder record.EventRecorder) PluginManager { 57 asw := cache.NewActualStateOfWorld() 58 dsw := cache.NewDesiredStateOfWorld() 59 reconciler := reconciler.NewReconciler( 60 operationexecutor.NewOperationExecutor( 61 operationexecutor.NewOperationGenerator( 62 recorder, 63 ), 64 ), 65 loopSleepDuration, 66 dsw, 67 asw, 68 ) 69 70 pm := &pluginManager{ 71 desiredStateOfWorldPopulator: pluginwatcher.NewWatcher( 72 sockDir, 73 dsw, 74 ), 75 reconciler: reconciler, 76 desiredStateOfWorld: dsw, 77 actualStateOfWorld: asw, 78 } 79 return pm 80} 81 82// pluginManager implements the PluginManager interface 83type pluginManager struct { 84 // desiredStateOfWorldPopulator (the plugin watcher) runs an asynchronous 85 // periodic loop to populate the desiredStateOfWorld. 86 desiredStateOfWorldPopulator *pluginwatcher.Watcher 87 88 // reconciler runs an asynchronous periodic loop to reconcile the 89 // desiredStateOfWorld with the actualStateOfWorld by triggering register 90 // and unregister operations using the operationExecutor. 91 reconciler reconciler.Reconciler 92 93 // actualStateOfWorld is a data structure containing the actual state of 94 // the world according to the manager: i.e. which plugins are registered. 95 // The data structure is populated upon successful completion of register 96 // and unregister actions triggered by the reconciler. 97 actualStateOfWorld cache.ActualStateOfWorld 98 99 // desiredStateOfWorld is a data structure containing the desired state of 100 // the world according to the plugin manager: i.e. what plugins are registered. 101 // The data structure is populated by the desired state of the world 102 // populator (plugin watcher). 103 desiredStateOfWorld cache.DesiredStateOfWorld 104} 105 106var _ PluginManager = &pluginManager{} 107 108func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { 109 defer runtime.HandleCrash() 110 111 pm.desiredStateOfWorldPopulator.Start(stopCh) 112 klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts") 113 114 klog.InfoS("Starting Kubelet Plugin Manager") 115 go pm.reconciler.Run(stopCh) 116 117 metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld) 118 <-stopCh 119 klog.InfoS("Shutting down Kubelet Plugin Manager") 120} 121 122func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) { 123 pm.reconciler.AddHandler(pluginType, handler) 124} 125