1// +build !providerless
2
3/*
4Copyright 2020 The Kubernetes Authors.
5
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17*/
18
19package containerserviceclient
20
21import (
22	"context"
23	"fmt"
24	"net/http"
25	"time"
26
27	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2020-04-01/containerservice"
28	"github.com/Azure/go-autorest/autorest"
29	"github.com/Azure/go-autorest/autorest/azure"
30	"github.com/Azure/go-autorest/autorest/to"
31
32	"k8s.io/client-go/util/flowcontrol"
33	"k8s.io/klog/v2"
34	azclients "k8s.io/legacy-cloud-providers/azure/clients"
35	"k8s.io/legacy-cloud-providers/azure/clients/armclient"
36	"k8s.io/legacy-cloud-providers/azure/metrics"
37	"k8s.io/legacy-cloud-providers/azure/retry"
38)
39
40var _ Interface = &Client{}
41
42// Client implements ContainerService client Interface.
43type Client struct {
44	armClient      armclient.Interface
45	subscriptionID string
46
47	// Rate limiting configures.
48	rateLimiterReader flowcontrol.RateLimiter
49	rateLimiterWriter flowcontrol.RateLimiter
50
51	// ARM throttling configures.
52	RetryAfterReader time.Time
53	RetryAfterWriter time.Time
54}
55
56// New creates a new ContainerServiceClient client with ratelimiting.
57func New(config *azclients.ClientConfig) *Client {
58	baseURI := config.ResourceManagerEndpoint
59	authorizer := config.Authorizer
60	armClient := armclient.New(authorizer, baseURI, config.UserAgent, APIVersion, config.Location, config.Backoff)
61	rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig)
62
63	klog.V(2).Infof("Azure ContainerServiceClient (read ops) using rate limit config: QPS=%g, bucket=%d",
64		config.RateLimitConfig.CloudProviderRateLimitQPS,
65		config.RateLimitConfig.CloudProviderRateLimitBucket)
66	klog.V(2).Infof("Azure ContainerServiceClient (write ops) using rate limit config: QPS=%g, bucket=%d",
67		config.RateLimitConfig.CloudProviderRateLimitQPSWrite,
68		config.RateLimitConfig.CloudProviderRateLimitBucketWrite)
69
70	client := &Client{
71		armClient:         armClient,
72		rateLimiterReader: rateLimiterReader,
73		rateLimiterWriter: rateLimiterWriter,
74		subscriptionID:    config.SubscriptionID,
75	}
76
77	return client
78}
79
80// Get gets a ManagedCluster.
81func (c *Client) Get(ctx context.Context, resourceGroupName string, managedClusterName string) (containerservice.ManagedCluster, *retry.Error) {
82	mc := metrics.NewMetricContext("managed_clusters", "get", resourceGroupName, c.subscriptionID, "")
83
84	// Report errors if the client is rate limited.
85	if !c.rateLimiterReader.TryAccept() {
86		mc.RateLimitedCount()
87		return containerservice.ManagedCluster{}, retry.GetRateLimitError(false, "GetManagedCluster")
88	}
89
90	// Report errors if the client is throttled.
91	if c.RetryAfterReader.After(time.Now()) {
92		mc.ThrottledCount()
93		rerr := retry.GetThrottlingError("GetManagedCluster", "client throttled", c.RetryAfterReader)
94		return containerservice.ManagedCluster{}, rerr
95	}
96
97	result, rerr := c.getManagedCluster(ctx, resourceGroupName, managedClusterName)
98	mc.Observe(rerr.Error())
99	if rerr != nil {
100		if rerr.IsThrottled() {
101			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
102			c.RetryAfterReader = rerr.RetryAfter
103		}
104
105		return result, rerr
106	}
107
108	return result, nil
109}
110
111// getManagedCluster gets a ManagedCluster.
112func (c *Client) getManagedCluster(ctx context.Context, resourceGroupName string, managedClusterName string) (containerservice.ManagedCluster, *retry.Error) {
113	resourceID := armclient.GetResourceID(
114		c.subscriptionID,
115		resourceGroupName,
116		"Microsoft.ContainerService/managedClusters",
117		managedClusterName,
118	)
119	result := containerservice.ManagedCluster{}
120
121	response, rerr := c.armClient.GetResource(ctx, resourceID, "")
122	defer c.armClient.CloseResponse(ctx, response)
123	if rerr != nil {
124		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.get.request", resourceID, rerr.Error())
125		return result, rerr
126	}
127
128	err := autorest.Respond(
129		response,
130		azure.WithErrorUnlessStatusCode(http.StatusOK),
131		autorest.ByUnmarshallingJSON(&result))
132	if err != nil {
133		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.get.respond", resourceID, err)
134		return result, retry.GetError(response, err)
135	}
136
137	result.Response = autorest.Response{Response: response}
138	return result, nil
139}
140
141// List gets a list of ManagedClusters in the resource group.
142func (c *Client) List(ctx context.Context, resourceGroupName string) ([]containerservice.ManagedCluster, *retry.Error) {
143	mc := metrics.NewMetricContext("managed_clusters", "list", resourceGroupName, c.subscriptionID, "")
144
145	// Report errors if the client is rate limited.
146	if !c.rateLimiterReader.TryAccept() {
147		mc.RateLimitedCount()
148		return nil, retry.GetRateLimitError(false, "ListManagedCluster")
149	}
150
151	// Report errors if the client is throttled.
152	if c.RetryAfterReader.After(time.Now()) {
153		mc.ThrottledCount()
154		rerr := retry.GetThrottlingError("ListManagedCluster", "client throttled", c.RetryAfterReader)
155		return nil, rerr
156	}
157
158	result, rerr := c.listManagedCluster(ctx, resourceGroupName)
159	mc.Observe(rerr.Error())
160	if rerr != nil {
161		if rerr.IsThrottled() {
162			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
163			c.RetryAfterReader = rerr.RetryAfter
164		}
165
166		return result, rerr
167	}
168
169	return result, nil
170}
171
172// listManagedCluster gets a list of ManagedClusters in the resource group.
173func (c *Client) listManagedCluster(ctx context.Context, resourceGroupName string) ([]containerservice.ManagedCluster, *retry.Error) {
174	resourceID := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.ContainerService/managedClusters",
175		autorest.Encode("path", c.subscriptionID),
176		autorest.Encode("path", resourceGroupName))
177	result := make([]containerservice.ManagedCluster, 0)
178	page := &ManagedClusterResultPage{}
179	page.fn = c.listNextResults
180
181	resp, rerr := c.armClient.GetResource(ctx, resourceID, "")
182	defer c.armClient.CloseResponse(ctx, resp)
183	if rerr != nil {
184		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.list.request", resourceID, rerr.Error())
185		return result, rerr
186	}
187
188	var err error
189	page.mclr, err = c.listResponder(resp)
190	if err != nil {
191		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.list.respond", resourceID, err)
192		return result, retry.GetError(resp, err)
193	}
194
195	for {
196		result = append(result, page.Values()...)
197
198		// Abort the loop when there's no nextLink in the response.
199		if to.String(page.Response().NextLink) == "" {
200			break
201		}
202
203		if err = page.NextWithContext(ctx); err != nil {
204			klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.list.next", resourceID, err)
205			return result, retry.GetError(page.Response().Response.Response, err)
206		}
207	}
208
209	return result, nil
210}
211
212func (c *Client) listResponder(resp *http.Response) (result containerservice.ManagedClusterListResult, err error) {
213	err = autorest.Respond(
214		resp,
215		autorest.ByIgnoring(),
216		azure.WithErrorUnlessStatusCode(http.StatusOK),
217		autorest.ByUnmarshallingJSON(&result))
218	result.Response = autorest.Response{Response: resp}
219	return
220}
221
222// managedClusterListResultPreparer prepares a request to retrieve the next set of results.
223// It returns nil if no more results exist.
224func (c *Client) managedClusterListResultPreparer(ctx context.Context, mclr containerservice.ManagedClusterListResult) (*http.Request, error) {
225	if mclr.NextLink == nil || len(to.String(mclr.NextLink)) < 1 {
226		return nil, nil
227	}
228
229	decorators := []autorest.PrepareDecorator{
230		autorest.WithBaseURL(to.String(mclr.NextLink)),
231	}
232	return c.armClient.PrepareGetRequest(ctx, decorators...)
233}
234
235// listNextResults retrieves the next set of results, if any.
236func (c *Client) listNextResults(ctx context.Context, lastResults containerservice.ManagedClusterListResult) (result containerservice.ManagedClusterListResult, err error) {
237	req, err := c.managedClusterListResultPreparer(ctx, lastResults)
238	if err != nil {
239		return result, autorest.NewErrorWithError(err, "managedclusterclient", "listNextResults", nil, "Failure preparing next results request")
240	}
241	if req == nil {
242		return
243	}
244
245	resp, rerr := c.armClient.Send(ctx, req)
246	defer c.armClient.CloseResponse(ctx, resp)
247	if rerr != nil {
248		result.Response = autorest.Response{Response: resp}
249		return result, autorest.NewErrorWithError(rerr.Error(), "managedclusterclient", "listNextResults", resp, "Failure sending next results request")
250	}
251
252	result, err = c.listResponder(resp)
253	if err != nil {
254		err = autorest.NewErrorWithError(err, "managedclusterclient", "listNextResults", resp, "Failure responding to next results request")
255	}
256
257	return
258}
259
260// ManagedClusterResultPage contains a page of ManagedCluster values.
261type ManagedClusterResultPage struct {
262	fn   func(context.Context, containerservice.ManagedClusterListResult) (containerservice.ManagedClusterListResult, error)
263	mclr containerservice.ManagedClusterListResult
264}
265
266// NextWithContext advances to the next page of values.  If there was an error making
267// the request the page does not advance and the error is returned.
268func (page *ManagedClusterResultPage) NextWithContext(ctx context.Context) (err error) {
269	next, err := page.fn(ctx, page.mclr)
270	if err != nil {
271		return err
272	}
273	page.mclr = next
274	return nil
275}
276
277// Next advances to the next page of values.  If there was an error making
278// the request the page does not advance and the error is returned.
279// Deprecated: Use NextWithContext() instead.
280func (page *ManagedClusterResultPage) Next() error {
281	return page.NextWithContext(context.Background())
282}
283
284// NotDone returns true if the page enumeration should be started or is not yet complete.
285func (page ManagedClusterResultPage) NotDone() bool {
286	return !page.mclr.IsEmpty()
287}
288
289// Response returns the raw server response from the last page request.
290func (page ManagedClusterResultPage) Response() containerservice.ManagedClusterListResult {
291	return page.mclr
292}
293
294// Values returns the slice of values for the current page or nil if there are no values.
295func (page ManagedClusterResultPage) Values() []containerservice.ManagedCluster {
296	if page.mclr.IsEmpty() {
297		return nil
298	}
299	return *page.mclr.Value
300}
301
302// CreateOrUpdate creates or updates a ManagedCluster.
303func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, managedClusterName string, parameters containerservice.ManagedCluster, etag string) *retry.Error {
304	mc := metrics.NewMetricContext("managed_clusters", "create_or_update", resourceGroupName, c.subscriptionID, "")
305
306	// Report errors if the client is rate limited.
307	if !c.rateLimiterWriter.TryAccept() {
308		mc.RateLimitedCount()
309		return retry.GetRateLimitError(true, "CreateOrUpdateManagedCluster")
310	}
311
312	// Report errors if the client is throttled.
313	if c.RetryAfterWriter.After(time.Now()) {
314		mc.ThrottledCount()
315		rerr := retry.GetThrottlingError("CreateOrUpdateManagedCluster", "client throttled", c.RetryAfterWriter)
316		return rerr
317	}
318
319	rerr := c.createOrUpdateManagedCluster(ctx, resourceGroupName, managedClusterName, parameters, etag)
320	mc.Observe(rerr.Error())
321	if rerr != nil {
322		if rerr.IsThrottled() {
323			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
324			c.RetryAfterWriter = rerr.RetryAfter
325		}
326
327		return rerr
328	}
329
330	return nil
331}
332
333// createOrUpdateManagedCluster creates or updates a ManagedCluster.
334func (c *Client) createOrUpdateManagedCluster(ctx context.Context, resourceGroupName string, managedClusterName string, parameters containerservice.ManagedCluster, etag string) *retry.Error {
335	resourceID := armclient.GetResourceID(
336		c.subscriptionID,
337		resourceGroupName,
338		"Microsoft.ContainerService/managedClusters",
339		managedClusterName,
340	)
341	decorators := []autorest.PrepareDecorator{
342		autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}),
343		autorest.WithJSON(parameters),
344	}
345	if etag != "" {
346		decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(etag)))
347	}
348
349	response, rerr := c.armClient.PutResourceWithDecorators(ctx, resourceID, parameters, decorators)
350	defer c.armClient.CloseResponse(ctx, response)
351	if rerr != nil {
352		klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedCluster.put.request", resourceID, rerr.Error())
353		return rerr
354	}
355
356	if response != nil && response.StatusCode != http.StatusNoContent {
357		_, rerr = c.createOrUpdateResponder(response)
358		if rerr != nil {
359			klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedCluster.put.respond", resourceID, rerr.Error())
360			return rerr
361		}
362	}
363
364	return nil
365}
366
367func (c *Client) createOrUpdateResponder(resp *http.Response) (*containerservice.ManagedCluster, *retry.Error) {
368	result := &containerservice.ManagedCluster{}
369	err := autorest.Respond(
370		resp,
371		azure.WithErrorUnlessStatusCode(http.StatusOK, http.StatusCreated),
372		autorest.ByUnmarshallingJSON(&result))
373	result.Response = autorest.Response{Response: resp}
374	return result, retry.GetError(resp, err)
375}
376
377// Delete deletes a ManagedCluster by name.
378func (c *Client) Delete(ctx context.Context, resourceGroupName string, managedClusterName string) *retry.Error {
379	mc := metrics.NewMetricContext("managed_clusters", "delete", resourceGroupName, c.subscriptionID, "")
380
381	// Report errors if the client is rate limited.
382	if !c.rateLimiterWriter.TryAccept() {
383		mc.RateLimitedCount()
384		return retry.GetRateLimitError(true, "DeleteManagedCluster")
385	}
386
387	// Report errors if the client is throttled.
388	if c.RetryAfterWriter.After(time.Now()) {
389		mc.ThrottledCount()
390		rerr := retry.GetThrottlingError("DeleteManagedCluster", "client throttled", c.RetryAfterWriter)
391		return rerr
392	}
393
394	rerr := c.deleteManagedCluster(ctx, resourceGroupName, managedClusterName)
395	mc.Observe(rerr.Error())
396	if rerr != nil {
397		if rerr.IsThrottled() {
398			// Update RetryAfterReader so that no more requests would be sent until RetryAfter expires.
399			c.RetryAfterWriter = rerr.RetryAfter
400		}
401
402		return rerr
403	}
404
405	return nil
406}
407
408// deleteManagedCluster deletes a ManagedCluster by name.
409func (c *Client) deleteManagedCluster(ctx context.Context, resourceGroupName string, managedClusterName string) *retry.Error {
410	resourceID := armclient.GetResourceID(
411		c.subscriptionID,
412		resourceGroupName,
413		"Microsoft.ContainerService/managedClusters",
414		managedClusterName,
415	)
416
417	return c.armClient.DeleteResource(ctx, resourceID, "")
418}
419