1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package rls
20
21import (
22	"errors"
23	"time"
24
25	"google.golang.org/grpc/balancer"
26	"google.golang.org/grpc/balancer/rls/internal/cache"
27	"google.golang.org/grpc/balancer/rls/internal/keys"
28	"google.golang.org/grpc/metadata"
29)
30
// errRLSThrottled is returned to the gRPC client when a pick fails because
// the RLS request that would have resolved it was throttled client-side and
// no default target is configured to fall back to.
var errRLSThrottled = errors.New("RLS call throttled at client side")
32
// RLS rlsPicker selects the subConn to be used for a particular RPC. It does
// not manage subConns directly and usually delegates to pickers provided by
// child policies.
//
// The RLS LB policy creates a new rlsPicker object whenever its ServiceConfig
// is updated and provides a bunch of hooks for the rlsPicker to get the latest
// state that it can use to make its decision.
type rlsPicker struct {
	// The keyBuilder map used to generate RLS keys for the RPC. This is built
	// by the LB policy based on the received ServiceConfig.
	kbm keys.BuilderMap

	// The following hooks are setup by the LB policy to enable the rlsPicker to
	// access state stored in the policy. This approach has the following
	// advantages:
	// 1. The rlsPicker is loosely coupled with the LB policy in the sense that
	//    updates happening on the LB policy like the receipt of an RLS
	//    response, or an update to the default rlsPicker etc are not explicitly
	//    pushed to the rlsPicker, but are readily available to the rlsPicker
	//    when it invokes these hooks. And the LB policy takes care of
	//    synchronizing access to this shared state.
	// 2. It makes unit testing the rlsPicker easy since any number of these
	//    hooks could be overridden.

	// readCache is used to read from the data cache and the pending request
	// map in an atomic fashion. The first return parameter is the entry in the
	// data cache, and the second indicates whether an entry for the same key
	// is present in the pending cache.
	readCache func(cache.Key) (*cache.Entry, bool)
	// shouldThrottle decides if the current RPC should be throttled at the
	// client side. It uses an adaptive throttling algorithm.
	shouldThrottle func() bool
	// startRLS kicks off an RLS request in the background for the provided RPC
	// path and keyMap. An entry in the pending request map is created before
	// sending out the request and an entry in the data cache is created or
	// updated upon receipt of a response. See implementation in the LB policy
	// for details.
	startRLS func(string, keys.KeyMap)
	// defaultPick enables the rlsPicker to delegate the pick decision to the
	// rlsPicker returned by the child LB policy pointing to the default target
	// specified in the service config.
	defaultPick func(balancer.PickInfo) (balancer.PickResult, error)
}
76
77// Pick makes the routing decision for every outbound RPC.
78func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
79	// For every incoming request, we first build the RLS keys using the
80	// keyBuilder we received from the LB policy. If no metadata is present in
81	// the context, we end up using an empty key.
82	km := keys.KeyMap{}
83	md, ok := metadata.FromOutgoingContext(info.Ctx)
84	if ok {
85		km = p.kbm.RLSKey(md, info.FullMethodName)
86	}
87
88	// We use the LB policy hook to read the data cache and the pending request
89	// map (whether or not an entry exists) for the RPC path and the generated
90	// RLS keys. We will end up kicking off an RLS request only if there is no
91	// pending request for the current RPC path and keys, and either we didn't
92	// find an entry in the data cache or the entry was stale and it wasn't in
93	// backoff.
94	startRequest := false
95	now := time.Now()
96	entry, pending := p.readCache(cache.Key{Path: info.FullMethodName, KeyMap: km.Str})
97	if entry == nil {
98		startRequest = true
99	} else {
100		entry.Mu.Lock()
101		defer entry.Mu.Unlock()
102		if entry.StaleTime.Before(now) && entry.BackoffTime.Before(now) {
103			// This is the proactive cache refresh.
104			startRequest = true
105		}
106	}
107
108	if startRequest && !pending {
109		if p.shouldThrottle() {
110			// The entry doesn't exist or has expired and the new RLS request
111			// has been throttled. Treat it as an error and delegate to default
112			// pick, if one exists, or fail the pick.
113			if entry == nil || entry.ExpiryTime.Before(now) {
114				if p.defaultPick != nil {
115					return p.defaultPick(info)
116				}
117				return balancer.PickResult{}, errRLSThrottled
118			}
119			// The proactive refresh has been throttled. Nothing to worry, just
120			// keep using the existing entry.
121		} else {
122			p.startRLS(info.FullMethodName, km)
123		}
124	}
125
126	if entry != nil {
127		if entry.ExpiryTime.After(now) {
128			// This is the jolly good case where we have found a valid entry in
129			// the data cache. We delegate to the LB policy associated with
130			// this cache entry.
131			return entry.ChildPicker.Pick(info)
132		} else if entry.BackoffTime.After(now) {
133			// The entry has expired, but is in backoff. We delegate to the
134			// default pick, if one exists, or return the error from the last
135			// failed RLS request for this entry.
136			if p.defaultPick != nil {
137				return p.defaultPick(info)
138			}
139			return balancer.PickResult{}, entry.CallStatus
140		}
141	}
142
143	// We get here only in the following cases:
144	// * No data cache entry or expired entry, RLS request sent out
145	// * No valid data cache entry and Pending cache entry exists
146	// We need to queue to pick which will be handled once the RLS response is
147	// received.
148	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
149}
150