1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package pluginmanager
18
19import (
20	"time"
21
22	"k8s.io/apimachinery/pkg/util/runtime"
23	"k8s.io/client-go/tools/record"
24	"k8s.io/klog/v2"
25	"k8s.io/kubernetes/pkg/kubelet/config"
26	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
27	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/metrics"
28	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
29	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
30	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler"
31)
32
33// PluginManager runs a set of asynchronous loops that figure out which plugins
34// need to be registered/deregistered and makes it so.
35type PluginManager interface {
36	// Starts the plugin manager and all the asynchronous loops that it controls
37	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
38
39	// AddHandler adds the given plugin handler for a specific plugin type, which
40	// will be added to the actual state of world cache so that it can be passed to
41	// the desired state of world cache in order to be used during plugin
42	// registration/deregistration
43	AddHandler(pluginType string, pluginHandler cache.PluginHandler)
44}
45
// loopSleepDuration is how long the reconciler loop waits between two
// successive executions.
const loopSleepDuration = time.Second
51
52// NewPluginManager returns a new concrete instance implementing the
53// PluginManager interface.
54func NewPluginManager(
55	sockDir string,
56	recorder record.EventRecorder) PluginManager {
57	asw := cache.NewActualStateOfWorld()
58	dsw := cache.NewDesiredStateOfWorld()
59	reconciler := reconciler.NewReconciler(
60		operationexecutor.NewOperationExecutor(
61			operationexecutor.NewOperationGenerator(
62				recorder,
63			),
64		),
65		loopSleepDuration,
66		dsw,
67		asw,
68	)
69
70	pm := &pluginManager{
71		desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
72			sockDir,
73			dsw,
74		),
75		reconciler:          reconciler,
76		desiredStateOfWorld: dsw,
77		actualStateOfWorld:  asw,
78	}
79	return pm
80}
81
82// pluginManager implements the PluginManager interface
83type pluginManager struct {
84	// desiredStateOfWorldPopulator (the plugin watcher) runs an asynchronous
85	// periodic loop to populate the desiredStateOfWorld.
86	desiredStateOfWorldPopulator *pluginwatcher.Watcher
87
88	// reconciler runs an asynchronous periodic loop to reconcile the
89	// desiredStateOfWorld with the actualStateOfWorld by triggering register
90	// and unregister operations using the operationExecutor.
91	reconciler reconciler.Reconciler
92
93	// actualStateOfWorld is a data structure containing the actual state of
94	// the world according to the manager: i.e. which plugins are registered.
95	// The data structure is populated upon successful completion of register
96	// and unregister actions triggered by the reconciler.
97	actualStateOfWorld cache.ActualStateOfWorld
98
99	// desiredStateOfWorld is a data structure containing the desired state of
100	// the world according to the plugin manager: i.e. what plugins are registered.
101	// The data structure is populated by the desired state of the world
102	// populator (plugin watcher).
103	desiredStateOfWorld cache.DesiredStateOfWorld
104}
105
106var _ PluginManager = &pluginManager{}
107
108func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
109	defer runtime.HandleCrash()
110
111	pm.desiredStateOfWorldPopulator.Start(stopCh)
112	klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")
113
114	klog.InfoS("Starting Kubelet Plugin Manager")
115	go pm.reconciler.Run(stopCh)
116
117	metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
118	<-stopCh
119	klog.InfoS("Shutting down Kubelet Plugin Manager")
120}
121
122func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {
123	pm.reconciler.AddHandler(pluginType, handler)
124}
125