1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "encoding/json" 23 "fmt" 24 "strconv" 25 "strings" 26 "time" 27 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/grpclog" 30) 31 32const maxInt = int(^uint(0) >> 1) 33 34// MethodConfig defines the configuration recommended by the service providers for a 35// particular method. 36// 37// Deprecated: Users should not use this struct. Service config should be received 38// through name resolver, as specified here 39// https://github.com/grpc/grpc/blob/master/doc/service_config.md 40type MethodConfig struct { 41 // WaitForReady indicates whether RPCs sent to this method should wait until 42 // the connection is ready by default (!failfast). The value specified via the 43 // gRPC client API will override the value set here. 44 WaitForReady *bool 45 // Timeout is the default timeout for RPCs sent to this method. The actual 46 // deadline used will be the minimum of the value specified here and the value 47 // set by the application via the gRPC client API. If either one is not set, 48 // then the other will be used. If neither is set, then the RPC has no deadline. 49 Timeout *time.Duration 50 // MaxReqSize is the maximum allowed payload size for an individual request in a 51 // stream (client->server) in bytes. The size which is measured is the serialized 52 // payload after per-message compression (but before stream compression) in bytes. 53 // The actual value used is the minimum of the value specified here and the value set 54 // by the application via the gRPC client API. If either one is not set, then the other 55 // will be used. If neither is set, then the built-in default is used. 56 MaxReqSize *int 57 // MaxRespSize is the maximum allowed payload size for an individual response in a 58 // stream (server->client) in bytes. 59 MaxRespSize *int 60 // RetryPolicy configures retry options for the method. 61 retryPolicy *retryPolicy 62} 63 64// ServiceConfig is provided by the service provider and contains parameters for how 65// clients that connect to the service should behave. 66// 67// Deprecated: Users should not use this struct. Service config should be received 68// through name resolver, as specified here 69// https://github.com/grpc/grpc/blob/master/doc/service_config.md 70type ServiceConfig struct { 71 // LB is the load balancer the service providers recommends. The balancer specified 72 // via grpc.WithBalancer will override this. 73 LB *string 74 75 // Methods contains a map for the methods in this service. If there is an 76 // exact match for a method (i.e. /service/method) in the map, use the 77 // corresponding MethodConfig. If there's no exact match, look for the 78 // default config for the service (/service/) and use the corresponding 79 // MethodConfig if it exists. Otherwise, the method has no MethodConfig to 80 // use. 81 Methods map[string]MethodConfig 82 83 // If a retryThrottlingPolicy is provided, gRPC will automatically throttle 84 // retry attempts and hedged RPCs when the client’s ratio of failures to 85 // successes exceeds a threshold. 86 // 87 // For each server name, the gRPC client will maintain a token_count which is 88 // initially set to maxTokens, and can take values between 0 and maxTokens. 89 // 90 // Every outgoing RPC (regardless of service or method invoked) will change 91 // token_count as follows: 92 // 93 // - Every failed RPC will decrement the token_count by 1. 94 // - Every successful RPC will increment the token_count by tokenRatio. 95 // 96 // If token_count is less than or equal to maxTokens / 2, then RPCs will not 97 // be retried and hedged RPCs will not be sent. 98 retryThrottling *retryThrottlingPolicy 99} 100 101// retryPolicy defines the go-native version of the retry policy defined by the 102// service config here: 103// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config 104type retryPolicy struct { 105 // MaxAttempts is the maximum number of attempts, including the original RPC. 106 // 107 // This field is required and must be two or greater. 108 maxAttempts int 109 110 // Exponential backoff parameters. The initial retry attempt will occur at 111 // random(0, initialBackoffMS). In general, the nth attempt will occur at 112 // random(0, 113 // min(initialBackoffMS*backoffMultiplier**(n-1), maxBackoffMS)). 114 // 115 // These fields are required and must be greater than zero. 116 initialBackoff time.Duration 117 maxBackoff time.Duration 118 backoffMultiplier float64 119 120 // The set of status codes which may be retried. 121 // 122 // Status codes are specified as strings, e.g., "UNAVAILABLE". 123 // 124 // This field is required and must be non-empty. 125 // Note: a set is used to store this for easy lookup. 126 retryableStatusCodes map[codes.Code]bool 127} 128 129type jsonRetryPolicy struct { 130 MaxAttempts int 131 InitialBackoff string 132 MaxBackoff string 133 BackoffMultiplier float64 134 RetryableStatusCodes []codes.Code 135} 136 137// retryThrottlingPolicy defines the go-native version of the retry throttling 138// policy defined by the service config here: 139// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config 140type retryThrottlingPolicy struct { 141 // The number of tokens starts at maxTokens. The token_count will always be 142 // between 0 and maxTokens. 143 // 144 // This field is required and must be greater than zero. 145 MaxTokens float64 146 // The amount of tokens to add on each successful RPC. Typically this will 147 // be some number between 0 and 1, e.g., 0.1. 148 // 149 // This field is required and must be greater than zero. Up to 3 decimal 150 // places are supported. 151 TokenRatio float64 152} 153 154func parseDuration(s *string) (*time.Duration, error) { 155 if s == nil { 156 return nil, nil 157 } 158 if !strings.HasSuffix(*s, "s") { 159 return nil, fmt.Errorf("malformed duration %q", *s) 160 } 161 ss := strings.SplitN((*s)[:len(*s)-1], ".", 3) 162 if len(ss) > 2 { 163 return nil, fmt.Errorf("malformed duration %q", *s) 164 } 165 // hasDigits is set if either the whole or fractional part of the number is 166 // present, since both are optional but one is required. 167 hasDigits := false 168 var d time.Duration 169 if len(ss[0]) > 0 { 170 i, err := strconv.ParseInt(ss[0], 10, 32) 171 if err != nil { 172 return nil, fmt.Errorf("malformed duration %q: %v", *s, err) 173 } 174 d = time.Duration(i) * time.Second 175 hasDigits = true 176 } 177 if len(ss) == 2 && len(ss[1]) > 0 { 178 if len(ss[1]) > 9 { 179 return nil, fmt.Errorf("malformed duration %q", *s) 180 } 181 f, err := strconv.ParseInt(ss[1], 10, 64) 182 if err != nil { 183 return nil, fmt.Errorf("malformed duration %q: %v", *s, err) 184 } 185 for i := 9; i > len(ss[1]); i-- { 186 f *= 10 187 } 188 d += time.Duration(f) 189 hasDigits = true 190 } 191 if !hasDigits { 192 return nil, fmt.Errorf("malformed duration %q", *s) 193 } 194 195 return &d, nil 196} 197 198type jsonName struct { 199 Service *string 200 Method *string 201} 202 203func (j jsonName) generatePath() (string, bool) { 204 if j.Service == nil { 205 return "", false 206 } 207 res := "/" + *j.Service + "/" 208 if j.Method != nil { 209 res += *j.Method 210 } 211 return res, true 212} 213 214// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 215type jsonMC struct { 216 Name *[]jsonName 217 WaitForReady *bool 218 Timeout *string 219 MaxRequestMessageBytes *int64 220 MaxResponseMessageBytes *int64 221 RetryPolicy *jsonRetryPolicy 222} 223 224// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 225type jsonSC struct { 226 LoadBalancingPolicy *string 227 MethodConfig *[]jsonMC 228 RetryThrottling *retryThrottlingPolicy 229} 230 231func parseServiceConfig(js string) (ServiceConfig, error) { 232 var rsc jsonSC 233 err := json.Unmarshal([]byte(js), &rsc) 234 if err != nil { 235 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 236 return ServiceConfig{}, err 237 } 238 sc := ServiceConfig{ 239 LB: rsc.LoadBalancingPolicy, 240 Methods: make(map[string]MethodConfig), 241 retryThrottling: rsc.RetryThrottling, 242 } 243 if rsc.MethodConfig == nil { 244 return sc, nil 245 } 246 247 for _, m := range *rsc.MethodConfig { 248 if m.Name == nil { 249 continue 250 } 251 d, err := parseDuration(m.Timeout) 252 if err != nil { 253 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 254 return ServiceConfig{}, err 255 } 256 257 mc := MethodConfig{ 258 WaitForReady: m.WaitForReady, 259 Timeout: d, 260 } 261 if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil { 262 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 263 return ServiceConfig{}, err 264 } 265 if m.MaxRequestMessageBytes != nil { 266 if *m.MaxRequestMessageBytes > int64(maxInt) { 267 mc.MaxReqSize = newInt(maxInt) 268 } else { 269 mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes)) 270 } 271 } 272 if m.MaxResponseMessageBytes != nil { 273 if *m.MaxResponseMessageBytes > int64(maxInt) { 274 mc.MaxRespSize = newInt(maxInt) 275 } else { 276 mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes)) 277 } 278 } 279 for _, n := range *m.Name { 280 if path, valid := n.generatePath(); valid { 281 sc.Methods[path] = mc 282 } 283 } 284 } 285 286 if sc.retryThrottling != nil { 287 if sc.retryThrottling.MaxTokens <= 0 || 288 sc.retryThrottling.MaxTokens >= 1000 || 289 sc.retryThrottling.TokenRatio <= 0 { 290 // Illegal throttling config; disable throttling. 291 sc.retryThrottling = nil 292 } 293 } 294 return sc, nil 295} 296 297func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) { 298 if jrp == nil { 299 return nil, nil 300 } 301 ib, err := parseDuration(&jrp.InitialBackoff) 302 if err != nil { 303 return nil, err 304 } 305 mb, err := parseDuration(&jrp.MaxBackoff) 306 if err != nil { 307 return nil, err 308 } 309 310 if jrp.MaxAttempts <= 1 || 311 *ib <= 0 || 312 *mb <= 0 || 313 jrp.BackoffMultiplier <= 0 || 314 len(jrp.RetryableStatusCodes) == 0 { 315 grpclog.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp) 316 return nil, nil 317 } 318 319 rp := &retryPolicy{ 320 maxAttempts: jrp.MaxAttempts, 321 initialBackoff: *ib, 322 maxBackoff: *mb, 323 backoffMultiplier: jrp.BackoffMultiplier, 324 retryableStatusCodes: make(map[codes.Code]bool), 325 } 326 if rp.maxAttempts > 5 { 327 // TODO(retry): Make the max maxAttempts configurable. 328 rp.maxAttempts = 5 329 } 330 for _, code := range jrp.RetryableStatusCodes { 331 rp.retryableStatusCodes[code] = true 332 } 333 return rp, nil 334} 335 336func min(a, b *int) *int { 337 if *a < *b { 338 return a 339 } 340 return b 341} 342 343func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { 344 if mcMax == nil && doptMax == nil { 345 return &defaultVal 346 } 347 if mcMax != nil && doptMax != nil { 348 return min(mcMax, doptMax) 349 } 350 if mcMax != nil { 351 return mcMax 352 } 353 return doptMax 354} 355 356func newInt(b int) *int { 357 return &b 358} 359