1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package csi 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "io" 24 "net" 25 "strconv" 26 "sync" 27 28 csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" 29 "google.golang.org/grpc" 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/status" 32 api "k8s.io/api/core/v1" 33 "k8s.io/apimachinery/pkg/api/resource" 34 utilfeature "k8s.io/apiserver/pkg/util/feature" 35 "k8s.io/klog/v2" 36 "k8s.io/kubernetes/pkg/features" 37 "k8s.io/kubernetes/pkg/volume" 38 volumetypes "k8s.io/kubernetes/pkg/volume/util/types" 39) 40 41type csiClient interface { 42 NodeGetInfo(ctx context.Context) ( 43 nodeID string, 44 maxVolumePerNode int64, 45 accessibleTopology map[string]string, 46 err error) 47 48 // The caller is responsible for checking whether the driver supports 49 // applying FSGroup by calling NodeSupportsVolumeMountGroup(). 50 // If the driver does not, fsGroup must be set to nil. 51 NodePublishVolume( 52 ctx context.Context, 53 volumeid string, 54 readOnly bool, 55 stagingTargetPath string, 56 targetPath string, 57 accessMode api.PersistentVolumeAccessMode, 58 publishContext map[string]string, 59 volumeContext map[string]string, 60 secrets map[string]string, 61 fsType string, 62 mountOptions []string, 63 fsGroup *int64, 64 ) error 65 66 NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error) 67 NodeUnpublishVolume( 68 ctx context.Context, 69 volID string, 70 targetPath string, 71 ) error 72 73 // The caller is responsible for checking whether the driver supports 74 // applying FSGroup by calling NodeSupportsVolumeMountGroup(). 75 // If the driver does not, fsGroup must be set to nil. 76 NodeStageVolume(ctx context.Context, 77 volID string, 78 publishVolumeInfo map[string]string, 79 stagingTargetPath string, 80 fsType string, 81 accessMode api.PersistentVolumeAccessMode, 82 secrets map[string]string, 83 volumeContext map[string]string, 84 mountOptions []string, 85 fsGroup *int64, 86 ) error 87 88 NodeGetVolumeStats( 89 ctx context.Context, 90 volID string, 91 targetPath string, 92 ) (*volume.Metrics, error) 93 NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error 94 NodeSupportsStageUnstage(ctx context.Context) (bool, error) 95 NodeSupportsNodeExpand(ctx context.Context) (bool, error) 96 NodeSupportsVolumeStats(ctx context.Context) (bool, error) 97 NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) 98 NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) 99} 100 101// Strongly typed address 102type csiAddr string 103 104// Strongly typed driver name 105type csiDriverName string 106 107// csiClient encapsulates all csi-plugin methods 108type csiDriverClient struct { 109 driverName csiDriverName 110 addr csiAddr 111 metricsManager *MetricsManager 112 nodeV1ClientCreator nodeV1ClientCreator 113} 114 115type csiResizeOptions struct { 116 volumeID string 117 // volumePath is path where volume is available. It could be: 118 // - path where node is staged if NodeExpandVolume is called after NodeStageVolume 119 // - path where volume is published if NodeExpandVolume is called after NodePublishVolume 120 // DEPRECATION NOTICE: in future NodeExpandVolume will be always called after NodePublish 121 volumePath string 122 stagingTargetPath string 123 fsType string 124 accessMode api.PersistentVolumeAccessMode 125 newSize resource.Quantity 126 mountOptions []string 127} 128 129var _ csiClient = &csiDriverClient{} 130 131type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) ( 132 nodeClient csipbv1.NodeClient, 133 closer io.Closer, 134 err error, 135) 136 137type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode 138 139// newV1NodeClient creates a new NodeClient with the internally used gRPC 140// connection set up. It also returns a closer which must to be called to close 141// the gRPC connection when the NodeClient is not used anymore. 142// This is the default implementation for the nodeV1ClientCreator, used in 143// newCsiDriverClient. 144func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) { 145 var conn *grpc.ClientConn 146 conn, err = newGrpcConn(addr, metricsManager) 147 if err != nil { 148 return nil, nil, err 149 } 150 151 nodeClient = csipbv1.NewNodeClient(conn) 152 return nodeClient, conn, nil 153} 154 155func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) { 156 if driverName == "" { 157 return nil, fmt.Errorf("driver name is empty") 158 } 159 160 existingDriver, driverExists := csiDrivers.Get(string(driverName)) 161 if !driverExists { 162 return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName) 163 } 164 165 nodeV1ClientCreator := newV1NodeClient 166 return &csiDriverClient{ 167 driverName: driverName, 168 addr: csiAddr(existingDriver.endpoint), 169 nodeV1ClientCreator: nodeV1ClientCreator, 170 metricsManager: NewCSIMetricsManager(string(driverName)), 171 }, nil 172} 173 174func (c *csiDriverClient) NodeGetInfo(ctx context.Context) ( 175 nodeID string, 176 maxVolumePerNode int64, 177 accessibleTopology map[string]string, 178 err error) { 179 klog.V(4).Info(log("calling NodeGetInfo rpc")) 180 181 var getNodeInfoError error 182 nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx) 183 if getNodeInfoError != nil { 184 klog.Warningf("Error calling CSI NodeGetInfo(): %v", getNodeInfoError.Error()) 185 } 186 return nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError 187} 188 189func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) ( 190 nodeID string, 191 maxVolumePerNode int64, 192 accessibleTopology map[string]string, 193 err error) { 194 195 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 196 if err != nil { 197 return "", 0, nil, err 198 } 199 defer closer.Close() 200 201 res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{}) 202 if err != nil { 203 return "", 0, nil, err 204 } 205 206 topology := res.GetAccessibleTopology() 207 if topology != nil { 208 accessibleTopology = topology.Segments 209 } 210 return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil 211} 212 213func (c *csiDriverClient) NodePublishVolume( 214 ctx context.Context, 215 volID string, 216 readOnly bool, 217 stagingTargetPath string, 218 targetPath string, 219 accessMode api.PersistentVolumeAccessMode, 220 publishContext map[string]string, 221 volumeContext map[string]string, 222 secrets map[string]string, 223 fsType string, 224 mountOptions []string, 225 fsGroup *int64, 226) error { 227 klog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath)) 228 if volID == "" { 229 return errors.New("missing volume id") 230 } 231 if targetPath == "" { 232 return errors.New("missing target path") 233 } 234 235 if c.nodeV1ClientCreator == nil { 236 return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil") 237 } 238 239 accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx) 240 if err != nil { 241 return err 242 } 243 244 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 245 if err != nil { 246 return err 247 } 248 defer closer.Close() 249 250 req := &csipbv1.NodePublishVolumeRequest{ 251 VolumeId: volID, 252 TargetPath: targetPath, 253 Readonly: readOnly, 254 PublishContext: publishContext, 255 VolumeContext: volumeContext, 256 Secrets: secrets, 257 VolumeCapability: &csipbv1.VolumeCapability{ 258 AccessMode: &csipbv1.VolumeCapability_AccessMode{ 259 Mode: accessModeMapper(accessMode), 260 }, 261 }, 262 } 263 if stagingTargetPath != "" { 264 req.StagingTargetPath = stagingTargetPath 265 } 266 267 if fsType == fsTypeBlockName { 268 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ 269 Block: &csipbv1.VolumeCapability_BlockVolume{}, 270 } 271 } else { 272 mountVolume := &csipbv1.VolumeCapability_MountVolume{ 273 FsType: fsType, 274 MountFlags: mountOptions, 275 } 276 if fsGroup != nil { 277 mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */) 278 } 279 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ 280 Mount: mountVolume, 281 } 282 } 283 284 _, err = nodeClient.NodePublishVolume(ctx, req) 285 if err != nil && !isFinalError(err) { 286 return volumetypes.NewUncertainProgressError(err.Error()) 287 } 288 return err 289} 290 291func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) { 292 if c.nodeV1ClientCreator == nil { 293 return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion") 294 } 295 296 if opts.volumeID == "" { 297 return opts.newSize, errors.New("missing volume id") 298 } 299 if opts.volumePath == "" { 300 return opts.newSize, errors.New("missing volume path") 301 } 302 303 if opts.newSize.Value() < 0 { 304 return opts.newSize, errors.New("size can not be less than 0") 305 } 306 307 accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx) 308 if err != nil { 309 return opts.newSize, err 310 } 311 312 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 313 if err != nil { 314 return opts.newSize, err 315 } 316 defer closer.Close() 317 318 req := &csipbv1.NodeExpandVolumeRequest{ 319 VolumeId: opts.volumeID, 320 VolumePath: opts.volumePath, 321 CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()}, 322 VolumeCapability: &csipbv1.VolumeCapability{ 323 AccessMode: &csipbv1.VolumeCapability_AccessMode{ 324 Mode: accessModeMapper(opts.accessMode), 325 }, 326 }, 327 } 328 329 // not all CSI drivers support NodeStageUnstage and hence the StagingTargetPath 330 // should only be set when available 331 if opts.stagingTargetPath != "" { 332 req.StagingTargetPath = opts.stagingTargetPath 333 } 334 335 if opts.fsType == fsTypeBlockName { 336 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ 337 Block: &csipbv1.VolumeCapability_BlockVolume{}, 338 } 339 } else { 340 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ 341 Mount: &csipbv1.VolumeCapability_MountVolume{ 342 FsType: opts.fsType, 343 MountFlags: opts.mountOptions, 344 }, 345 } 346 } 347 348 resp, err := nodeClient.NodeExpandVolume(ctx, req) 349 if err != nil { 350 return opts.newSize, err 351 } 352 updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI) 353 return *updatedQuantity, nil 354} 355 356func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error { 357 klog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath)) 358 if volID == "" { 359 return errors.New("missing volume id") 360 } 361 if targetPath == "" { 362 return errors.New("missing target path") 363 } 364 if c.nodeV1ClientCreator == nil { 365 return errors.New("nodeV1ClientCreate is nil") 366 } 367 368 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 369 if err != nil { 370 return err 371 } 372 defer closer.Close() 373 374 req := &csipbv1.NodeUnpublishVolumeRequest{ 375 VolumeId: volID, 376 TargetPath: targetPath, 377 } 378 379 _, err = nodeClient.NodeUnpublishVolume(ctx, req) 380 return err 381} 382 383func (c *csiDriverClient) NodeStageVolume(ctx context.Context, 384 volID string, 385 publishContext map[string]string, 386 stagingTargetPath string, 387 fsType string, 388 accessMode api.PersistentVolumeAccessMode, 389 secrets map[string]string, 390 volumeContext map[string]string, 391 mountOptions []string, 392 fsGroup *int64, 393) error { 394 klog.V(4).Info(log("calling NodeStageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath)) 395 if volID == "" { 396 return errors.New("missing volume id") 397 } 398 if stagingTargetPath == "" { 399 return errors.New("missing staging target path") 400 } 401 if c.nodeV1ClientCreator == nil { 402 return errors.New("nodeV1ClientCreate is nil") 403 } 404 405 accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx) 406 if err != nil { 407 return err 408 } 409 410 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 411 if err != nil { 412 return err 413 } 414 defer closer.Close() 415 416 req := &csipbv1.NodeStageVolumeRequest{ 417 VolumeId: volID, 418 PublishContext: publishContext, 419 StagingTargetPath: stagingTargetPath, 420 VolumeCapability: &csipbv1.VolumeCapability{ 421 AccessMode: &csipbv1.VolumeCapability_AccessMode{ 422 Mode: accessModeMapper(accessMode), 423 }, 424 }, 425 Secrets: secrets, 426 VolumeContext: volumeContext, 427 } 428 429 if fsType == fsTypeBlockName { 430 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{ 431 Block: &csipbv1.VolumeCapability_BlockVolume{}, 432 } 433 } else { 434 mountVolume := &csipbv1.VolumeCapability_MountVolume{ 435 FsType: fsType, 436 MountFlags: mountOptions, 437 } 438 if fsGroup != nil { 439 mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */) 440 } 441 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{ 442 Mount: mountVolume, 443 } 444 } 445 446 _, err = nodeClient.NodeStageVolume(ctx, req) 447 if err != nil && !isFinalError(err) { 448 return volumetypes.NewUncertainProgressError(err.Error()) 449 } 450 return err 451} 452 453func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error { 454 klog.V(4).Info(log("calling NodeUnstageVolume rpc [volid=%s,staging_target_path=%s]", volID, stagingTargetPath)) 455 if volID == "" { 456 return errors.New("missing volume id") 457 } 458 if stagingTargetPath == "" { 459 return errors.New("missing staging target path") 460 } 461 if c.nodeV1ClientCreator == nil { 462 return errors.New("nodeV1ClientCreate is nil") 463 } 464 465 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 466 if err != nil { 467 return err 468 } 469 defer closer.Close() 470 471 req := &csipbv1.NodeUnstageVolumeRequest{ 472 VolumeId: volID, 473 StagingTargetPath: stagingTargetPath, 474 } 475 _, err = nodeClient.NodeUnstageVolume(ctx, req) 476 return err 477} 478 479func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) { 480 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME) 481} 482 483func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) { 484 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME) 485} 486 487func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) { 488 supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx) 489 if err != nil { 490 return nil, err 491 } 492 if supported { 493 return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil 494 } 495 return asCSIAccessModeV1, nil 496} 497 498func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode { 499 switch am { 500 case api.ReadWriteOnce: 501 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER 502 case api.ReadOnlyMany: 503 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY 504 case api.ReadWriteMany: 505 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER 506 // This mapping exists to enable CSI drivers that lack the 507 // SINGLE_NODE_MULTI_WRITER capability to work with the 508 // ReadWriteOncePod access mode. 509 case api.ReadWriteOncePod: 510 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER 511 } 512 return csipbv1.VolumeCapability_AccessMode_UNKNOWN 513} 514 515func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode { 516 switch am { 517 case api.ReadWriteOnce: 518 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER 519 case api.ReadOnlyMany: 520 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY 521 case api.ReadWriteMany: 522 return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER 523 case api.ReadWriteOncePod: 524 return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER 525 } 526 return csipbv1.VolumeCapability_AccessMode_UNKNOWN 527} 528 529func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) { 530 network := "unix" 531 klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr)) 532 533 return grpc.Dial( 534 string(addr), 535 grpc.WithInsecure(), 536 grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) { 537 return (&net.Dialer{}).DialContext(ctx, network, target) 538 }), 539 grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor), 540 ) 541} 542 543// CSI client getter with cache. 544// This provides a method to initialize CSI client with driver name and caches 545// it for later use. When CSI clients have not been discovered yet (e.g. 546// on kubelet restart), client initialization will fail. Users of CSI client (e.g. 547// mounter manager and block mapper) can use this to delay CSI client 548// initialization until needed. 549type csiClientGetter struct { 550 sync.RWMutex 551 csiClient csiClient 552 driverName csiDriverName 553} 554 555func (c *csiClientGetter) Get() (csiClient, error) { 556 c.RLock() 557 if c.csiClient != nil { 558 c.RUnlock() 559 return c.csiClient, nil 560 } 561 c.RUnlock() 562 c.Lock() 563 defer c.Unlock() 564 // Double-checking locking criterion. 565 if c.csiClient != nil { 566 return c.csiClient, nil 567 } 568 csi, err := newCsiDriverClient(c.driverName) 569 if err != nil { 570 return nil, err 571 } 572 c.csiClient = csi 573 return c.csiClient, nil 574} 575 576func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) { 577 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS) 578} 579 580func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) { 581 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER) 582} 583 584func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) { 585 klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath)) 586 if volID == "" { 587 return nil, errors.New("missing volume id") 588 } 589 if targetPath == "" { 590 return nil, errors.New("missing target path") 591 } 592 if c.nodeV1ClientCreator == nil { 593 return nil, errors.New("nodeV1ClientCreate is nil") 594 } 595 596 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 597 if err != nil { 598 return nil, err 599 } 600 defer closer.Close() 601 602 req := &csipbv1.NodeGetVolumeStatsRequest{ 603 VolumeId: volID, 604 VolumePath: targetPath, 605 } 606 607 resp, err := nodeClient.NodeGetVolumeStats(ctx, req) 608 if err != nil { 609 return nil, err 610 } 611 usages := resp.GetUsage() 612 if usages == nil { 613 return nil, fmt.Errorf("failed to get usage from response. usage is nil") 614 } 615 metrics := &volume.Metrics{ 616 Used: resource.NewQuantity(int64(0), resource.BinarySI), 617 Capacity: resource.NewQuantity(int64(0), resource.BinarySI), 618 Available: resource.NewQuantity(int64(0), resource.BinarySI), 619 InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI), 620 Inodes: resource.NewQuantity(int64(0), resource.BinarySI), 621 InodesFree: resource.NewQuantity(int64(0), resource.BinarySI), 622 } 623 624 if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) { 625 isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx) 626 if err != nil { 627 return nil, err 628 } 629 630 if isSupportNodeVolumeCondition { 631 abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage() 632 metrics.Abnormal, metrics.Message = &abnormal, &message 633 } 634 } 635 636 for _, usage := range usages { 637 if usage == nil { 638 continue 639 } 640 unit := usage.GetUnit() 641 switch unit { 642 case csipbv1.VolumeUsage_BYTES: 643 metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI) 644 metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI) 645 metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI) 646 case csipbv1.VolumeUsage_INODES: 647 metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI) 648 metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI) 649 metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI) 650 default: 651 klog.Errorf("unknown key %s in usage", unit.String()) 652 } 653 654 } 655 return metrics, nil 656} 657 658func (c *csiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) { 659 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION) 660} 661 662func (c *csiDriverClient) NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) { 663 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP) 664} 665 666func (c *csiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) { 667 klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if the node service has %s capability", capabilityType)) 668 capabilities, err := c.nodeGetCapabilities(ctx) 669 if err != nil { 670 return false, err 671 } 672 673 for _, capability := range capabilities { 674 if capability == nil || capability.GetRpc() == nil { 675 continue 676 } 677 if capability.GetRpc().GetType() == capabilityType { 678 return true, nil 679 } 680 } 681 return false, nil 682} 683 684func (c *csiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) { 685 if c.nodeV1ClientCreator == nil { 686 return []*csipbv1.NodeServiceCapability{}, errors.New("nodeV1ClientCreate is nil") 687 } 688 689 nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager) 690 if err != nil { 691 return []*csipbv1.NodeServiceCapability{}, err 692 } 693 defer closer.Close() 694 695 req := &csipbv1.NodeGetCapabilitiesRequest{} 696 resp, err := nodeClient.NodeGetCapabilities(ctx, req) 697 if err != nil { 698 return []*csipbv1.NodeServiceCapability{}, err 699 } 700 return resp.GetCapabilities(), nil 701} 702 703func isFinalError(err error) bool { 704 // Sources: 705 // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md 706 // https://github.com/container-storage-interface/spec/blob/master/spec.md 707 st, ok := status.FromError(err) 708 if !ok { 709 // This is not gRPC error. The operation must have failed before gRPC 710 // method was called, otherwise we would get gRPC error. 711 // We don't know if any previous volume operation is in progress, be on the safe side. 712 return false 713 } 714 switch st.Code() { 715 case codes.Canceled, // gRPC: Client Application cancelled the request 716 codes.DeadlineExceeded, // gRPC: Timeout 717 codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress. 718 codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress. 719 codes.Aborted: // CSI: Operation pending for volume 720 return false 721 } 722 // All other errors mean that operation either did not 723 // even start or failed. It is for sure not in progress. 724 return true 725} 726