1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package rls 20 21import ( 22 "errors" 23 "time" 24 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/balancer/rls/internal/cache" 27 "google.golang.org/grpc/balancer/rls/internal/keys" 28 "google.golang.org/grpc/metadata" 29) 30 31var errRLSThrottled = errors.New("RLS call throttled at client side") 32 33// RLS rlsPicker selects the subConn to be used for a particular RPC. It does 34// not manage subConns directly and usually deletegates to pickers provided by 35// child policies. 36// 37// The RLS LB policy creates a new rlsPicker object whenever its ServiceConfig 38// is updated and provides a bunch of hooks for the rlsPicker to get the latest 39// state that it can used to make its decision. 40type rlsPicker struct { 41 // The keyBuilder map used to generate RLS keys for the RPC. This is built 42 // by the LB policy based on the received ServiceConfig. 43 kbm keys.BuilderMap 44 45 // The following hooks are setup by the LB policy to enable the rlsPicker to 46 // access state stored in the policy. This approach has the following 47 // advantages: 48 // 1. The rlsPicker is loosely coupled with the LB policy in the sense that 49 // updates happening on the LB policy like the receipt of an RLS 50 // response, or an update to the default rlsPicker etc are not explicitly 51 // pushed to the rlsPicker, but are readily available to the rlsPicker 52 // when it invokes these hooks. And the LB policy takes care of 53 // synchronizing access to these shared state. 54 // 2. It makes unit testing the rlsPicker easy since any number of these 55 // hooks could be overridden. 56 57 // readCache is used to read from the data cache and the pending request 58 // map in an atomic fashion. The first return parameter is the entry in the 59 // data cache, and the second indicates whether an entry for the same key 60 // is present in the pending cache. 61 readCache func(cache.Key) (*cache.Entry, bool) 62 // shouldThrottle decides if the current RPC should be throttled at the 63 // client side. It uses an adaptive throttling algorithm. 64 shouldThrottle func() bool 65 // startRLS kicks off an RLS request in the background for the provided RPC 66 // path and keyMap. An entry in the pending request map is created before 67 // sending out the request and an entry in the data cache is created or 68 // updated upon receipt of a response. See implementation in the LB policy 69 // for details. 70 startRLS func(string, keys.KeyMap) 71 // defaultPick enables the rlsPicker to delegate the pick decision to the 72 // rlsPicker returned by the child LB policy pointing to the default target 73 // specified in the service config. 74 defaultPick func(balancer.PickInfo) (balancer.PickResult, error) 75} 76 77// Pick makes the routing decision for every outbound RPC. 78func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { 79 // For every incoming request, we first build the RLS keys using the 80 // keyBuilder we received from the LB policy. If no metadata is present in 81 // the context, we end up using an empty key. 82 km := keys.KeyMap{} 83 md, ok := metadata.FromOutgoingContext(info.Ctx) 84 if ok { 85 km = p.kbm.RLSKey(md, info.FullMethodName) 86 } 87 88 // We use the LB policy hook to read the data cache and the pending request 89 // map (whether or not an entry exists) for the RPC path and the generated 90 // RLS keys. We will end up kicking off an RLS request only if there is no 91 // pending request for the current RPC path and keys, and either we didn't 92 // find an entry in the data cache or the entry was stale and it wasn't in 93 // backoff. 94 startRequest := false 95 now := time.Now() 96 entry, pending := p.readCache(cache.Key{Path: info.FullMethodName, KeyMap: km.Str}) 97 if entry == nil { 98 startRequest = true 99 } else { 100 entry.Mu.Lock() 101 defer entry.Mu.Unlock() 102 if entry.StaleTime.Before(now) && entry.BackoffTime.Before(now) { 103 // This is the proactive cache refresh. 104 startRequest = true 105 } 106 } 107 108 if startRequest && !pending { 109 if p.shouldThrottle() { 110 // The entry doesn't exist or has expired and the new RLS request 111 // has been throttled. Treat it as an error and delegate to default 112 // pick, if one exists, or fail the pick. 113 if entry == nil || entry.ExpiryTime.Before(now) { 114 if p.defaultPick != nil { 115 return p.defaultPick(info) 116 } 117 return balancer.PickResult{}, errRLSThrottled 118 } 119 // The proactive refresh has been throttled. Nothing to worry, just 120 // keep using the existing entry. 121 } else { 122 p.startRLS(info.FullMethodName, km) 123 } 124 } 125 126 if entry != nil { 127 if entry.ExpiryTime.After(now) { 128 // This is the jolly good case where we have found a valid entry in 129 // the data cache. We delegate to the LB policy associated with 130 // this cache entry. 131 return entry.ChildPicker.Pick(info) 132 } else if entry.BackoffTime.After(now) { 133 // The entry has expired, but is in backoff. We delegate to the 134 // default pick, if one exists, or return the error from the last 135 // failed RLS request for this entry. 136 if p.defaultPick != nil { 137 return p.defaultPick(info) 138 } 139 return balancer.PickResult{}, entry.CallStatus 140 } 141 } 142 143 // We get here only in the following cases: 144 // * No data cache entry or expired entry, RLS request sent out 145 // * No valid data cache entry and Pending cache entry exists 146 // We need to queue to pick which will be handled once the RLS response is 147 // received. 148 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable 149} 150