// +build !providerless

/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package containerserviceclient

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2020-04-01/containerservice"
	"github.com/Azure/go-autorest/autorest"
	"github.com/Azure/go-autorest/autorest/azure"
	"github.com/Azure/go-autorest/autorest/to"

	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/klog/v2"
	azclients "k8s.io/legacy-cloud-providers/azure/clients"
	"k8s.io/legacy-cloud-providers/azure/clients/armclient"
	"k8s.io/legacy-cloud-providers/azure/metrics"
	"k8s.io/legacy-cloud-providers/azure/retry"
)

// Compile-time assertion that *Client satisfies Interface.
var _ Interface = &Client{}

// Client implements ContainerService client Interface.
type Client struct {
	// armClient performs the underlying ARM requests.
	armClient armclient.Interface
	// subscriptionID is used to build resource IDs and to label metrics.
	subscriptionID string

	// Client-side rate limiters: the reader is consulted by Get and List,
	// the writer by CreateOrUpdate and Delete.
	rateLimiterReader flowcontrol.RateLimiter
	rateLimiterWriter flowcontrol.RateLimiter

	// ARM throttling state: while the stored time is still in the future,
	// read (resp. write) calls are rejected locally with a throttling error.
	// Each is updated from the RetryAfter of a throttled response.
	RetryAfterReader time.Time
	RetryAfterWriter time.Time
}

// New creates a new ContainerServiceClient client with ratelimiting.
57func New(config *azclients.ClientConfig) *Client { 58 baseURI := config.ResourceManagerEndpoint 59 authorizer := config.Authorizer 60 armClient := armclient.New(authorizer, baseURI, config.UserAgent, APIVersion, config.Location, config.Backoff) 61 rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig) 62 63 klog.V(2).Infof("Azure ContainerServiceClient (read ops) using rate limit config: QPS=%g, bucket=%d", 64 config.RateLimitConfig.CloudProviderRateLimitQPS, 65 config.RateLimitConfig.CloudProviderRateLimitBucket) 66 klog.V(2).Infof("Azure ContainerServiceClient (write ops) using rate limit config: QPS=%g, bucket=%d", 67 config.RateLimitConfig.CloudProviderRateLimitQPSWrite, 68 config.RateLimitConfig.CloudProviderRateLimitBucketWrite) 69 70 client := &Client{ 71 armClient: armClient, 72 rateLimiterReader: rateLimiterReader, 73 rateLimiterWriter: rateLimiterWriter, 74 subscriptionID: config.SubscriptionID, 75 } 76 77 return client 78} 79 80// Get gets a ManagedCluster. 81func (c *Client) Get(ctx context.Context, resourceGroupName string, managedClusterName string) (containerservice.ManagedCluster, *retry.Error) { 82 mc := metrics.NewMetricContext("managed_clusters", "get", resourceGroupName, c.subscriptionID, "") 83 84 // Report errors if the client is rate limited. 85 if !c.rateLimiterReader.TryAccept() { 86 mc.RateLimitedCount() 87 return containerservice.ManagedCluster{}, retry.GetRateLimitError(false, "GetManagedCluster") 88 } 89 90 // Report errors if the client is throttled. 
91 if c.RetryAfterReader.After(time.Now()) { 92 mc.ThrottledCount() 93 rerr := retry.GetThrottlingError("GetManagedCluster", "client throttled", c.RetryAfterReader) 94 return containerservice.ManagedCluster{}, rerr 95 } 96 97 result, rerr := c.getManagedCluster(ctx, resourceGroupName, managedClusterName) 98 mc.Observe(rerr.Error()) 99 if rerr != nil { 100 if rerr.IsThrottled() { 101 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 102 c.RetryAfterReader = rerr.RetryAfter 103 } 104 105 return result, rerr 106 } 107 108 return result, nil 109} 110 111// getManagedCluster gets a ManagedCluster. 112func (c *Client) getManagedCluster(ctx context.Context, resourceGroupName string, managedClusterName string) (containerservice.ManagedCluster, *retry.Error) { 113 resourceID := armclient.GetResourceID( 114 c.subscriptionID, 115 resourceGroupName, 116 "Microsoft.ContainerService/managedClusters", 117 managedClusterName, 118 ) 119 result := containerservice.ManagedCluster{} 120 121 response, rerr := c.armClient.GetResource(ctx, resourceID, "") 122 defer c.armClient.CloseResponse(ctx, response) 123 if rerr != nil { 124 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.get.request", resourceID, rerr.Error()) 125 return result, rerr 126 } 127 128 err := autorest.Respond( 129 response, 130 azure.WithErrorUnlessStatusCode(http.StatusOK), 131 autorest.ByUnmarshallingJSON(&result)) 132 if err != nil { 133 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.get.respond", resourceID, err) 134 return result, retry.GetError(response, err) 135 } 136 137 result.Response = autorest.Response{Response: response} 138 return result, nil 139} 140 141// List gets a list of ManagedClusters in the resource group. 
142func (c *Client) List(ctx context.Context, resourceGroupName string) ([]containerservice.ManagedCluster, *retry.Error) { 143 mc := metrics.NewMetricContext("managed_clusters", "list", resourceGroupName, c.subscriptionID, "") 144 145 // Report errors if the client is rate limited. 146 if !c.rateLimiterReader.TryAccept() { 147 mc.RateLimitedCount() 148 return nil, retry.GetRateLimitError(false, "ListManagedCluster") 149 } 150 151 // Report errors if the client is throttled. 152 if c.RetryAfterReader.After(time.Now()) { 153 mc.ThrottledCount() 154 rerr := retry.GetThrottlingError("ListManagedCluster", "client throttled", c.RetryAfterReader) 155 return nil, rerr 156 } 157 158 result, rerr := c.listManagedCluster(ctx, resourceGroupName) 159 mc.Observe(rerr.Error()) 160 if rerr != nil { 161 if rerr.IsThrottled() { 162 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 163 c.RetryAfterReader = rerr.RetryAfter 164 } 165 166 return result, rerr 167 } 168 169 return result, nil 170} 171 172// listManagedCluster gets a list of ManagedClusters in the resource group. 
173func (c *Client) listManagedCluster(ctx context.Context, resourceGroupName string) ([]containerservice.ManagedCluster, *retry.Error) { 174 resourceID := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.ContainerService/managedClusters", 175 autorest.Encode("path", c.subscriptionID), 176 autorest.Encode("path", resourceGroupName)) 177 result := make([]containerservice.ManagedCluster, 0) 178 page := &ManagedClusterResultPage{} 179 page.fn = c.listNextResults 180 181 resp, rerr := c.armClient.GetResource(ctx, resourceID, "") 182 defer c.armClient.CloseResponse(ctx, resp) 183 if rerr != nil { 184 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.list.request", resourceID, rerr.Error()) 185 return result, rerr 186 } 187 188 var err error 189 page.mclr, err = c.listResponder(resp) 190 if err != nil { 191 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.list.respond", resourceID, err) 192 return result, retry.GetError(resp, err) 193 } 194 195 for { 196 result = append(result, page.Values()...) 197 198 // Abort the loop when there's no nextLink in the response. 199 if to.String(page.Response().NextLink) == "" { 200 break 201 } 202 203 if err = page.NextWithContext(ctx); err != nil { 204 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedcluster.list.next", resourceID, err) 205 return result, retry.GetError(page.Response().Response.Response, err) 206 } 207 } 208 209 return result, nil 210} 211 212func (c *Client) listResponder(resp *http.Response) (result containerservice.ManagedClusterListResult, err error) { 213 err = autorest.Respond( 214 resp, 215 autorest.ByIgnoring(), 216 azure.WithErrorUnlessStatusCode(http.StatusOK), 217 autorest.ByUnmarshallingJSON(&result)) 218 result.Response = autorest.Response{Response: resp} 219 return 220} 221 222// managedClusterListResultPreparer prepares a request to retrieve the next set of results. 
223// It returns nil if no more results exist. 224func (c *Client) managedClusterListResultPreparer(ctx context.Context, mclr containerservice.ManagedClusterListResult) (*http.Request, error) { 225 if mclr.NextLink == nil || len(to.String(mclr.NextLink)) < 1 { 226 return nil, nil 227 } 228 229 decorators := []autorest.PrepareDecorator{ 230 autorest.WithBaseURL(to.String(mclr.NextLink)), 231 } 232 return c.armClient.PrepareGetRequest(ctx, decorators...) 233} 234 235// listNextResults retrieves the next set of results, if any. 236func (c *Client) listNextResults(ctx context.Context, lastResults containerservice.ManagedClusterListResult) (result containerservice.ManagedClusterListResult, err error) { 237 req, err := c.managedClusterListResultPreparer(ctx, lastResults) 238 if err != nil { 239 return result, autorest.NewErrorWithError(err, "managedclusterclient", "listNextResults", nil, "Failure preparing next results request") 240 } 241 if req == nil { 242 return 243 } 244 245 resp, rerr := c.armClient.Send(ctx, req) 246 defer c.armClient.CloseResponse(ctx, resp) 247 if rerr != nil { 248 result.Response = autorest.Response{Response: resp} 249 return result, autorest.NewErrorWithError(rerr.Error(), "managedclusterclient", "listNextResults", resp, "Failure sending next results request") 250 } 251 252 result, err = c.listResponder(resp) 253 if err != nil { 254 err = autorest.NewErrorWithError(err, "managedclusterclient", "listNextResults", resp, "Failure responding to next results request") 255 } 256 257 return 258} 259 260// ManagedClusterResultPage contains a page of ManagedCluster values. 261type ManagedClusterResultPage struct { 262 fn func(context.Context, containerservice.ManagedClusterListResult) (containerservice.ManagedClusterListResult, error) 263 mclr containerservice.ManagedClusterListResult 264} 265 266// NextWithContext advances to the next page of values. If there was an error making 267// the request the page does not advance and the error is returned. 
268func (page *ManagedClusterResultPage) NextWithContext(ctx context.Context) (err error) { 269 next, err := page.fn(ctx, page.mclr) 270 if err != nil { 271 return err 272 } 273 page.mclr = next 274 return nil 275} 276 277// Next advances to the next page of values. If there was an error making 278// the request the page does not advance and the error is returned. 279// Deprecated: Use NextWithContext() instead. 280func (page *ManagedClusterResultPage) Next() error { 281 return page.NextWithContext(context.Background()) 282} 283 284// NotDone returns true if the page enumeration should be started or is not yet complete. 285func (page ManagedClusterResultPage) NotDone() bool { 286 return !page.mclr.IsEmpty() 287} 288 289// Response returns the raw server response from the last page request. 290func (page ManagedClusterResultPage) Response() containerservice.ManagedClusterListResult { 291 return page.mclr 292} 293 294// Values returns the slice of values for the current page or nil if there are no values. 295func (page ManagedClusterResultPage) Values() []containerservice.ManagedCluster { 296 if page.mclr.IsEmpty() { 297 return nil 298 } 299 return *page.mclr.Value 300} 301 302// CreateOrUpdate creates or updates a ManagedCluster. 303func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, managedClusterName string, parameters containerservice.ManagedCluster, etag string) *retry.Error { 304 mc := metrics.NewMetricContext("managed_clusters", "create_or_update", resourceGroupName, c.subscriptionID, "") 305 306 // Report errors if the client is rate limited. 307 if !c.rateLimiterWriter.TryAccept() { 308 mc.RateLimitedCount() 309 return retry.GetRateLimitError(true, "CreateOrUpdateManagedCluster") 310 } 311 312 // Report errors if the client is throttled. 
313 if c.RetryAfterWriter.After(time.Now()) { 314 mc.ThrottledCount() 315 rerr := retry.GetThrottlingError("CreateOrUpdateManagedCluster", "client throttled", c.RetryAfterWriter) 316 return rerr 317 } 318 319 rerr := c.createOrUpdateManagedCluster(ctx, resourceGroupName, managedClusterName, parameters, etag) 320 mc.Observe(rerr.Error()) 321 if rerr != nil { 322 if rerr.IsThrottled() { 323 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 324 c.RetryAfterWriter = rerr.RetryAfter 325 } 326 327 return rerr 328 } 329 330 return nil 331} 332 333// createOrUpdateManagedCluster creates or updates a ManagedCluster. 334func (c *Client) createOrUpdateManagedCluster(ctx context.Context, resourceGroupName string, managedClusterName string, parameters containerservice.ManagedCluster, etag string) *retry.Error { 335 resourceID := armclient.GetResourceID( 336 c.subscriptionID, 337 resourceGroupName, 338 "Microsoft.ContainerService/managedClusters", 339 managedClusterName, 340 ) 341 decorators := []autorest.PrepareDecorator{ 342 autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}), 343 autorest.WithJSON(parameters), 344 } 345 if etag != "" { 346 decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(etag))) 347 } 348 349 response, rerr := c.armClient.PutResourceWithDecorators(ctx, resourceID, parameters, decorators) 350 defer c.armClient.CloseResponse(ctx, response) 351 if rerr != nil { 352 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedCluster.put.request", resourceID, rerr.Error()) 353 return rerr 354 } 355 356 if response != nil && response.StatusCode != http.StatusNoContent { 357 _, rerr = c.createOrUpdateResponder(response) 358 if rerr != nil { 359 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "managedCluster.put.respond", resourceID, rerr.Error()) 360 return rerr 361 } 362 } 363 364 return nil 365} 366 367func (c 
*Client) createOrUpdateResponder(resp *http.Response) (*containerservice.ManagedCluster, *retry.Error) { 368 result := &containerservice.ManagedCluster{} 369 err := autorest.Respond( 370 resp, 371 azure.WithErrorUnlessStatusCode(http.StatusOK, http.StatusCreated), 372 autorest.ByUnmarshallingJSON(&result)) 373 result.Response = autorest.Response{Response: resp} 374 return result, retry.GetError(resp, err) 375} 376 377// Delete deletes a ManagedCluster by name. 378func (c *Client) Delete(ctx context.Context, resourceGroupName string, managedClusterName string) *retry.Error { 379 mc := metrics.NewMetricContext("managed_clusters", "delete", resourceGroupName, c.subscriptionID, "") 380 381 // Report errors if the client is rate limited. 382 if !c.rateLimiterWriter.TryAccept() { 383 mc.RateLimitedCount() 384 return retry.GetRateLimitError(true, "DeleteManagedCluster") 385 } 386 387 // Report errors if the client is throttled. 388 if c.RetryAfterWriter.After(time.Now()) { 389 mc.ThrottledCount() 390 rerr := retry.GetThrottlingError("DeleteManagedCluster", "client throttled", c.RetryAfterWriter) 391 return rerr 392 } 393 394 rerr := c.deleteManagedCluster(ctx, resourceGroupName, managedClusterName) 395 mc.Observe(rerr.Error()) 396 if rerr != nil { 397 if rerr.IsThrottled() { 398 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 399 c.RetryAfterWriter = rerr.RetryAfter 400 } 401 402 return rerr 403 } 404 405 return nil 406} 407 408// deleteManagedCluster deletes a ManagedCluster by name. 409func (c *Client) deleteManagedCluster(ctx context.Context, resourceGroupName string, managedClusterName string) *retry.Error { 410 resourceID := armclient.GetResourceID( 411 c.subscriptionID, 412 resourceGroupName, 413 "Microsoft.ContainerService/managedClusters", 414 managedClusterName, 415 ) 416 417 return c.armClient.DeleteResource(ctx, resourceID, "") 418} 419