1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "encoding/json" 23 "fmt" 24 "strconv" 25 "strings" 26 "time" 27 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/grpclog" 30) 31 32const maxInt = int(^uint(0) >> 1) 33 34// MethodConfig defines the configuration recommended by the service providers for a 35// particular method. 36// 37// Deprecated: Users should not use this struct. Service config should be received 38// through name resolver, as specified here 39// https://github.com/grpc/grpc/blob/master/doc/service_config.md 40type MethodConfig struct { 41 // WaitForReady indicates whether RPCs sent to this method should wait until 42 // the connection is ready by default (!failfast). The value specified via the 43 // gRPC client API will override the value set here. 44 WaitForReady *bool 45 // Timeout is the default timeout for RPCs sent to this method. The actual 46 // deadline used will be the minimum of the value specified here and the value 47 // set by the application via the gRPC client API. If either one is not set, 48 // then the other will be used. If neither is set, then the RPC has no deadline. 49 Timeout *time.Duration 50 // MaxReqSize is the maximum allowed payload size for an individual request in a 51 // stream (client->server) in bytes. The size which is measured is the serialized 52 // payload after per-message compression (but before stream compression) in bytes. 53 // The actual value used is the minimum of the value specified here and the value set 54 // by the application via the gRPC client API. If either one is not set, then the other 55 // will be used. If neither is set, then the built-in default is used. 56 MaxReqSize *int 57 // MaxRespSize is the maximum allowed payload size for an individual response in a 58 // stream (server->client) in bytes. 59 MaxRespSize *int 60 // RetryPolicy configures retry options for the method. 61 retryPolicy *retryPolicy 62} 63 64// ServiceConfig is provided by the service provider and contains parameters for how 65// clients that connect to the service should behave. 66// 67// Deprecated: Users should not use this struct. Service config should be received 68// through name resolver, as specified here 69// https://github.com/grpc/grpc/blob/master/doc/service_config.md 70type ServiceConfig struct { 71 // LB is the load balancer the service providers recommends. The balancer specified 72 // via grpc.WithBalancer will override this. 73 LB *string 74 75 // Methods contains a map for the methods in this service. If there is an 76 // exact match for a method (i.e. /service/method) in the map, use the 77 // corresponding MethodConfig. If there's no exact match, look for the 78 // default config for the service (/service/) and use the corresponding 79 // MethodConfig if it exists. Otherwise, the method has no MethodConfig to 80 // use. 81 Methods map[string]MethodConfig 82 83 // If a retryThrottlingPolicy is provided, gRPC will automatically throttle 84 // retry attempts and hedged RPCs when the client’s ratio of failures to 85 // successes exceeds a threshold. 86 // 87 // For each server name, the gRPC client will maintain a token_count which is 88 // initially set to maxTokens, and can take values between 0 and maxTokens. 89 // 90 // Every outgoing RPC (regardless of service or method invoked) will change 91 // token_count as follows: 92 // 93 // - Every failed RPC will decrement the token_count by 1. 94 // - Every successful RPC will increment the token_count by tokenRatio. 95 // 96 // If token_count is less than or equal to maxTokens / 2, then RPCs will not 97 // be retried and hedged RPCs will not be sent. 98 retryThrottling *retryThrottlingPolicy 99 // healthCheckConfig must be set as one of the requirement to enable LB channel 100 // health check. 101 healthCheckConfig *healthCheckConfig 102 // rawJSONString stores service config json string that get parsed into 103 // this service config struct. 104 rawJSONString string 105} 106 107// healthCheckConfig defines the go-native version of the LB channel health check config. 108type healthCheckConfig struct { 109 // serviceName is the service name to use in the health-checking request. 110 ServiceName string 111} 112 113// retryPolicy defines the go-native version of the retry policy defined by the 114// service config here: 115// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config 116type retryPolicy struct { 117 // MaxAttempts is the maximum number of attempts, including the original RPC. 118 // 119 // This field is required and must be two or greater. 120 maxAttempts int 121 122 // Exponential backoff parameters. The initial retry attempt will occur at 123 // random(0, initialBackoffMS). In general, the nth attempt will occur at 124 // random(0, 125 // min(initialBackoffMS*backoffMultiplier**(n-1), maxBackoffMS)). 126 // 127 // These fields are required and must be greater than zero. 128 initialBackoff time.Duration 129 maxBackoff time.Duration 130 backoffMultiplier float64 131 132 // The set of status codes which may be retried. 133 // 134 // Status codes are specified as strings, e.g., "UNAVAILABLE". 135 // 136 // This field is required and must be non-empty. 137 // Note: a set is used to store this for easy lookup. 138 retryableStatusCodes map[codes.Code]bool 139} 140 141type jsonRetryPolicy struct { 142 MaxAttempts int 143 InitialBackoff string 144 MaxBackoff string 145 BackoffMultiplier float64 146 RetryableStatusCodes []codes.Code 147} 148 149// retryThrottlingPolicy defines the go-native version of the retry throttling 150// policy defined by the service config here: 151// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config 152type retryThrottlingPolicy struct { 153 // The number of tokens starts at maxTokens. The token_count will always be 154 // between 0 and maxTokens. 155 // 156 // This field is required and must be greater than zero. 157 MaxTokens float64 158 // The amount of tokens to add on each successful RPC. Typically this will 159 // be some number between 0 and 1, e.g., 0.1. 160 // 161 // This field is required and must be greater than zero. Up to 3 decimal 162 // places are supported. 163 TokenRatio float64 164} 165 166func parseDuration(s *string) (*time.Duration, error) { 167 if s == nil { 168 return nil, nil 169 } 170 if !strings.HasSuffix(*s, "s") { 171 return nil, fmt.Errorf("malformed duration %q", *s) 172 } 173 ss := strings.SplitN((*s)[:len(*s)-1], ".", 3) 174 if len(ss) > 2 { 175 return nil, fmt.Errorf("malformed duration %q", *s) 176 } 177 // hasDigits is set if either the whole or fractional part of the number is 178 // present, since both are optional but one is required. 179 hasDigits := false 180 var d time.Duration 181 if len(ss[0]) > 0 { 182 i, err := strconv.ParseInt(ss[0], 10, 32) 183 if err != nil { 184 return nil, fmt.Errorf("malformed duration %q: %v", *s, err) 185 } 186 d = time.Duration(i) * time.Second 187 hasDigits = true 188 } 189 if len(ss) == 2 && len(ss[1]) > 0 { 190 if len(ss[1]) > 9 { 191 return nil, fmt.Errorf("malformed duration %q", *s) 192 } 193 f, err := strconv.ParseInt(ss[1], 10, 64) 194 if err != nil { 195 return nil, fmt.Errorf("malformed duration %q: %v", *s, err) 196 } 197 for i := 9; i > len(ss[1]); i-- { 198 f *= 10 199 } 200 d += time.Duration(f) 201 hasDigits = true 202 } 203 if !hasDigits { 204 return nil, fmt.Errorf("malformed duration %q", *s) 205 } 206 207 return &d, nil 208} 209 210type jsonName struct { 211 Service *string 212 Method *string 213} 214 215func (j jsonName) generatePath() (string, bool) { 216 if j.Service == nil { 217 return "", false 218 } 219 res := "/" + *j.Service + "/" 220 if j.Method != nil { 221 res += *j.Method 222 } 223 return res, true 224} 225 226// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 227type jsonMC struct { 228 Name *[]jsonName 229 WaitForReady *bool 230 Timeout *string 231 MaxRequestMessageBytes *int64 232 MaxResponseMessageBytes *int64 233 RetryPolicy *jsonRetryPolicy 234} 235 236// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 237type jsonSC struct { 238 LoadBalancingPolicy *string 239 MethodConfig *[]jsonMC 240 RetryThrottling *retryThrottlingPolicy 241 HealthCheckConfig *healthCheckConfig 242} 243 244func parseServiceConfig(js string) (*ServiceConfig, error) { 245 var rsc jsonSC 246 err := json.Unmarshal([]byte(js), &rsc) 247 if err != nil { 248 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 249 return nil, err 250 } 251 sc := ServiceConfig{ 252 LB: rsc.LoadBalancingPolicy, 253 Methods: make(map[string]MethodConfig), 254 retryThrottling: rsc.RetryThrottling, 255 healthCheckConfig: rsc.HealthCheckConfig, 256 rawJSONString: js, 257 } 258 if rsc.MethodConfig == nil { 259 return &sc, nil 260 } 261 262 for _, m := range *rsc.MethodConfig { 263 if m.Name == nil { 264 continue 265 } 266 d, err := parseDuration(m.Timeout) 267 if err != nil { 268 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 269 return nil, err 270 } 271 272 mc := MethodConfig{ 273 WaitForReady: m.WaitForReady, 274 Timeout: d, 275 } 276 if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil { 277 grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) 278 return nil, err 279 } 280 if m.MaxRequestMessageBytes != nil { 281 if *m.MaxRequestMessageBytes > int64(maxInt) { 282 mc.MaxReqSize = newInt(maxInt) 283 } else { 284 mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes)) 285 } 286 } 287 if m.MaxResponseMessageBytes != nil { 288 if *m.MaxResponseMessageBytes > int64(maxInt) { 289 mc.MaxRespSize = newInt(maxInt) 290 } else { 291 mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes)) 292 } 293 } 294 for _, n := range *m.Name { 295 if path, valid := n.generatePath(); valid { 296 sc.Methods[path] = mc 297 } 298 } 299 } 300 301 if sc.retryThrottling != nil { 302 if sc.retryThrottling.MaxTokens <= 0 || 303 sc.retryThrottling.MaxTokens > 1000 || 304 sc.retryThrottling.TokenRatio <= 0 { 305 // Illegal throttling config; disable throttling. 306 sc.retryThrottling = nil 307 } 308 } 309 return &sc, nil 310} 311 312func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) { 313 if jrp == nil { 314 return nil, nil 315 } 316 ib, err := parseDuration(&jrp.InitialBackoff) 317 if err != nil { 318 return nil, err 319 } 320 mb, err := parseDuration(&jrp.MaxBackoff) 321 if err != nil { 322 return nil, err 323 } 324 325 if jrp.MaxAttempts <= 1 || 326 *ib <= 0 || 327 *mb <= 0 || 328 jrp.BackoffMultiplier <= 0 || 329 len(jrp.RetryableStatusCodes) == 0 { 330 grpclog.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp) 331 return nil, nil 332 } 333 334 rp := &retryPolicy{ 335 maxAttempts: jrp.MaxAttempts, 336 initialBackoff: *ib, 337 maxBackoff: *mb, 338 backoffMultiplier: jrp.BackoffMultiplier, 339 retryableStatusCodes: make(map[codes.Code]bool), 340 } 341 if rp.maxAttempts > 5 { 342 // TODO(retry): Make the max maxAttempts configurable. 343 rp.maxAttempts = 5 344 } 345 for _, code := range jrp.RetryableStatusCodes { 346 rp.retryableStatusCodes[code] = true 347 } 348 return rp, nil 349} 350 351func min(a, b *int) *int { 352 if *a < *b { 353 return a 354 } 355 return b 356} 357 358func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { 359 if mcMax == nil && doptMax == nil { 360 return &defaultVal 361 } 362 if mcMax != nil && doptMax != nil { 363 return min(mcMax, doptMax) 364 } 365 if mcMax != nil { 366 return mcMax 367 } 368 return doptMax 369} 370 371func newInt(b int) *int { 372 return &b 373} 374