1// +build !providerless
2
3/*
4Copyright 2020 The Kubernetes Authors.
5
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17*/
18
19package loadbalancerclient
20
21import (
22	"context"
23	"fmt"
24	"net/http"
25	"time"
26
27	"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
28	"github.com/Azure/go-autorest/autorest"
29	"github.com/Azure/go-autorest/autorest/azure"
30	"github.com/Azure/go-autorest/autorest/to"
31
32	"k8s.io/client-go/util/flowcontrol"
33	"k8s.io/klog/v2"
34	azclients "k8s.io/legacy-cloud-providers/azure/clients"
35	"k8s.io/legacy-cloud-providers/azure/clients/armclient"
36	"k8s.io/legacy-cloud-providers/azure/metrics"
37	"k8s.io/legacy-cloud-providers/azure/retry"
38)
39
40var _ Interface = &Client{}
41
// Client implements the LoadBalancer client Interface on top of an ARM client,
// adding client-side rate limiting and ARM throttling back-off.
type Client struct {
	// armClient performs the underlying ARM requests (get, put, delete, send).
	armClient      armclient.Interface
	// subscriptionID is used to build resource IDs and to label metrics.
	subscriptionID string

	// Rate limiting configuration: the reader limiter gates Get and List,
	// the writer limiter gates CreateOrUpdate and Delete.
	rateLimiterReader flowcontrol.RateLimiter
	rateLimiterWriter flowcontrol.RateLimiter

	// ARM throttling configuration: while the corresponding time is in the
	// future, read (resp. write) operations return a throttling error without
	// sending a request. The values are refreshed from throttled responses.
	RetryAfterReader time.Time
	RetryAfterWriter time.Time
}
55
56// New creates a new LoadBalancer client with ratelimiting.
57func New(config *azclients.ClientConfig) *Client {
58	baseURI := config.ResourceManagerEndpoint
59	authorizer := config.Authorizer
60	armClient := armclient.New(authorizer, baseURI, config.UserAgent, APIVersion, config.Location, config.Backoff)
61	rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig)
62
63	klog.V(2).Infof("Azure LoadBalancersClient (read ops) using rate limit config: QPS=%g, bucket=%d",
64		config.RateLimitConfig.CloudProviderRateLimitQPS,
65		config.RateLimitConfig.CloudProviderRateLimitBucket)
66	klog.V(2).Infof("Azure LoadBalancersClient (write ops) using rate limit config: QPS=%g, bucket=%d",
67		config.RateLimitConfig.CloudProviderRateLimitQPSWrite,
68		config.RateLimitConfig.CloudProviderRateLimitBucketWrite)
69
70	client := &Client{
71		armClient:         armClient,
72		rateLimiterReader: rateLimiterReader,
73		rateLimiterWriter: rateLimiterWriter,
74		subscriptionID:    config.SubscriptionID,
75	}
76
77	return client
78}
79
80// Get gets a LoadBalancer.
81func (c *Client) Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (network.LoadBalancer, *retry.Error) {
82	mc := metrics.NewMetricContext("load_balancers", "get", resourceGroupName, c.subscriptionID, "")
83
84	// Report errors if the client is rate limited.
85	if !c.rateLimiterReader.TryAccept() {
86		mc.RateLimitedCount()
87		return network.LoadBalancer{}, retry.GetRateLimitError(false, "LBGet")
88	}
89
90	// Report errors if the client is throttled.
91	if c.RetryAfterReader.After(time.Now()) {
92		mc.ThrottledCount()
93		rerr := retry.GetThrottlingError("LBGet", "client throttled", c.RetryAfterReader)
94		return network.LoadBalancer{}, rerr
95	}
96
97	result, rerr := c.getLB(ctx, resourceGroupName, loadBalancerName, expand)
98	mc.Observe(rerr.Error())
99	if rerr != nil {
100		if rerr.IsThrottled() {
101			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
102			c.RetryAfterReader = rerr.RetryAfter
103		}
104
105		return result, rerr
106	}
107
108	return result, nil
109}
110
111// getLB gets a LoadBalancer.
112func (c *Client) getLB(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (network.LoadBalancer, *retry.Error) {
113	resourceID := armclient.GetResourceID(
114		c.subscriptionID,
115		resourceGroupName,
116		"Microsoft.Network/loadBalancers",
117		loadBalancerName,
118	)
119	result := network.LoadBalancer{}
120
121	response, rerr := c.armClient.GetResource(ctx, resourceID, expand)
122	defer c.armClient.CloseResponse(ctx, response)
123	if rerr != nil {
124		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.get.request", resourceID, rerr.Error())
125		return result, rerr
126	}
127
128	err := autorest.Respond(
129		response,
130		azure.WithErrorUnlessStatusCode(http.StatusOK),
131		autorest.ByUnmarshallingJSON(&result))
132	if err != nil {
133		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.get.respond", resourceID, err)
134		return result, retry.GetError(response, err)
135	}
136
137	result.Response = autorest.Response{Response: response}
138	return result, nil
139}
140
141// List gets a list of LoadBalancer in the resource group.
142func (c *Client) List(ctx context.Context, resourceGroupName string) ([]network.LoadBalancer, *retry.Error) {
143	mc := metrics.NewMetricContext("load_balancers", "list", resourceGroupName, c.subscriptionID, "")
144
145	// Report errors if the client is rate limited.
146	if !c.rateLimiterReader.TryAccept() {
147		mc.RateLimitedCount()
148		return nil, retry.GetRateLimitError(false, "LBList")
149	}
150
151	// Report errors if the client is throttled.
152	if c.RetryAfterReader.After(time.Now()) {
153		mc.ThrottledCount()
154		rerr := retry.GetThrottlingError("LBList", "client throttled", c.RetryAfterReader)
155		return nil, rerr
156	}
157
158	result, rerr := c.listLB(ctx, resourceGroupName)
159	mc.Observe(rerr.Error())
160	if rerr != nil {
161		if rerr.IsThrottled() {
162			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
163			c.RetryAfterReader = rerr.RetryAfter
164		}
165
166		return result, rerr
167	}
168
169	return result, nil
170}
171
172// listLB gets a list of LoadBalancers in the resource group.
173func (c *Client) listLB(ctx context.Context, resourceGroupName string) ([]network.LoadBalancer, *retry.Error) {
174	resourceID := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers",
175		autorest.Encode("path", c.subscriptionID),
176		autorest.Encode("path", resourceGroupName))
177	result := make([]network.LoadBalancer, 0)
178	page := &LoadBalancerListResultPage{}
179	page.fn = c.listNextResults
180
181	resp, rerr := c.armClient.GetResource(ctx, resourceID, "")
182	defer c.armClient.CloseResponse(ctx, resp)
183	if rerr != nil {
184		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.list.request", resourceID, rerr.Error())
185		return result, rerr
186	}
187
188	var err error
189	page.lblr, err = c.listResponder(resp)
190	if err != nil {
191		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.list.respond", resourceID, err)
192		return result, retry.GetError(resp, err)
193	}
194
195	for {
196		result = append(result, page.Values()...)
197
198		// Abort the loop when there's no nextLink in the response.
199		if to.String(page.Response().NextLink) == "" {
200			break
201		}
202
203		if err = page.NextWithContext(ctx); err != nil {
204			klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.list.next", resourceID, err)
205			return result, retry.GetError(page.Response().Response.Response, err)
206		}
207	}
208
209	return result, nil
210}
211
212// CreateOrUpdate creates or updates a LoadBalancer.
213func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) *retry.Error {
214	mc := metrics.NewMetricContext("load_balancers", "create_or_update", resourceGroupName, c.subscriptionID, "")
215
216	// Report errors if the client is rate limited.
217	if !c.rateLimiterWriter.TryAccept() {
218		mc.RateLimitedCount()
219		return retry.GetRateLimitError(true, "LBCreateOrUpdate")
220	}
221
222	// Report errors if the client is throttled.
223	if c.RetryAfterWriter.After(time.Now()) {
224		mc.ThrottledCount()
225		rerr := retry.GetThrottlingError("LBCreateOrUpdate", "client throttled", c.RetryAfterWriter)
226		return rerr
227	}
228
229	rerr := c.createOrUpdateLB(ctx, resourceGroupName, loadBalancerName, parameters, etag)
230	mc.Observe(rerr.Error())
231	if rerr != nil {
232		if rerr.IsThrottled() {
233			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
234			c.RetryAfterWriter = rerr.RetryAfter
235		}
236
237		return rerr
238	}
239
240	return nil
241}
242
243// createOrUpdateLB creates or updates a LoadBalancer.
244func (c *Client) createOrUpdateLB(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) *retry.Error {
245	resourceID := armclient.GetResourceID(
246		c.subscriptionID,
247		resourceGroupName,
248		"Microsoft.Network/loadBalancers",
249		loadBalancerName,
250	)
251	decorators := []autorest.PrepareDecorator{
252		autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}),
253		autorest.WithJSON(parameters),
254	}
255	if etag != "" {
256		decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(etag)))
257	}
258
259	response, rerr := c.armClient.PutResourceWithDecorators(ctx, resourceID, parameters, decorators)
260	defer c.armClient.CloseResponse(ctx, response)
261	if rerr != nil {
262		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.put.request", resourceID, rerr.Error())
263		return rerr
264	}
265
266	if response != nil && response.StatusCode != http.StatusNoContent {
267		_, rerr = c.createOrUpdateResponder(response)
268		if rerr != nil {
269			klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.put.respond", resourceID, rerr.Error())
270			return rerr
271		}
272	}
273
274	return nil
275}
276
277func (c *Client) createOrUpdateResponder(resp *http.Response) (*network.LoadBalancer, *retry.Error) {
278	result := &network.LoadBalancer{}
279	err := autorest.Respond(
280		resp,
281		azure.WithErrorUnlessStatusCode(http.StatusOK, http.StatusCreated),
282		autorest.ByUnmarshallingJSON(&result))
283	result.Response = autorest.Response{Response: resp}
284	return result, retry.GetError(resp, err)
285}
286
287// Delete deletes a LoadBalancer by name.
288func (c *Client) Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) *retry.Error {
289	mc := metrics.NewMetricContext("load_balancers", "delete", resourceGroupName, c.subscriptionID, "")
290
291	// Report errors if the client is rate limited.
292	if !c.rateLimiterWriter.TryAccept() {
293		mc.RateLimitedCount()
294		return retry.GetRateLimitError(true, "LBDelete")
295	}
296
297	// Report errors if the client is throttled.
298	if c.RetryAfterWriter.After(time.Now()) {
299		mc.ThrottledCount()
300		rerr := retry.GetThrottlingError("LBDelete", "client throttled", c.RetryAfterWriter)
301		return rerr
302	}
303
304	rerr := c.deleteLB(ctx, resourceGroupName, loadBalancerName)
305	mc.Observe(rerr.Error())
306	if rerr != nil {
307		if rerr.IsThrottled() {
308			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
309			c.RetryAfterWriter = rerr.RetryAfter
310		}
311
312		return rerr
313	}
314
315	return nil
316}
317
318// deleteLB deletes a LoadBalancer by name.
319func (c *Client) deleteLB(ctx context.Context, resourceGroupName string, loadBalancerName string) *retry.Error {
320	resourceID := armclient.GetResourceID(
321		c.subscriptionID,
322		resourceGroupName,
323		"Microsoft.Network/loadBalancers",
324		loadBalancerName,
325	)
326
327	return c.armClient.DeleteResource(ctx, resourceID, "")
328}
329
330func (c *Client) listResponder(resp *http.Response) (result network.LoadBalancerListResult, err error) {
331	err = autorest.Respond(
332		resp,
333		autorest.ByIgnoring(),
334		azure.WithErrorUnlessStatusCode(http.StatusOK),
335		autorest.ByUnmarshallingJSON(&result))
336	result.Response = autorest.Response{Response: resp}
337	return
338}
339
340// loadBalancerListResultPreparer prepares a request to retrieve the next set of results.
341// It returns nil if no more results exist.
342func (c *Client) loadBalancerListResultPreparer(ctx context.Context, lblr network.LoadBalancerListResult) (*http.Request, error) {
343	if lblr.NextLink == nil || len(to.String(lblr.NextLink)) < 1 {
344		return nil, nil
345	}
346
347	decorators := []autorest.PrepareDecorator{
348		autorest.WithBaseURL(to.String(lblr.NextLink)),
349	}
350	return c.armClient.PrepareGetRequest(ctx, decorators...)
351}
352
353// listNextResults retrieves the next set of results, if any.
354func (c *Client) listNextResults(ctx context.Context, lastResults network.LoadBalancerListResult) (result network.LoadBalancerListResult, err error) {
355	req, err := c.loadBalancerListResultPreparer(ctx, lastResults)
356	if err != nil {
357		return result, autorest.NewErrorWithError(err, "loadbalancerclient", "listNextResults", nil, "Failure preparing next results request")
358	}
359	if req == nil {
360		return
361	}
362
363	resp, rerr := c.armClient.Send(ctx, req)
364	defer c.armClient.CloseResponse(ctx, resp)
365	if rerr != nil {
366		result.Response = autorest.Response{Response: resp}
367		return result, autorest.NewErrorWithError(rerr.Error(), "loadbalancerclient", "listNextResults", resp, "Failure sending next results request")
368	}
369
370	result, err = c.listResponder(resp)
371	if err != nil {
372		err = autorest.NewErrorWithError(err, "loadbalancerclient", "listNextResults", resp, "Failure responding to next results request")
373	}
374
375	return
376}
377
// LoadBalancerListResultPage contains a page of LoadBalancer values.
type LoadBalancerListResultPage struct {
	// fn fetches the page that follows the given list result; it is invoked
	// by NextWithContext with the current page.
	fn   func(context.Context, network.LoadBalancerListResult) (network.LoadBalancerListResult, error)
	// lblr is the current page of results.
	lblr network.LoadBalancerListResult
}
383
384// NextWithContext advances to the next page of values.  If there was an error making
385// the request the page does not advance and the error is returned.
386func (page *LoadBalancerListResultPage) NextWithContext(ctx context.Context) (err error) {
387	next, err := page.fn(ctx, page.lblr)
388	if err != nil {
389		return err
390	}
391	page.lblr = next
392	return nil
393}
394
395// Next advances to the next page of values.  If there was an error making
396// the request the page does not advance and the error is returned.
397// Deprecated: Use NextWithContext() instead.
398func (page *LoadBalancerListResultPage) Next() error {
399	return page.NextWithContext(context.Background())
400}
401
402// NotDone returns true if the page enumeration should be started or is not yet complete.
403func (page LoadBalancerListResultPage) NotDone() bool {
404	return !page.lblr.IsEmpty()
405}
406
// Response returns the list result held by the page, i.e. the raw server
// response from the last page request. The value is returned by copy.
func (page LoadBalancerListResultPage) Response() network.LoadBalancerListResult {
	return page.lblr
}
411
412// Values returns the slice of values for the current page or nil if there are no values.
413func (page LoadBalancerListResultPage) Values() []network.LoadBalancer {
414	if page.lblr.IsEmpty() {
415		return nil
416	}
417	return *page.lblr.Value
418}
419