1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"encoding/json"
23	"fmt"
24	"strconv"
25	"strings"
26	"time"
27
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/grpclog"
30)
31
// maxInt is the largest value representable by the platform-dependent int
// type: the all-ones uint shifted right by one bit, converted to int.
const maxInt = int(^uint(0) >> 1)
33
// MethodConfig defines the configuration recommended by the service providers for a
// particular method. Pointer-typed fields are nil when the corresponding
// option was not set.
//
// Deprecated: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type MethodConfig struct {
	// WaitForReady indicates whether RPCs sent to this method should wait until
	// the connection is ready by default (!failfast). The value specified via the
	// gRPC client API will override the value set here.
	WaitForReady *bool
	// Timeout is the default timeout for RPCs sent to this method. The actual
	// deadline used will be the minimum of the value specified here and the value
	// set by the application via the gRPC client API.  If either one is not set,
	// then the other will be used.  If neither is set, then the RPC has no deadline.
	Timeout *time.Duration
	// MaxReqSize is the maximum allowed payload size for an individual request in a
	// stream (client->server) in bytes. The size which is measured is the serialized
	// payload after per-message compression (but before stream compression) in bytes.
	// The actual value used is the minimum of the value specified here and the value set
	// by the application via the gRPC client API. If either one is not set, then the other
	// will be used.  If neither is set, then the built-in default is used.
	MaxReqSize *int
	// MaxRespSize is the maximum allowed payload size for an individual response in a
	// stream (server->client) in bytes.
	MaxRespSize *int
	// retryPolicy configures retry options for the method. It is nil when no
	// retry policy was configured, or when convertRetryPolicy discarded the
	// configured policy as illegal.
	retryPolicy *retryPolicy
}
63
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
//
// Deprecated: Users should not use this struct. Service config should be received
// through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct {
	// LB is the load balancer the service provider recommends. The balancer
	// specified via grpc.WithBalancer will override this.
	LB *string

	// Methods contains a map for the methods in this service.  If there is an
	// exact match for a method (i.e. /service/method) in the map, use the
	// corresponding MethodConfig.  If there's no exact match, look for the
	// default config for the service (/service/) and use the corresponding
	// MethodConfig if it exists.  Otherwise, the method has no MethodConfig to
	// use.
	Methods map[string]MethodConfig

	// retryThrottling is the optional retry throttling policy. It is nil when
	// no policy was provided or when parseServiceConfig rejected the provided
	// policy as illegal.
	//
	// If a retryThrottlingPolicy is provided, gRPC will automatically throttle
	// retry attempts and hedged RPCs when the client's ratio of failures to
	// successes exceeds a threshold.
	//
	// For each server name, the gRPC client will maintain a token_count which is
	// initially set to maxTokens, and can take values between 0 and maxTokens.
	//
	// Every outgoing RPC (regardless of service or method invoked) will change
	// token_count as follows:
	//
	//   - Every failed RPC will decrement the token_count by 1.
	//   - Every successful RPC will increment the token_count by tokenRatio.
	//
	// If token_count is less than or equal to maxTokens / 2, then RPCs will not
	// be retried and hedged RPCs will not be sent.
	retryThrottling *retryThrottlingPolicy
}
100
// retryPolicy defines the go-native version of the retry policy defined by the
// service config here:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
//
// Values of this type are built by convertRetryPolicy from a jsonRetryPolicy.
type retryPolicy struct {
	// maxAttempts is the maximum number of attempts, including the original RPC.
	//
	// This field is required and must be two or greater. convertRetryPolicy
	// caps the stored value at 5.
	maxAttempts int

	// Exponential backoff parameters. The initial retry attempt will occur at
	// random(0, initialBackoff). In general, the nth attempt will occur at
	// random(0,
	//   min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)).
	//
	// These fields are required and must be greater than zero.
	initialBackoff    time.Duration
	maxBackoff        time.Duration
	backoffMultiplier float64

	// The set of status codes which may be retried.
	//
	// In the service config, status codes are specified as strings, e.g.,
	// "UNAVAILABLE".
	//
	// This field is required and must be non-empty.
	// Note: a set is used to store this for easy lookup.
	retryableStatusCodes map[codes.Code]bool
}
128
// jsonRetryPolicy is the shape a retry policy is decoded into from the JSON
// service config. convertRetryPolicy validates it and converts it into a
// retryPolicy.
type jsonRetryPolicy struct {
	MaxAttempts          int
	// InitialBackoff and MaxBackoff are duration strings that
	// convertRetryPolicy hands to parseDuration.
	InitialBackoff       string
	MaxBackoff           string
	BackoffMultiplier    float64
	RetryableStatusCodes []codes.Code
}
136
// retryThrottlingPolicy defines the go-native version of the retry throttling
// policy defined by the service config here:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
//
// The type is decoded directly from JSON (it is referenced by jsonSC), which
// is why its fields are exported even though the type itself is not.
type retryThrottlingPolicy struct {
	// The number of tokens starts at maxTokens. The token_count will always be
	// between 0 and maxTokens.
	//
	// This field is required and must be greater than zero.
	// parseServiceConfig additionally enforces an upper bound on it.
	MaxTokens float64
	// The amount of tokens to add on each successful RPC. Typically this will
	// be some number between 0 and 1, e.g., 0.1.
	//
	// This field is required and must be greater than zero. Up to 3 decimal
	// places are supported.
	TokenRatio float64
}
153
// parseDuration converts a protobuf-JSON style duration string such as "1s",
// "1.5s", ".25s" or "-3.000000001s" into a time.Duration.
//
// The accepted form is: an optional leading "-", an optional run of decimal
// digits giving whole seconds, an optional "." followed by at most nine
// decimal digits of fractional seconds, and a mandatory trailing "s". At
// least one of the whole and fractional parts must be present, and the
// whole-second magnitude must fit in 31 bits.
//
// A nil s yields (nil, nil) so that an absent field stays unset. Any string
// that does not match the form above yields a nil duration and a non-nil
// error.
func parseDuration(s *string) (*time.Duration, error) {
	if s == nil {
		return nil, nil
	}
	if !strings.HasSuffix(*s, "s") {
		return nil, fmt.Errorf("malformed duration %q", *s)
	}
	num := (*s)[:len(*s)-1]
	// Handle the sign once, up front, and parse magnitudes only. Adding a
	// positive fraction to an already-negative whole part would turn "-1.5s"
	// into -0.5s.
	neg := strings.HasPrefix(num, "-")
	if neg {
		num = num[1:]
	}
	ss := strings.SplitN(num, ".", 3)
	if len(ss) > 2 {
		return nil, fmt.Errorf("malformed duration %q", *s)
	}
	// hasDigits is set if either the whole or fractional part of the number is
	// present, since both are optional but one is required.
	hasDigits := false
	var d time.Duration
	if len(ss[0]) > 0 {
		// ParseUint does not permit a sign prefix, so stray "+"/"-" characters
		// are rejected. A bit size of 31 keeps the same upper bound that a
		// signed 32-bit parse imposes.
		i, err := strconv.ParseUint(ss[0], 10, 31)
		if err != nil {
			return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
		}
		d = time.Duration(i) * time.Second
		hasDigits = true
	}
	if len(ss) == 2 && len(ss[1]) > 0 {
		// More than nine fractional digits would be finer than a nanosecond.
		if len(ss[1]) > 9 {
			return nil, fmt.Errorf("malformed duration %q", *s)
		}
		f, err := strconv.ParseUint(ss[1], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("malformed duration %q: %v", *s, err)
		}
		// Scale the fraction up to nanoseconds: "5" means 500000000ns.
		for i := 9; i > len(ss[1]); i-- {
			f *= 10
		}
		d += time.Duration(f)
		hasDigits = true
	}
	if !hasDigits {
		return nil, fmt.Errorf("malformed duration %q", *s)
	}
	if neg {
		d = -d
	}

	return &d, nil
}
197
// jsonName is one entry of a method config's name list as decoded from JSON.
// Both parts are pointers so that an absent field can be told apart from an
// empty string.
type jsonName struct {
	Service *string
	Method  *string
}

// generatePath builds the lookup key for this name: "/<service>/<method>",
// or "/<service>/" when no method is given. The boolean result is false (and
// the path empty) when the service is missing.
func (j jsonName) generatePath() (string, bool) {
	if j.Service == nil {
		return "", false
	}
	method := ""
	if j.Method != nil {
		method = *j.Method
	}
	return "/" + *j.Service + "/" + method, true
}
213
// jsonMC is the shape a single method config entry is decoded into from the
// JSON service config. Every field is a pointer so that parseServiceConfig can
// tell an absent field (nil) from one that is present.
//
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonMC struct {
	Name                    *[]jsonName
	WaitForReady            *bool
	Timeout                 *string
	MaxRequestMessageBytes  *int64
	MaxResponseMessageBytes *int64
	RetryPolicy             *jsonRetryPolicy
}
223
// jsonSC is the top-level shape the JSON service config is decoded into by
// parseServiceConfig before being converted into a ServiceConfig.
//
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
	LoadBalancingPolicy *string
	MethodConfig        *[]jsonMC
	RetryThrottling     *retryThrottlingPolicy
}
230
231func parseServiceConfig(js string) (ServiceConfig, error) {
232	var rsc jsonSC
233	err := json.Unmarshal([]byte(js), &rsc)
234	if err != nil {
235		grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
236		return ServiceConfig{}, err
237	}
238	sc := ServiceConfig{
239		LB:              rsc.LoadBalancingPolicy,
240		Methods:         make(map[string]MethodConfig),
241		retryThrottling: rsc.RetryThrottling,
242	}
243	if rsc.MethodConfig == nil {
244		return sc, nil
245	}
246
247	for _, m := range *rsc.MethodConfig {
248		if m.Name == nil {
249			continue
250		}
251		d, err := parseDuration(m.Timeout)
252		if err != nil {
253			grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
254			return ServiceConfig{}, err
255		}
256
257		mc := MethodConfig{
258			WaitForReady: m.WaitForReady,
259			Timeout:      d,
260		}
261		if mc.retryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
262			grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err)
263			return ServiceConfig{}, err
264		}
265		if m.MaxRequestMessageBytes != nil {
266			if *m.MaxRequestMessageBytes > int64(maxInt) {
267				mc.MaxReqSize = newInt(maxInt)
268			} else {
269				mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
270			}
271		}
272		if m.MaxResponseMessageBytes != nil {
273			if *m.MaxResponseMessageBytes > int64(maxInt) {
274				mc.MaxRespSize = newInt(maxInt)
275			} else {
276				mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
277			}
278		}
279		for _, n := range *m.Name {
280			if path, valid := n.generatePath(); valid {
281				sc.Methods[path] = mc
282			}
283		}
284	}
285
286	if sc.retryThrottling != nil {
287		if sc.retryThrottling.MaxTokens <= 0 ||
288			sc.retryThrottling.MaxTokens >= 1000 ||
289			sc.retryThrottling.TokenRatio <= 0 {
290			// Illegal throttling config; disable throttling.
291			sc.retryThrottling = nil
292		}
293	}
294	return sc, nil
295}
296
297func convertRetryPolicy(jrp *jsonRetryPolicy) (p *retryPolicy, err error) {
298	if jrp == nil {
299		return nil, nil
300	}
301	ib, err := parseDuration(&jrp.InitialBackoff)
302	if err != nil {
303		return nil, err
304	}
305	mb, err := parseDuration(&jrp.MaxBackoff)
306	if err != nil {
307		return nil, err
308	}
309
310	if jrp.MaxAttempts <= 1 ||
311		*ib <= 0 ||
312		*mb <= 0 ||
313		jrp.BackoffMultiplier <= 0 ||
314		len(jrp.RetryableStatusCodes) == 0 {
315		grpclog.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp)
316		return nil, nil
317	}
318
319	rp := &retryPolicy{
320		maxAttempts:          jrp.MaxAttempts,
321		initialBackoff:       *ib,
322		maxBackoff:           *mb,
323		backoffMultiplier:    jrp.BackoffMultiplier,
324		retryableStatusCodes: make(map[codes.Code]bool),
325	}
326	if rp.maxAttempts > 5 {
327		// TODO(retry): Make the max maxAttempts configurable.
328		rp.maxAttempts = 5
329	}
330	for _, code := range jrp.RetryableStatusCodes {
331		rp.retryableStatusCodes[code] = true
332	}
333	return rp, nil
334}
335
// min returns whichever of a and b points at the smaller int. When the two
// values are equal, b is returned. Both pointers must be non-nil.
func min(a, b *int) *int {
	if *b <= *a {
		return b
	}
	return a
}
342
343func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
344	if mcMax == nil && doptMax == nil {
345		return &defaultVal
346	}
347	if mcMax != nil && doptMax != nil {
348		return min(mcMax, doptMax)
349	}
350	if mcMax != nil {
351		return mcMax
352	}
353	return doptMax
354}
355
// newInt returns a pointer to a freshly allocated int holding b.
func newInt(b int) *int {
	p := new(int)
	*p = b
	return p
}
359