1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package csi
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"io"
24	"net"
25	"strconv"
26	"sync"
27
28	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
29	"google.golang.org/grpc"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/status"
32	api "k8s.io/api/core/v1"
33	"k8s.io/apimachinery/pkg/api/resource"
34	utilfeature "k8s.io/apiserver/pkg/util/feature"
35	"k8s.io/klog/v2"
36	"k8s.io/kubernetes/pkg/features"
37	"k8s.io/kubernetes/pkg/volume"
38	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
39)
40
41type csiClient interface {
42	NodeGetInfo(ctx context.Context) (
43		nodeID string,
44		maxVolumePerNode int64,
45		accessibleTopology map[string]string,
46		err error)
47
48	// The caller is responsible for checking whether the driver supports
49	// applying FSGroup by calling NodeSupportsVolumeMountGroup().
50	// If the driver does not, fsGroup must be set to nil.
51	NodePublishVolume(
52		ctx context.Context,
53		volumeid string,
54		readOnly bool,
55		stagingTargetPath string,
56		targetPath string,
57		accessMode api.PersistentVolumeAccessMode,
58		publishContext map[string]string,
59		volumeContext map[string]string,
60		secrets map[string]string,
61		fsType string,
62		mountOptions []string,
63		fsGroup *int64,
64	) error
65
66	NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error)
67	NodeUnpublishVolume(
68		ctx context.Context,
69		volID string,
70		targetPath string,
71	) error
72
73	// The caller is responsible for checking whether the driver supports
74	// applying FSGroup by calling NodeSupportsVolumeMountGroup().
75	// If the driver does not, fsGroup must be set to nil.
76	NodeStageVolume(ctx context.Context,
77		volID string,
78		publishVolumeInfo map[string]string,
79		stagingTargetPath string,
80		fsType string,
81		accessMode api.PersistentVolumeAccessMode,
82		secrets map[string]string,
83		volumeContext map[string]string,
84		mountOptions []string,
85		fsGroup *int64,
86	) error
87
88	NodeGetVolumeStats(
89		ctx context.Context,
90		volID string,
91		targetPath string,
92	) (*volume.Metrics, error)
93	NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
94	NodeSupportsStageUnstage(ctx context.Context) (bool, error)
95	NodeSupportsNodeExpand(ctx context.Context) (bool, error)
96	NodeSupportsVolumeStats(ctx context.Context) (bool, error)
97	NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error)
98	NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error)
99}
100
type (
	// csiAddr is a strongly typed driver endpoint address. newGrpcConn
	// dials it over the "unix" network.
	csiAddr string

	// csiDriverName is a strongly typed CSI driver name, used to look the
	// driver up among the registered CSI drivers.
	csiDriverName string
)
106
// csiDriverClient is the csiClient implementation for a single CSI driver.
// It does not hold a connection itself: each method obtains a node client
// from nodeV1ClientCreator and closes it before returning.
//
// Fields:
//   - driverName: name of the driver this client was created for.
//   - addr: endpoint address handed to nodeV1ClientCreator.
//   - metricsManager: handed to nodeV1ClientCreator alongside addr.
//   - nodeV1ClientCreator: factory for the node client and its closer;
//     newCsiDriverClient sets it to newV1NodeClient.
type csiDriverClient struct {
	driverName          csiDriverName
	addr                csiAddr
	metricsManager      *MetricsManager
	nodeV1ClientCreator nodeV1ClientCreator
}
114
// csiResizeOptions carries the arguments for csiClient.NodeExpandVolume.
// volumeID and volumePath are required and newSize must not be negative;
// stagingTargetPath is optional.
type csiResizeOptions struct {
	// volumeID identifies the volume to expand.
	volumeID string
	// volumePath is the path where the volume is available. It could be:
	//   - the path where the volume is staged, if NodeExpandVolume is called after NodeStageVolume
	//   - the path where the volume is published, if NodeExpandVolume is called after NodePublishVolume
	// DEPRECATION NOTICE: in the future NodeExpandVolume will always be called after NodePublish.
	volumePath        string
	// stagingTargetPath is forwarded to the driver only when non-empty.
	stagingTargetPath string
	// fsType selects the access type: fsTypeBlockName means a block volume,
	// anything else is sent as the mount filesystem type.
	fsType            string
	// accessMode is translated to a CSI access mode before being sent.
	accessMode        api.PersistentVolumeAccessMode
	// newSize is the requested capacity, sent as the required bytes.
	newSize           resource.Quantity
	// mountOptions are sent as mount flags for non-block volumes.
	mountOptions      []string
}
128
129var _ csiClient = &csiDriverClient{}
130
131type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
132	nodeClient csipbv1.NodeClient,
133	closer io.Closer,
134	err error,
135)
136
137type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode
138
139// newV1NodeClient creates a new NodeClient with the internally used gRPC
140// connection set up. It also returns a closer which must to be called to close
141// the gRPC connection when the NodeClient is not used anymore.
142// This is the default implementation for the nodeV1ClientCreator, used in
143// newCsiDriverClient.
144func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
145	var conn *grpc.ClientConn
146	conn, err = newGrpcConn(addr, metricsManager)
147	if err != nil {
148		return nil, nil, err
149	}
150
151	nodeClient = csipbv1.NewNodeClient(conn)
152	return nodeClient, conn, nil
153}
154
155func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
156	if driverName == "" {
157		return nil, fmt.Errorf("driver name is empty")
158	}
159
160	existingDriver, driverExists := csiDrivers.Get(string(driverName))
161	if !driverExists {
162		return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
163	}
164
165	nodeV1ClientCreator := newV1NodeClient
166	return &csiDriverClient{
167		driverName:          driverName,
168		addr:                csiAddr(existingDriver.endpoint),
169		nodeV1ClientCreator: nodeV1ClientCreator,
170		metricsManager:      NewCSIMetricsManager(string(driverName)),
171	}, nil
172}
173
174func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
175	nodeID string,
176	maxVolumePerNode int64,
177	accessibleTopology map[string]string,
178	err error) {
179	klog.V(4).Info(log("calling NodeGetInfo rpc"))
180
181	var getNodeInfoError error
182	nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
183	if getNodeInfoError != nil {
184		klog.Warningf("Error calling CSI NodeGetInfo(): %v", getNodeInfoError.Error())
185	}
186	return nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError
187}
188
189func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
190	nodeID string,
191	maxVolumePerNode int64,
192	accessibleTopology map[string]string,
193	err error) {
194
195	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
196	if err != nil {
197		return "", 0, nil, err
198	}
199	defer closer.Close()
200
201	res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
202	if err != nil {
203		return "", 0, nil, err
204	}
205
206	topology := res.GetAccessibleTopology()
207	if topology != nil {
208		accessibleTopology = topology.Segments
209	}
210	return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
211}
212
213func (c *csiDriverClient) NodePublishVolume(
214	ctx context.Context,
215	volID string,
216	readOnly bool,
217	stagingTargetPath string,
218	targetPath string,
219	accessMode api.PersistentVolumeAccessMode,
220	publishContext map[string]string,
221	volumeContext map[string]string,
222	secrets map[string]string,
223	fsType string,
224	mountOptions []string,
225	fsGroup *int64,
226) error {
227	klog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath))
228	if volID == "" {
229		return errors.New("missing volume id")
230	}
231	if targetPath == "" {
232		return errors.New("missing target path")
233	}
234
235	if c.nodeV1ClientCreator == nil {
236		return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
237	}
238
239	accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
240	if err != nil {
241		return err
242	}
243
244	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
245	if err != nil {
246		return err
247	}
248	defer closer.Close()
249
250	req := &csipbv1.NodePublishVolumeRequest{
251		VolumeId:       volID,
252		TargetPath:     targetPath,
253		Readonly:       readOnly,
254		PublishContext: publishContext,
255		VolumeContext:  volumeContext,
256		Secrets:        secrets,
257		VolumeCapability: &csipbv1.VolumeCapability{
258			AccessMode: &csipbv1.VolumeCapability_AccessMode{
259				Mode: accessModeMapper(accessMode),
260			},
261		},
262	}
263	if stagingTargetPath != "" {
264		req.StagingTargetPath = stagingTargetPath
265	}
266
267	if fsType == fsTypeBlockName {
268		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
269			Block: &csipbv1.VolumeCapability_BlockVolume{},
270		}
271	} else {
272		mountVolume := &csipbv1.VolumeCapability_MountVolume{
273			FsType:     fsType,
274			MountFlags: mountOptions,
275		}
276		if fsGroup != nil {
277			mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */)
278		}
279		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
280			Mount: mountVolume,
281		}
282	}
283
284	_, err = nodeClient.NodePublishVolume(ctx, req)
285	if err != nil && !isFinalError(err) {
286		return volumetypes.NewUncertainProgressError(err.Error())
287	}
288	return err
289}
290
291func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
292	if c.nodeV1ClientCreator == nil {
293		return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
294	}
295
296	if opts.volumeID == "" {
297		return opts.newSize, errors.New("missing volume id")
298	}
299	if opts.volumePath == "" {
300		return opts.newSize, errors.New("missing volume path")
301	}
302
303	if opts.newSize.Value() < 0 {
304		return opts.newSize, errors.New("size can not be less than 0")
305	}
306
307	accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
308	if err != nil {
309		return opts.newSize, err
310	}
311
312	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
313	if err != nil {
314		return opts.newSize, err
315	}
316	defer closer.Close()
317
318	req := &csipbv1.NodeExpandVolumeRequest{
319		VolumeId:      opts.volumeID,
320		VolumePath:    opts.volumePath,
321		CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
322		VolumeCapability: &csipbv1.VolumeCapability{
323			AccessMode: &csipbv1.VolumeCapability_AccessMode{
324				Mode: accessModeMapper(opts.accessMode),
325			},
326		},
327	}
328
329	// not all CSI drivers support NodeStageUnstage and hence the StagingTargetPath
330	// should only be set when available
331	if opts.stagingTargetPath != "" {
332		req.StagingTargetPath = opts.stagingTargetPath
333	}
334
335	if opts.fsType == fsTypeBlockName {
336		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
337			Block: &csipbv1.VolumeCapability_BlockVolume{},
338		}
339	} else {
340		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
341			Mount: &csipbv1.VolumeCapability_MountVolume{
342				FsType:     opts.fsType,
343				MountFlags: opts.mountOptions,
344			},
345		}
346	}
347
348	resp, err := nodeClient.NodeExpandVolume(ctx, req)
349	if err != nil {
350		return opts.newSize, err
351	}
352	updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
353	return *updatedQuantity, nil
354}
355
356func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
357	klog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath))
358	if volID == "" {
359		return errors.New("missing volume id")
360	}
361	if targetPath == "" {
362		return errors.New("missing target path")
363	}
364	if c.nodeV1ClientCreator == nil {
365		return errors.New("nodeV1ClientCreate is nil")
366	}
367
368	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
369	if err != nil {
370		return err
371	}
372	defer closer.Close()
373
374	req := &csipbv1.NodeUnpublishVolumeRequest{
375		VolumeId:   volID,
376		TargetPath: targetPath,
377	}
378
379	_, err = nodeClient.NodeUnpublishVolume(ctx, req)
380	return err
381}
382
383func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
384	volID string,
385	publishContext map[string]string,
386	stagingTargetPath string,
387	fsType string,
388	accessMode api.PersistentVolumeAccessMode,
389	secrets map[string]string,
390	volumeContext map[string]string,
391	mountOptions []string,
392	fsGroup *int64,
393) error {
394	klog.V(4).Info(log("calling NodeStageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
395	if volID == "" {
396		return errors.New("missing volume id")
397	}
398	if stagingTargetPath == "" {
399		return errors.New("missing staging target path")
400	}
401	if c.nodeV1ClientCreator == nil {
402		return errors.New("nodeV1ClientCreate is nil")
403	}
404
405	accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
406	if err != nil {
407		return err
408	}
409
410	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
411	if err != nil {
412		return err
413	}
414	defer closer.Close()
415
416	req := &csipbv1.NodeStageVolumeRequest{
417		VolumeId:          volID,
418		PublishContext:    publishContext,
419		StagingTargetPath: stagingTargetPath,
420		VolumeCapability: &csipbv1.VolumeCapability{
421			AccessMode: &csipbv1.VolumeCapability_AccessMode{
422				Mode: accessModeMapper(accessMode),
423			},
424		},
425		Secrets:       secrets,
426		VolumeContext: volumeContext,
427	}
428
429	if fsType == fsTypeBlockName {
430		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
431			Block: &csipbv1.VolumeCapability_BlockVolume{},
432		}
433	} else {
434		mountVolume := &csipbv1.VolumeCapability_MountVolume{
435			FsType:     fsType,
436			MountFlags: mountOptions,
437		}
438		if fsGroup != nil {
439			mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */)
440		}
441		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
442			Mount: mountVolume,
443		}
444	}
445
446	_, err = nodeClient.NodeStageVolume(ctx, req)
447	if err != nil && !isFinalError(err) {
448		return volumetypes.NewUncertainProgressError(err.Error())
449	}
450	return err
451}
452
453func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
454	klog.V(4).Info(log("calling NodeUnstageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath))
455	if volID == "" {
456		return errors.New("missing volume id")
457	}
458	if stagingTargetPath == "" {
459		return errors.New("missing staging target path")
460	}
461	if c.nodeV1ClientCreator == nil {
462		return errors.New("nodeV1ClientCreate is nil")
463	}
464
465	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
466	if err != nil {
467		return err
468	}
469	defer closer.Close()
470
471	req := &csipbv1.NodeUnstageVolumeRequest{
472		VolumeId:          volID,
473		StagingTargetPath: stagingTargetPath,
474	}
475	_, err = nodeClient.NodeUnstageVolume(ctx, req)
476	return err
477}
478
479func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
480	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
481}
482
483func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
484	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
485}
486
487func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) {
488	supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx)
489	if err != nil {
490		return nil, err
491	}
492	if supported {
493		return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil
494	}
495	return asCSIAccessModeV1, nil
496}
497
498func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
499	switch am {
500	case api.ReadWriteOnce:
501		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
502	case api.ReadOnlyMany:
503		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
504	case api.ReadWriteMany:
505		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
506	// This mapping exists to enable CSI drivers that lack the
507	// SINGLE_NODE_MULTI_WRITER capability to work with the
508	// ReadWriteOncePod access mode.
509	case api.ReadWriteOncePod:
510		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
511	}
512	return csipbv1.VolumeCapability_AccessMode_UNKNOWN
513}
514
515func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
516	switch am {
517	case api.ReadWriteOnce:
518		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER
519	case api.ReadOnlyMany:
520		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
521	case api.ReadWriteMany:
522		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
523	case api.ReadWriteOncePod:
524		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
525	}
526	return csipbv1.VolumeCapability_AccessMode_UNKNOWN
527}
528
529func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) {
530	network := "unix"
531	klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))
532
533	return grpc.Dial(
534		string(addr),
535		grpc.WithInsecure(),
536		grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
537			return (&net.Dialer{}).DialContext(ctx, network, target)
538		}),
539		grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor),
540	)
541}
542
// csiClientGetter is a CSI client getter with cache.
// It creates the CSI client for driverName on first successful Get and
// caches it for later use. When CSI drivers have not been discovered yet
// (e.g. on kubelet restart), client initialization will fail. Users of the
// CSI client (e.g. mounter manager and block mapper) can use this to delay
// CSI client initialization until needed.
//
// The embedded RWMutex guards csiClient; csiClient stays nil until a client
// has been created successfully.
type csiClientGetter struct {
	sync.RWMutex
	csiClient  csiClient
	driverName csiDriverName
}
554
555func (c *csiClientGetter) Get() (csiClient, error) {
556	c.RLock()
557	if c.csiClient != nil {
558		c.RUnlock()
559		return c.csiClient, nil
560	}
561	c.RUnlock()
562	c.Lock()
563	defer c.Unlock()
564	// Double-checking locking criterion.
565	if c.csiClient != nil {
566		return c.csiClient, nil
567	}
568	csi, err := newCsiDriverClient(c.driverName)
569	if err != nil {
570		return nil, err
571	}
572	c.csiClient = csi
573	return c.csiClient, nil
574}
575
576func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
577	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
578}
579
580func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
581	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
582}
583
584func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
585	klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath))
586	if volID == "" {
587		return nil, errors.New("missing volume id")
588	}
589	if targetPath == "" {
590		return nil, errors.New("missing target path")
591	}
592	if c.nodeV1ClientCreator == nil {
593		return nil, errors.New("nodeV1ClientCreate is nil")
594	}
595
596	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
597	if err != nil {
598		return nil, err
599	}
600	defer closer.Close()
601
602	req := &csipbv1.NodeGetVolumeStatsRequest{
603		VolumeId:   volID,
604		VolumePath: targetPath,
605	}
606
607	resp, err := nodeClient.NodeGetVolumeStats(ctx, req)
608	if err != nil {
609		return nil, err
610	}
611	usages := resp.GetUsage()
612	if usages == nil {
613		return nil, fmt.Errorf("failed to get usage from response. usage is nil")
614	}
615	metrics := &volume.Metrics{
616		Used:       resource.NewQuantity(int64(0), resource.BinarySI),
617		Capacity:   resource.NewQuantity(int64(0), resource.BinarySI),
618		Available:  resource.NewQuantity(int64(0), resource.BinarySI),
619		InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI),
620		Inodes:     resource.NewQuantity(int64(0), resource.BinarySI),
621		InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
622	}
623
624	if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
625		isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
626		if err != nil {
627			return nil, err
628		}
629
630		if isSupportNodeVolumeCondition {
631			abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
632			metrics.Abnormal, metrics.Message = &abnormal, &message
633		}
634	}
635
636	for _, usage := range usages {
637		if usage == nil {
638			continue
639		}
640		unit := usage.GetUnit()
641		switch unit {
642		case csipbv1.VolumeUsage_BYTES:
643			metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
644			metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
645			metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
646		case csipbv1.VolumeUsage_INODES:
647			metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
648			metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
649			metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
650		default:
651			klog.Errorf("unknown key %s in usage", unit.String())
652		}
653
654	}
655	return metrics, nil
656}
657
658func (c *csiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
659	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
660}
661
662func (c *csiDriverClient) NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) {
663	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP)
664}
665
666func (c *csiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
667	klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if the node service has %s capability", capabilityType))
668	capabilities, err := c.nodeGetCapabilities(ctx)
669	if err != nil {
670		return false, err
671	}
672
673	for _, capability := range capabilities {
674		if capability == nil || capability.GetRpc() == nil {
675			continue
676		}
677		if capability.GetRpc().GetType() == capabilityType {
678			return true, nil
679		}
680	}
681	return false, nil
682}
683
684func (c *csiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
685	if c.nodeV1ClientCreator == nil {
686		return []*csipbv1.NodeServiceCapability{}, errors.New("nodeV1ClientCreate is nil")
687	}
688
689	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
690	if err != nil {
691		return []*csipbv1.NodeServiceCapability{}, err
692	}
693	defer closer.Close()
694
695	req := &csipbv1.NodeGetCapabilitiesRequest{}
696	resp, err := nodeClient.NodeGetCapabilities(ctx, req)
697	if err != nil {
698		return []*csipbv1.NodeServiceCapability{}, err
699	}
700	return resp.GetCapabilities(), nil
701}
702
703func isFinalError(err error) bool {
704	// Sources:
705	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
706	// https://github.com/container-storage-interface/spec/blob/master/spec.md
707	st, ok := status.FromError(err)
708	if !ok {
709		// This is not gRPC error. The operation must have failed before gRPC
710		// method was called, otherwise we would get gRPC error.
711		// We don't know if any previous volume operation is in progress, be on the safe side.
712		return false
713	}
714	switch st.Code() {
715	case codes.Canceled, // gRPC: Client Application cancelled the request
716		codes.DeadlineExceeded,  // gRPC: Timeout
717		codes.Unavailable,       // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress.
718		codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress.
719		codes.Aborted:           // CSI: Operation pending for volume
720		return false
721	}
722	// All other errors mean that operation either did not
723	// even start or failed. It is for sure not in progress.
724	return true
725}
726