1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cloudresource
18
19import (
20	"context"
21	"fmt"
22	"sync"
23	"time"
24
25	"k8s.io/api/core/v1"
26	"k8s.io/apimachinery/pkg/types"
27	"k8s.io/apimachinery/pkg/util/wait"
28	cloudprovider "k8s.io/cloud-provider"
29
30	"k8s.io/klog/v2"
31)
32
// SyncManager is an interface for making requests to a cloud provider
type SyncManager interface {
	// Run starts the sync loop; stopCh is the channel used to stop it.
	Run(stopCh <-chan struct{})
	// NodeAddresses returns the node addresses obtained from the cloud
	// provider, or an error. The implementation in this file blocks until a
	// sync has stored either a non-empty address list or an error.
	NodeAddresses() ([]v1.NodeAddress, error)
}
38
39var _ SyncManager = &cloudResourceSyncManager{}
40
// cloudResourceSyncManager is the SyncManager implementation in this file. It
// stores the outcome of the node address sync so that NodeAddresses can serve
// it without calling the cloud provider itself.
type cloudResourceSyncManager struct {
	// Cloud provider interface.
	cloud cloudprovider.Interface
	// Sync period; passed to wait.Until in Run as the period argument.
	syncPeriod time.Duration

	// nodeAddressesMonitor's lock is held whenever nodeAddressesErr and
	// nodeAddresses are read or written. NodeAddresses waits on it and
	// syncNodeAddresses broadcasts on it after every sync attempt.
	// nodeAddressesErr is only set by a failed sync while no addresses are
	// stored, and is cleared by a successful sync. nodeAddresses holds the
	// result of the most recent successful sync.
	nodeAddressesMonitor *sync.Cond
	nodeAddressesErr     error
	nodeAddresses        []v1.NodeAddress

	// nodeName is the node whose addresses are requested from the cloud
	// provider.
	nodeName types.NodeName
}
53
54// NewSyncManager creates a manager responsible for collecting resources from a
55// cloud provider through requests that are sensitive to timeouts and hanging
56func NewSyncManager(cloud cloudprovider.Interface, nodeName types.NodeName, syncPeriod time.Duration) SyncManager {
57	return &cloudResourceSyncManager{
58		cloud:      cloud,
59		syncPeriod: syncPeriod,
60		nodeName:   nodeName,
61		// nodeAddressesMonitor is a monitor that guards a result (nodeAddresses,
62		// nodeAddressesErr) of the sync loop under the condition that a result has
63		// been saved at least once. The semantics here are:
64		//
65		// * Readers of the result will wait on the monitor until the first result
66		//   has been saved.
67		// * The sync loop (i.e. the only writer), will signal all waiters every
68		//   time it updates the result.
69		nodeAddressesMonitor: sync.NewCond(&sync.Mutex{}),
70	}
71}
72
73// NodeAddresses waits for the first sync loop to run. If no successful syncs
74// have run, it will return the most recent error. If node addresses have been
75// synced successfully, it will return the list of node addresses from the most
76// recent successful sync.
77func (m *cloudResourceSyncManager) NodeAddresses() ([]v1.NodeAddress, error) {
78	m.nodeAddressesMonitor.L.Lock()
79	defer m.nodeAddressesMonitor.L.Unlock()
80	// wait until there is something
81	for {
82		if addrs, err := m.nodeAddresses, m.nodeAddressesErr; len(addrs) > 0 || err != nil {
83			return addrs, err
84		}
85		klog.V(5).InfoS("Waiting for cloud provider to provide node addresses")
86		m.nodeAddressesMonitor.Wait()
87	}
88}
89
90// getNodeAddresses calls the cloud provider to get a current list of node addresses.
91func (m *cloudResourceSyncManager) getNodeAddresses() ([]v1.NodeAddress, error) {
92	// TODO(roberthbailey): Can we do this without having credentials to talk to
93	// the cloud provider?
94	// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and
95	// returned an interface.
96	// TODO: If IP addresses couldn't be fetched from the cloud provider, should
97	// kubelet fallback on the other methods for getting the IP below?
98	instances, ok := m.cloud.Instances()
99	if !ok {
100		return nil, fmt.Errorf("failed to get instances from cloud provider")
101	}
102	return instances.NodeAddresses(context.TODO(), m.nodeName)
103}
104
105func (m *cloudResourceSyncManager) syncNodeAddresses() {
106	klog.V(5).InfoS("Requesting node addresses from cloud provider for node", "nodeName", m.nodeName)
107
108	addrs, err := m.getNodeAddresses()
109
110	m.nodeAddressesMonitor.L.Lock()
111	defer m.nodeAddressesMonitor.L.Unlock()
112	defer m.nodeAddressesMonitor.Broadcast()
113
114	if err != nil {
115		klog.V(2).InfoS("Node addresses from cloud provider for node not collected", "nodeName", m.nodeName, "err", err)
116
117		if len(m.nodeAddresses) > 0 {
118			// in the event that a sync loop fails when a previous sync had
119			// succeeded, continue to use the old addresses.
120			return
121		}
122
123		m.nodeAddressesErr = fmt.Errorf("failed to get node address from cloud provider: %v", err)
124		return
125	}
126
127	klog.V(5).InfoS("Node addresses from cloud provider for node collected", "nodeName", m.nodeName)
128	m.nodeAddressesErr = nil
129	m.nodeAddresses = addrs
130}
131
// Run starts the cloud resource sync manager's sync loop.
//
// It hands syncNodeAddresses to wait.Until together with the configured
// syncPeriod and the caller's stopCh.
// NOTE(review): wait.Until presumably invokes the function once per period and
// blocks until stopCh is closed - confirm against the apimachinery docs.
func (m *cloudResourceSyncManager) Run(stopCh <-chan struct{}) {
	wait.Until(m.syncNodeAddresses, m.syncPeriod, stopCh)
}
136