// +build !providerless

/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package loadbalancerclient

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
	"github.com/Azure/go-autorest/autorest"
	"github.com/Azure/go-autorest/autorest/azure"
	"github.com/Azure/go-autorest/autorest/to"

	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/klog/v2"
	azclients "k8s.io/legacy-cloud-providers/azure/clients"
	"k8s.io/legacy-cloud-providers/azure/clients/armclient"
	"k8s.io/legacy-cloud-providers/azure/metrics"
	"k8s.io/legacy-cloud-providers/azure/retry"
)

// Compile-time assertion that *Client satisfies Interface.
var _ Interface = &Client{}

// Client implements LoadBalancer client Interface.
//
// Read operations (Get, List) and write operations (CreateOrUpdate, Delete)
// are gated independently: each kind has its own rate limiter and its own
// RetryAfter timestamp.
type Client struct {
	// armClient issues the underlying ARM requests.
	armClient armclient.Interface
	// subscriptionID is used to build resource IDs and to label metrics.
	subscriptionID string

	// Rate limiting configuration. A request is rejected locally when
	// TryAccept on the matching limiter returns false.
	rateLimiterReader flowcontrol.RateLimiter
	rateLimiterWriter flowcontrol.RateLimiter

	// ARM throttling configuration. While the matching timestamp is still in
	// the future, requests are rejected locally; the timestamps are updated
	// from the RetryAfter value of throttled responses.
	RetryAfterReader time.Time
	RetryAfterWriter time.Time
}

// New creates a new LoadBalancer client with ratelimiting.
57func New(config *azclients.ClientConfig) *Client { 58 baseURI := config.ResourceManagerEndpoint 59 authorizer := config.Authorizer 60 armClient := armclient.New(authorizer, baseURI, config.UserAgent, APIVersion, config.Location, config.Backoff) 61 rateLimiterReader, rateLimiterWriter := azclients.NewRateLimiter(config.RateLimitConfig) 62 63 klog.V(2).Infof("Azure LoadBalancersClient (read ops) using rate limit config: QPS=%g, bucket=%d", 64 config.RateLimitConfig.CloudProviderRateLimitQPS, 65 config.RateLimitConfig.CloudProviderRateLimitBucket) 66 klog.V(2).Infof("Azure LoadBalancersClient (write ops) using rate limit config: QPS=%g, bucket=%d", 67 config.RateLimitConfig.CloudProviderRateLimitQPSWrite, 68 config.RateLimitConfig.CloudProviderRateLimitBucketWrite) 69 70 client := &Client{ 71 armClient: armClient, 72 rateLimiterReader: rateLimiterReader, 73 rateLimiterWriter: rateLimiterWriter, 74 subscriptionID: config.SubscriptionID, 75 } 76 77 return client 78} 79 80// Get gets a LoadBalancer. 81func (c *Client) Get(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (network.LoadBalancer, *retry.Error) { 82 mc := metrics.NewMetricContext("load_balancers", "get", resourceGroupName, c.subscriptionID, "") 83 84 // Report errors if the client is rate limited. 85 if !c.rateLimiterReader.TryAccept() { 86 mc.RateLimitedCount() 87 return network.LoadBalancer{}, retry.GetRateLimitError(false, "LBGet") 88 } 89 90 // Report errors if the client is throttled. 91 if c.RetryAfterReader.After(time.Now()) { 92 mc.ThrottledCount() 93 rerr := retry.GetThrottlingError("LBGet", "client throttled", c.RetryAfterReader) 94 return network.LoadBalancer{}, rerr 95 } 96 97 result, rerr := c.getLB(ctx, resourceGroupName, loadBalancerName, expand) 98 mc.Observe(rerr.Error()) 99 if rerr != nil { 100 if rerr.IsThrottled() { 101 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 
102 c.RetryAfterReader = rerr.RetryAfter 103 } 104 105 return result, rerr 106 } 107 108 return result, nil 109} 110 111// getLB gets a LoadBalancer. 112func (c *Client) getLB(ctx context.Context, resourceGroupName string, loadBalancerName string, expand string) (network.LoadBalancer, *retry.Error) { 113 resourceID := armclient.GetResourceID( 114 c.subscriptionID, 115 resourceGroupName, 116 "Microsoft.Network/loadBalancers", 117 loadBalancerName, 118 ) 119 result := network.LoadBalancer{} 120 121 response, rerr := c.armClient.GetResource(ctx, resourceID, expand) 122 defer c.armClient.CloseResponse(ctx, response) 123 if rerr != nil { 124 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.get.request", resourceID, rerr.Error()) 125 return result, rerr 126 } 127 128 err := autorest.Respond( 129 response, 130 azure.WithErrorUnlessStatusCode(http.StatusOK), 131 autorest.ByUnmarshallingJSON(&result)) 132 if err != nil { 133 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.get.respond", resourceID, err) 134 return result, retry.GetError(response, err) 135 } 136 137 result.Response = autorest.Response{Response: response} 138 return result, nil 139} 140 141// List gets a list of LoadBalancer in the resource group. 142func (c *Client) List(ctx context.Context, resourceGroupName string) ([]network.LoadBalancer, *retry.Error) { 143 mc := metrics.NewMetricContext("load_balancers", "list", resourceGroupName, c.subscriptionID, "") 144 145 // Report errors if the client is rate limited. 146 if !c.rateLimiterReader.TryAccept() { 147 mc.RateLimitedCount() 148 return nil, retry.GetRateLimitError(false, "LBList") 149 } 150 151 // Report errors if the client is throttled. 
152 if c.RetryAfterReader.After(time.Now()) { 153 mc.ThrottledCount() 154 rerr := retry.GetThrottlingError("LBList", "client throttled", c.RetryAfterReader) 155 return nil, rerr 156 } 157 158 result, rerr := c.listLB(ctx, resourceGroupName) 159 mc.Observe(rerr.Error()) 160 if rerr != nil { 161 if rerr.IsThrottled() { 162 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 163 c.RetryAfterReader = rerr.RetryAfter 164 } 165 166 return result, rerr 167 } 168 169 return result, nil 170} 171 172// listLB gets a list of LoadBalancers in the resource group. 173func (c *Client) listLB(ctx context.Context, resourceGroupName string) ([]network.LoadBalancer, *retry.Error) { 174 resourceID := fmt.Sprintf("/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers", 175 autorest.Encode("path", c.subscriptionID), 176 autorest.Encode("path", resourceGroupName)) 177 result := make([]network.LoadBalancer, 0) 178 page := &LoadBalancerListResultPage{} 179 page.fn = c.listNextResults 180 181 resp, rerr := c.armClient.GetResource(ctx, resourceID, "") 182 defer c.armClient.CloseResponse(ctx, resp) 183 if rerr != nil { 184 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.list.request", resourceID, rerr.Error()) 185 return result, rerr 186 } 187 188 var err error 189 page.lblr, err = c.listResponder(resp) 190 if err != nil { 191 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.list.respond", resourceID, err) 192 return result, retry.GetError(resp, err) 193 } 194 195 for { 196 result = append(result, page.Values()...) 197 198 // Abort the loop when there's no nextLink in the response. 
199 if to.String(page.Response().NextLink) == "" { 200 break 201 } 202 203 if err = page.NextWithContext(ctx); err != nil { 204 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.list.next", resourceID, err) 205 return result, retry.GetError(page.Response().Response.Response, err) 206 } 207 } 208 209 return result, nil 210} 211 212// CreateOrUpdate creates or updates a LoadBalancer. 213func (c *Client) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) *retry.Error { 214 mc := metrics.NewMetricContext("load_balancers", "create_or_update", resourceGroupName, c.subscriptionID, "") 215 216 // Report errors if the client is rate limited. 217 if !c.rateLimiterWriter.TryAccept() { 218 mc.RateLimitedCount() 219 return retry.GetRateLimitError(true, "LBCreateOrUpdate") 220 } 221 222 // Report errors if the client is throttled. 223 if c.RetryAfterWriter.After(time.Now()) { 224 mc.ThrottledCount() 225 rerr := retry.GetThrottlingError("LBCreateOrUpdate", "client throttled", c.RetryAfterWriter) 226 return rerr 227 } 228 229 rerr := c.createOrUpdateLB(ctx, resourceGroupName, loadBalancerName, parameters, etag) 230 mc.Observe(rerr.Error()) 231 if rerr != nil { 232 if rerr.IsThrottled() { 233 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 234 c.RetryAfterWriter = rerr.RetryAfter 235 } 236 237 return rerr 238 } 239 240 return nil 241} 242 243// createOrUpdateLB creates or updates a LoadBalancer. 
244func (c *Client) createOrUpdateLB(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) *retry.Error { 245 resourceID := armclient.GetResourceID( 246 c.subscriptionID, 247 resourceGroupName, 248 "Microsoft.Network/loadBalancers", 249 loadBalancerName, 250 ) 251 decorators := []autorest.PrepareDecorator{ 252 autorest.WithPathParameters("{resourceID}", map[string]interface{}{"resourceID": resourceID}), 253 autorest.WithJSON(parameters), 254 } 255 if etag != "" { 256 decorators = append(decorators, autorest.WithHeader("If-Match", autorest.String(etag))) 257 } 258 259 response, rerr := c.armClient.PutResourceWithDecorators(ctx, resourceID, parameters, decorators) 260 defer c.armClient.CloseResponse(ctx, response) 261 if rerr != nil { 262 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.put.request", resourceID, rerr.Error()) 263 return rerr 264 } 265 266 if response != nil && response.StatusCode != http.StatusNoContent { 267 _, rerr = c.createOrUpdateResponder(response) 268 if rerr != nil { 269 klog.V(5).Infof("Received error in %s: resourceID: %s, error: %s", "loadbalancer.put.respond", resourceID, rerr.Error()) 270 return rerr 271 } 272 } 273 274 return nil 275} 276 277func (c *Client) createOrUpdateResponder(resp *http.Response) (*network.LoadBalancer, *retry.Error) { 278 result := &network.LoadBalancer{} 279 err := autorest.Respond( 280 resp, 281 azure.WithErrorUnlessStatusCode(http.StatusOK, http.StatusCreated), 282 autorest.ByUnmarshallingJSON(&result)) 283 result.Response = autorest.Response{Response: resp} 284 return result, retry.GetError(resp, err) 285} 286 287// Delete deletes a LoadBalancer by name. 
288func (c *Client) Delete(ctx context.Context, resourceGroupName string, loadBalancerName string) *retry.Error { 289 mc := metrics.NewMetricContext("load_balancers", "delete", resourceGroupName, c.subscriptionID, "") 290 291 // Report errors if the client is rate limited. 292 if !c.rateLimiterWriter.TryAccept() { 293 mc.RateLimitedCount() 294 return retry.GetRateLimitError(true, "LBDelete") 295 } 296 297 // Report errors if the client is throttled. 298 if c.RetryAfterWriter.After(time.Now()) { 299 mc.ThrottledCount() 300 rerr := retry.GetThrottlingError("LBDelete", "client throttled", c.RetryAfterWriter) 301 return rerr 302 } 303 304 rerr := c.deleteLB(ctx, resourceGroupName, loadBalancerName) 305 mc.Observe(rerr.Error()) 306 if rerr != nil { 307 if rerr.IsThrottled() { 308 // Update RetryAfterReader so that no more requests would be sent until RetryAfter expires. 309 c.RetryAfterWriter = rerr.RetryAfter 310 } 311 312 return rerr 313 } 314 315 return nil 316} 317 318// deleteLB deletes a LoadBalancer by name. 319func (c *Client) deleteLB(ctx context.Context, resourceGroupName string, loadBalancerName string) *retry.Error { 320 resourceID := armclient.GetResourceID( 321 c.subscriptionID, 322 resourceGroupName, 323 "Microsoft.Network/loadBalancers", 324 loadBalancerName, 325 ) 326 327 return c.armClient.DeleteResource(ctx, resourceID, "") 328} 329 330func (c *Client) listResponder(resp *http.Response) (result network.LoadBalancerListResult, err error) { 331 err = autorest.Respond( 332 resp, 333 autorest.ByIgnoring(), 334 azure.WithErrorUnlessStatusCode(http.StatusOK), 335 autorest.ByUnmarshallingJSON(&result)) 336 result.Response = autorest.Response{Response: resp} 337 return 338} 339 340// loadBalancerListResultPreparer prepares a request to retrieve the next set of results. 341// It returns nil if no more results exist. 
342func (c *Client) loadBalancerListResultPreparer(ctx context.Context, lblr network.LoadBalancerListResult) (*http.Request, error) { 343 if lblr.NextLink == nil || len(to.String(lblr.NextLink)) < 1 { 344 return nil, nil 345 } 346 347 decorators := []autorest.PrepareDecorator{ 348 autorest.WithBaseURL(to.String(lblr.NextLink)), 349 } 350 return c.armClient.PrepareGetRequest(ctx, decorators...) 351} 352 353// listNextResults retrieves the next set of results, if any. 354func (c *Client) listNextResults(ctx context.Context, lastResults network.LoadBalancerListResult) (result network.LoadBalancerListResult, err error) { 355 req, err := c.loadBalancerListResultPreparer(ctx, lastResults) 356 if err != nil { 357 return result, autorest.NewErrorWithError(err, "loadbalancerclient", "listNextResults", nil, "Failure preparing next results request") 358 } 359 if req == nil { 360 return 361 } 362 363 resp, rerr := c.armClient.Send(ctx, req) 364 defer c.armClient.CloseResponse(ctx, resp) 365 if rerr != nil { 366 result.Response = autorest.Response{Response: resp} 367 return result, autorest.NewErrorWithError(rerr.Error(), "loadbalancerclient", "listNextResults", resp, "Failure sending next results request") 368 } 369 370 result, err = c.listResponder(resp) 371 if err != nil { 372 err = autorest.NewErrorWithError(err, "loadbalancerclient", "listNextResults", resp, "Failure responding to next results request") 373 } 374 375 return 376} 377 378// LoadBalancerListResultPage contains a page of LoadBalancer values. 379type LoadBalancerListResultPage struct { 380 fn func(context.Context, network.LoadBalancerListResult) (network.LoadBalancerListResult, error) 381 lblr network.LoadBalancerListResult 382} 383 384// NextWithContext advances to the next page of values. If there was an error making 385// the request the page does not advance and the error is returned. 
386func (page *LoadBalancerListResultPage) NextWithContext(ctx context.Context) (err error) { 387 next, err := page.fn(ctx, page.lblr) 388 if err != nil { 389 return err 390 } 391 page.lblr = next 392 return nil 393} 394 395// Next advances to the next page of values. If there was an error making 396// the request the page does not advance and the error is returned. 397// Deprecated: Use NextWithContext() instead. 398func (page *LoadBalancerListResultPage) Next() error { 399 return page.NextWithContext(context.Background()) 400} 401 402// NotDone returns true if the page enumeration should be started or is not yet complete. 403func (page LoadBalancerListResultPage) NotDone() bool { 404 return !page.lblr.IsEmpty() 405} 406 407// Response returns the raw server response from the last page request. 408func (page LoadBalancerListResultPage) Response() network.LoadBalancerListResult { 409 return page.lblr 410} 411 412// Values returns the slice of values for the current page or nil if there are no values. 413func (page LoadBalancerListResultPage) Values() []network.LoadBalancer { 414 if page.lblr.IsEmpty() { 415 return nil 416 } 417 return *page.lblr.Value 418} 419