1/* 2Copyright 2018 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package cloudresource 18 19import ( 20 "context" 21 "fmt" 22 "sync" 23 "time" 24 25 "k8s.io/api/core/v1" 26 "k8s.io/apimachinery/pkg/types" 27 "k8s.io/apimachinery/pkg/util/wait" 28 cloudprovider "k8s.io/cloud-provider" 29 30 "k8s.io/klog/v2" 31) 32 33// SyncManager is an interface for making requests to a cloud provider 34type SyncManager interface { 35 Run(stopCh <-chan struct{}) 36 NodeAddresses() ([]v1.NodeAddress, error) 37} 38 39var _ SyncManager = &cloudResourceSyncManager{} 40 41type cloudResourceSyncManager struct { 42 // Cloud provider interface. 43 cloud cloudprovider.Interface 44 // Sync period 45 syncPeriod time.Duration 46 47 nodeAddressesMonitor *sync.Cond 48 nodeAddressesErr error 49 nodeAddresses []v1.NodeAddress 50 51 nodeName types.NodeName 52} 53 54// NewSyncManager creates a manager responsible for collecting resources from a 55// cloud provider through requests that are sensitive to timeouts and hanging 56func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager { 57 return &cloudResourceSyncManager{ 58 cloud: cloud, 59 syncPeriod: syncPeriod, 60 nodeName: nodeName, 61 // nodeAddressesMonitor is a monitor that guards a result (nodeAddresses, 62 // nodeAddressesErr) of the sync loop under the condition that a result has 63 // been saved at least once. The semantics here are: 64 // 65 // * Readers of the result will wait on the monitor until the first result 66 // has been saved. 67 // * The sync loop (i.e. the only writer), will signal all waiters every 68 // time it updates the result. 69 nodeAddressesMonitor: sync.NewCond(&sync.Mutex{}), 70 } 71} 72 73// NodeAddresses waits for the first sync loop to run. If no successful syncs 74// have run, it will return the most recent error. If node addresses have been 75// synced successfully, it will return the list of node addresses from the most 76// recent successful sync. 77func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) { 78 m.nodeAddressesMonitor.L.Lock() 79 defer m.nodeAddressesMonitor.L.Unlock() 80 // wait until there is something 81 for { 82 if addrs, err := m.nodeAddresses, m.nodeAddressesErr; len(addrs) > 0 || err != nil { 83 return addrs, err 84 } 85 klog.V(5).InfoS("Waiting for cloud provider to provide node addresses") 86 m.nodeAddressesMonitor.Wait() 87 } 88} 89 90// getNodeAddresses calls the cloud provider to get a current list of node addresses. 91func (m *cloudResourceSyncManager) getNodeAddresses() ([]v1.NodeAddress, error) { 92 // TODO(roberthbailey): Can we do this without having credentials to talk to 93 // the cloud provider? 94 // TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and 95 // returned an interface. 96 // TODO: If IP addresses couldn't be fetched from the cloud provider, should 97 // kubelet fallback on the other methods for getting the IP below? 98 instances, ok := m.cloud.Instances() 99 if !ok { 100 return nil, fmt.Errorf("failed to get instances from cloud provider") 101 } 102 return instances.NodeAddresses(context.TODO(), m.nodeName) 103} 104 105func (m *cloudResourceSyncManager) syncNodeAddresses() { 106 klog.V(5).InfoS("Requesting node addresses from cloud provider for node", "nodeName", m.nodeName) 107 108 addrs, err := m.getNodeAddresses() 109 110 m.nodeAddressesMonitor.L.Lock() 111 defer m.nodeAddressesMonitor.L.Unlock() 112 defer m.nodeAddressesMonitor.Broadcast() 113 114 if err != nil { 115 klog.V(2).InfoS("Node addresses from cloud provider for node not collected", "nodeName", m.nodeName, "err", err) 116 117 if len(m.nodeAddresses) > 0 { 118 // in the event that a sync loop fails when a previous sync had 119 // succeeded, continue to use the old addresses. 120 return 121 } 122 123 m.nodeAddressesErr = fmt.Errorf("failed to get node address from cloud provider: %v", err) 124 return 125 } 126 127 klog.V(5).InfoS("Node addresses from cloud provider for node collected", "nodeName", m.nodeName) 128 m.nodeAddressesErr = nil 129 m.nodeAddresses = addrs 130} 131 132// Run starts the cloud resource sync manager's sync loop. 133func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) { 134 wait.Until(m.syncNodeAddresses, m.syncPeriod, stopCh) 135} 136