1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "encoding/json" 23 "fmt" 24 "strconv" 25 "strings" 26 "time" 27 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/grpclog" 30) 31 32const maxInt = int(^uint(0) >> 1) 33 34// MethodConfig defines the configuration recommended by the service providers for a 35// particular method. 36// 37// Deprecated: Users should not use this struct. Service config should be received 38// through name resolver, as specified here 39// https://github.com/grpc/grpc/blob/master/doc/service_config.md 40type MethodConfig struct { 41 // WaitForReady indicates whether RPCs sent to this method should wait until 42 // the connection is ready by default (!failfast). The value specified via the 43 // gRPC client API will override the value set here. 44 WaitForReady *bool 45 // Timeout is the default timeout for RPCs sent to this method. The actual 46 // deadline used will be the minimum of the value specified here and the value 47 // set by the application via the gRPC client API. If either one is not set, 48 // then the other will be used. If neither is set, then the RPC has no deadline. 49 Timeout *time.Duration 50 // MaxReqSize is the maximum allowed payload size for an individual request in a 51 // stream (client->server) in bytes. The size which is measured is the serialized 52 // payload after per-message compression (but before stream compression) in bytes. 53 // The actual value used is the minimum of the value specified here and the value set 54 // by the application via the gRPC client API. If either one is not set, then the other 55 // will be used. If neither is set, then the built-in default is used. 56 MaxReqSize *int 57 // MaxRespSize is the maximum allowed payload size for an individual response in a 58 // stream (server->client) in bytes. 59 MaxRespSize *int 60 // RetryPolicy configures retry options for the method. 61 retryPolicy *retryPolicy 62} 63 64// ServiceConfig is provided by the service provider and contains parameters for how 65// clients that connect to the service should behave. 66// 67// Deprecated: Users should not use this struct. Service config should be received 68// through name resolver, as specified here 69// https://github.com/grpc/grpc/blob/master/doc/service_config.md 70type ServiceConfig struct { 71 // LB is the load balancer the service providers recommends. The balancer specified 72 // via grpc.WithBalancer will override this. 73 LB *string 74 75 // Methods contains a map for the methods in this service. If there is an 76 // exact match for a method (i.e. /service/method) in the map, use the 77 // corresponding MethodConfig. If there's no exact match, look for the 78 // default config for the service (/service/) and use the corresponding 79 // MethodConfig if it exists. Otherwise, the method has no MethodConfig to 80 // use. 81 Methods map[string]MethodConfig 82 83 // If a retryThrottlingPolicy is provided, gRPC will automatically throttle 84 // retry attempts and hedged RPCs when the client’s ratio of failures to 85 // successes exceeds a threshold. 86 // 87 // For each server name, the gRPC client will maintain a token_count which is 88 // initially set to maxTokens, and can take values between 0 and maxTokens. 89 // 90 // Every outgoing RPC (regardless of service or method invoked) will change 91 // token_count as follows: 92 // 93 // - Every failed RPC will decrement the token_count by 1. 94 // - Every successful RPC will increment the token_count by tokenRatio. 95 // 96 // If token_count is less than or equal to maxTokens / 2, then RPCs will not 97 // be retried and hedged RPCs will not be sent. 98 retryThrottling *retryThrottlingPolicy 99 // healthCheckConfig must be set as one of the requirement to enable LB channel 100 // health check. 101 healthCheckConfig *healthCheckConfig 102} 103 104// healthCheckConfig defines the go-native version of the LB channel health check config. 105type healthCheckConfig struct { 106 // serviceName is the service name to use in the health-checking request. 107 ServiceName string 108} 109 110// retryPolicy defines the go-native version of the retry policy defined by the 111// service config here: 112// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config 113type retryPolicy struct { 114 // MaxAttempts is the maximum number of attempts, including the original RPC. 115 // 116 // This field is required and must be two or greater. 117 maxAttempts int 118 119 // Exponential backoff parameters. The initial retry attempt will occur at 120 // random(0, initialBackoffMS). In general, the nth attempt will occur at 121 // random(0, 122 // min(initialBackoffMS*backoffMultiplier**(n-1), maxBackoffMS)). 123 // 124 // These fields are required and must be greater than zero. 125 initialBackoff time.Duration 126 maxBackoff time.Duration 127 backoffMultiplier float64 128 129 // The set of status codes which may be retried. 130 // 131 // Status codes are specified as strings, e.g., "UNAVAILABLE". 132 // 133 // This field is required and must be non-empty. 134 // Note: a set is used to store this for easy lookup. 135 retryableStatusCodes map[codes.Code]bool 136} 137 138type jsonRetryPolicy struct { 139 MaxAttempts int 140 InitialBackoff string 141 MaxBackoff string 142 BackoffMultiplier float64 143 RetryableStatusCodes []codes.Code 144} 145 146// retryThrottlingPolicy defines the go-native version of the retry throttling 147// policy defined by the service config here: 148// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config 149type retryThrottlingPolicy struct { 150 // The number of tokens starts at maxTokens. The token_count will always be 151 // between 0 and maxTokens. 152 // 153 // This field is required and must be greater than zero. 154 MaxTokens float64 155 // The amount of tokens to add on each successful RPC. Typically this will 156 // be some number between 0 and 1, e.g., 0.1. 157 // 158 // This field is required and must be greater than zero. Up to 3 decimal 159 // places are supported. 160 TokenRatio float64 161} 162 163func parseDuration(s *string) (*time.Duration, error) { 164 if s == nil { 165 return nil, nil 166 } 167 if !strings.HasSuffix(*s, "s") { 168 return nil, fmt.Errorf("malformed duration %q", *s) 169 } 170 ss := strings.SplitN((*s)[:len(*s)-1], ".", 3) 171 if len(ss) > 2 { 172 return nil, fmt.Errorf("malformed duration %q", *s) 173 } 174 // hasDigits is set if either the whole or fractional part of the number is 175 // present, since both are optional but one is required. 176 hasDigits := false 177 var d time.Duration 178 if len(ss[0]) > 0 { 179 i, err := strconv.ParseInt(ss[0], 10, 32) 180 if err != nil { 181 return nil, fmt.Errorf("malformed duration %q: %v", *s, err) 182 } 183 d = time.Duration(i) * time.Second 184 hasDigits = true 185 } 186 if len(ss) == 2 && len(ss[1]) > 0 { 187 if len(ss[1]) > 9 { 188 return nil, fmt.Errorf("malformed duration %q", *s) 189 } 190 f, err := strconv.ParseInt(ss[1], 10, 64) 191 if err != nil { 192 return nil, fmt.Errorf("malformed duration %q: %v", *s, err) 193 } 194 for i := 9; i > len(ss[1]); i-- { 195 f *= 10 196 } 197 d += time.Duration(f) 198 hasDigits = true 199 } 200 if !hasDigits { 201 return nil, fmt.Errorf("malformed duration %q", *s) 202 } 203 204 return &d, nil 205} 206 207type jsonName struct { 208 Service *string 209 Method *string 210} 211 212func (j jsonName) generatePath() (string, bool) { 213 if j.Service == nil { 214 return "", false 215 } 216 res := "/" + *j.Service + "/" 217 if j.Method != nil { 218 res += *j.Method 219 } 220 return res, true 221} 222 223// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 224type jsonMC struct { 225 Name *[]jsonName 226 WaitForReady *bool 227 Timeout *string 228 MaxRequestMessageBytes *int64 229 MaxResponseMessageBytes *int64 230 RetryPolicy *jsonRetryPolicy 231} 232 233// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 234type jsonSC struct { 235 LoadBalancingPolicy *string 236 MethodConfig *[]jsonMC 237 RetryThrottling *retryThrottlingPolicy 238 HealthCheckConfig *healthCheckConfig 239} 240 241func parseServiceConfig(js string) (ServiceConfig, error) { 242 if len(js) == 0 { 243 return ServiceConfig{}, fmt.Errorf("no JSON service config provided") 244 } 245 var rsc jsonSC 246 err := json.Unmarshal([]byte(js), &rsc) 247 if err != nil { 248 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 249 return ServiceConfig{}, err 250 } 251 sc := ServiceConfig{ 252 LB: rsc.LoadBalancingPolicy, 253 Methods: make(map[string]MethodConfig), 254 retryThrottling: rsc.RetryThrottling, 255 healthCheckConfig: rsc.HealthCheckConfig, 256 } 257 if rsc.MethodConfig == nil { 258 return sc, nil 259 } 260 261 for _, m := range *rsc.MethodConfig { 262 if m.Name == nil { 263 continue 264 } 265 d, err := parseDuration(m.Timeout) 266 if err != nil { 267 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 268 return ServiceConfig{}, err 269 } 270 271 mc := MethodConfig{ 272 WaitForReady: m.WaitForReady, 273 Timeout: d, 274 } 275 if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil { 276 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 277 return ServiceConfig{}, err 278 } 279 if m.MaxRequestMessageBytes != nil { 280 if *m.MaxRequestMessageBytes > int64(maxInt) { 281 mc.MaxReqSize = newInt(maxInt) 282 } else { 283 mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes)) 284 } 285 } 286 if m.MaxResponseMessageBytes != nil { 287 if *m.MaxResponseMessageBytes > int64(maxInt) { 288 mc.MaxRespSize = newInt(maxInt) 289 } else { 290 mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes)) 291 } 292 } 293 for _, n := range *m.Name { 294 if path, valid := n.generatePath(); valid { 295 sc.Methods[path] = mc 296 } 297 } 298 } 299 300 if sc.retryThrottling != nil { 301 if sc.retryThrottling.MaxTokens <= 0 || 302 sc.retryThrottling.MaxTokens >= 1000 || 303 sc.retryThrottling.TokenRatio <= 0 { 304 // Illegal throttling config; disable throttling. 305 sc.retryThrottling = nil 306 } 307 } 308 return sc, nil 309} 310 311func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) { 312 if jrp == nil { 313 return nil, nil 314 } 315 ib, err := parseDuration(&jrp.InitialBackoff) 316 if err != nil { 317 return nil, err 318 } 319 mb, err := parseDuration(&jrp.MaxBackoff) 320 if err != nil { 321 return nil, err 322 } 323 324 if jrp.MaxAttempts <= 1 || 325 *ib <= 0 || 326 *mb <= 0 || 327 jrp.BackoffMultiplier <= 0 || 328 len(jrp.RetryableStatusCodes) == 0 { 329 grpclog.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp) 330 return nil, nil 331 } 332 333 rp := &retryPolicy{ 334 maxAttempts: jrp.MaxAttempts, 335 initialBackoff: *ib, 336 maxBackoff: *mb, 337 backoffMultiplier: jrp.BackoffMultiplier, 338 retryableStatusCodes: make(map[codes.Code]bool), 339 } 340 if rp.maxAttempts > 5 { 341 // TODO(retry): Make the max maxAttempts configurable. 342 rp.maxAttempts = 5 343 } 344 for _, code := range jrp.RetryableStatusCodes { 345 rp.retryableStatusCodes[code] = true 346 } 347 return rp, nil 348} 349 350func min(a, b *int) *int { 351 if *a < *b { 352 return a 353 } 354 return b 355} 356 357func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { 358 if mcMax == nil && doptMax == nil { 359 return &defaultVal 360 } 361 if mcMax != nil && doptMax != nil { 362 return min(mcMax, doptMax) 363 } 364 if mcMax != nil { 365 return mcMax 366 } 367 return doptMax 368} 369 370func newInt(b int) *int { 371 return &b 372} 373