1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17// Package operationexecutor implements interfaces that enable execution of
18// register and unregister operations with a
19// goroutinemap so that more than one operation is never triggered
20// on the same plugin.
21package operationexecutor
22
23import (
24	"context"
25	"fmt"
26	"net"
27	"time"
28
29	"k8s.io/klog/v2"
30
31	"github.com/pkg/errors"
32	"google.golang.org/grpc"
33	"k8s.io/client-go/tools/record"
34	registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
35	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
36)
37
// dialTimeoutDuration is the timeout handed to dial when connecting to a
// plugin socket during registration.
const dialTimeoutDuration = 10 * time.Second

// notifyTimeoutDuration is the timeout applied to the
// NotifyRegistrationStatus RPC issued by notifyPlugin.
const notifyTimeoutDuration = 5 * time.Second
42
43var _ OperationGenerator = &operationGenerator{}
44
45type operationGenerator struct {
46
47	// recorder is used to record events in the API server
48	recorder record.EventRecorder
49}
50
51// NewOperationGenerator is returns instance of operationGenerator
52func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
53
54	return &operationGenerator{
55		recorder: recorder,
56	}
57}
58
59// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
60type OperationGenerator interface {
61	// Generates the RegisterPlugin function needed to perform the registration of a plugin
62	GenerateRegisterPluginFunc(
63		socketPath string,
64		timestamp time.Time,
65		pluginHandlers map[string]cache.PluginHandler,
66		actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
67
68	// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
69	GenerateUnregisterPluginFunc(
70		pluginInfo cache.PluginInfo,
71		actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
72}
73
74func (og *operationGenerator) GenerateRegisterPluginFunc(
75	socketPath string,
76	timestamp time.Time,
77	pluginHandlers map[string]cache.PluginHandler,
78	actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
79
80	registerPluginFunc := func() error {
81		client, conn, err := dial(socketPath, dialTimeoutDuration)
82		if err != nil {
83			return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
84		}
85		defer conn.Close()
86
87		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
88		defer cancel()
89
90		infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
91		if err != nil {
92			return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
93		}
94
95		handler, ok := pluginHandlers[infoResp.Type]
96		if !ok {
97			if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
98				return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
99			}
100			return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
101		}
102
103		if infoResp.Endpoint == "" {
104			infoResp.Endpoint = socketPath
105		}
106		if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
107			if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
108				return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
109			}
110			return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
111		}
112		// We add the plugin to the actual state of world cache before calling a plugin consumer's Register handle
113		// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
114		err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
115			SocketPath: socketPath,
116			Timestamp:  timestamp,
117			Handler:    handler,
118			Name:       infoResp.Name,
119		})
120		if err != nil {
121			klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
122		}
123		if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
124			return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
125		}
126
127		// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
128		if err := og.notifyPlugin(client, true, ""); err != nil {
129			return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
130		}
131		return nil
132	}
133	return registerPluginFunc
134}
135
136func (og *operationGenerator) GenerateUnregisterPluginFunc(
137	pluginInfo cache.PluginInfo,
138	actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
139
140	unregisterPluginFunc := func() error {
141		if pluginInfo.Handler == nil {
142			return fmt.Errorf("UnregisterPlugin error -- failed to get plugin handler for %s", pluginInfo.SocketPath)
143		}
144		// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
145		// so that if we receive a register event during Register Plugin, we can process it as a Register call.
146		actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
147
148		pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
149
150		klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
151		return nil
152	}
153	return unregisterPluginFunc
154}
155
156func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error {
157	ctx, cancel := context.WithTimeout(context.Background(), notifyTimeoutDuration)
158	defer cancel()
159
160	status := &registerapi.RegistrationStatus{
161		PluginRegistered: registered,
162		Error:            errStr,
163	}
164
165	if _, err := client.NotifyRegistrationStatus(ctx, status); err != nil {
166		return errors.Wrap(err, errStr)
167	}
168
169	if errStr != "" {
170		return errors.New(errStr)
171	}
172
173	return nil
174}
175
176// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
177func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
178	ctx, cancel := context.WithTimeout(context.Background(), timeout)
179	defer cancel()
180
181	c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
182		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
183			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
184		}),
185	)
186
187	if err != nil {
188		return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
189	}
190
191	return registerapi.NewRegistrationClient(c), c, nil
192}
193