1/* 2 * 3 * Copyright 2020 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package rls 20 21import ( 22 "sync" 23 24 "google.golang.org/grpc" 25 "google.golang.org/grpc/balancer" 26 "google.golang.org/grpc/grpclog" 27 "google.golang.org/grpc/internal/grpcsync" 28) 29 30var ( 31 _ balancer.Balancer = (*rlsBalancer)(nil) 32 33 // For overriding in tests. 34 newRLSClientFunc = newRLSClient 35 logger = grpclog.Component("rls") 36) 37 38// rlsBalancer implements the RLS LB policy. 39type rlsBalancer struct { 40 done *grpcsync.Event 41 cc balancer.ClientConn 42 opts balancer.BuildOptions 43 44 // Mutex protects all the state maintained by the LB policy. 45 // TODO(easwars): Once we add the cache, we will also have another lock for 46 // the cache alone. 47 mu sync.Mutex 48 lbCfg *lbConfig // Most recently received service config. 49 rlsCC *grpc.ClientConn // ClientConn to the RLS server. 50 rlsC *rlsClient // RLS client wrapper. 51 52 ccUpdateCh chan *balancer.ClientConnState 53} 54 55// run is a long running goroutine which handles all the updates that the 56// balancer wishes to handle. The appropriate updateHandler will push the update 57// on to a channel that this goroutine will select on, thereby the handling of 58// the update will happen asynchronously. 59func (lb *rlsBalancer) run() { 60 for { 61 // TODO(easwars): Handle other updates like subConn state changes, RLS 62 // responses from the server etc. 63 select { 64 case u := <-lb.ccUpdateCh: 65 lb.handleClientConnUpdate(u) 66 case <-lb.done.Done(): 67 return 68 } 69 } 70} 71 72// handleClientConnUpdate handles updates to the service config. 73// If the RLS server name or the RLS RPC timeout changes, it updates the control 74// channel accordingly. 75// TODO(easwars): Handle updates to other fields in the service config. 76func (lb *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) { 77 logger.Infof("rls: service config: %+v", ccs.BalancerConfig) 78 lb.mu.Lock() 79 defer lb.mu.Unlock() 80 81 if lb.done.HasFired() { 82 logger.Warning("rls: received service config after balancer close") 83 return 84 } 85 86 newCfg := ccs.BalancerConfig.(*lbConfig) 87 if lb.lbCfg.Equal(newCfg) { 88 logger.Info("rls: new service config matches existing config") 89 return 90 } 91 92 lb.updateControlChannel(newCfg) 93 lb.lbCfg = newCfg 94} 95 96// UpdateClientConnState pushes the received ClientConnState update on the 97// update channel which will be processed asynchronously by the run goroutine. 98// Implements balancer.Balancer interface. 99func (lb *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { 100 select { 101 case lb.ccUpdateCh <- &ccs: 102 case <-lb.done.Done(): 103 } 104 return nil 105} 106 107// ResolverErr implements balancer.Balancer interface. 108func (lb *rlsBalancer) ResolverError(error) { 109 // ResolverError is called by gRPC when the name resolver reports an error. 110 // TODO(easwars): How do we handle this? 111 logger.Fatal("rls: ResolverError is not yet unimplemented") 112} 113 114// UpdateSubConnState implements balancer.Balancer interface. 115func (lb *rlsBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) { 116 logger.Fatal("rls: UpdateSubConnState is not yet implemented") 117} 118 119// Cleans up the resources allocated by the LB policy including the clientConn 120// to the RLS server. 121// Implements balancer.Balancer. 122func (lb *rlsBalancer) Close() { 123 lb.mu.Lock() 124 defer lb.mu.Unlock() 125 126 lb.done.Fire() 127 if lb.rlsCC != nil { 128 lb.rlsCC.Close() 129 } 130} 131 132// updateControlChannel updates the RLS client if required. 133// Caller must hold lb.mu. 134func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) { 135 oldCfg := lb.lbCfg 136 if newCfg.lookupService == oldCfg.lookupService && newCfg.lookupServiceTimeout == oldCfg.lookupServiceTimeout { 137 return 138 } 139 140 // Use RPC timeout from new config, if different from existing one. 141 timeout := oldCfg.lookupServiceTimeout 142 if timeout != newCfg.lookupServiceTimeout { 143 timeout = newCfg.lookupServiceTimeout 144 } 145 146 if newCfg.lookupService == oldCfg.lookupService { 147 // This is the case where only the timeout has changed. We will continue 148 // to use the existing clientConn. but will create a new rlsClient with 149 // the new timeout. 150 lb.rlsC = newRLSClientFunc(lb.rlsCC, lb.opts.Target.Endpoint, timeout) 151 return 152 } 153 154 // This is the case where the RLS server name has changed. We need to create 155 // a new clientConn and close the old one. 156 var dopts []grpc.DialOption 157 if dialer := lb.opts.Dialer; dialer != nil { 158 dopts = append(dopts, grpc.WithContextDialer(dialer)) 159 } 160 dopts = append(dopts, dialCreds(lb.opts)) 161 162 cc, err := grpc.Dial(newCfg.lookupService, dopts...) 163 if err != nil { 164 logger.Errorf("rls: dialRLS(%s, %v): %v", newCfg.lookupService, lb.opts, err) 165 // An error from a non-blocking dial indicates something serious. We 166 // should continue to use the old control channel if one exists, and 167 // return so that the rest of the config updates can be processes. 168 return 169 } 170 if lb.rlsCC != nil { 171 lb.rlsCC.Close() 172 } 173 lb.rlsCC = cc 174 lb.rlsC = newRLSClientFunc(cc, lb.opts.Target.Endpoint, timeout) 175} 176 177func dialCreds(opts balancer.BuildOptions) grpc.DialOption { 178 // The control channel should use the same authority as that of the parent 179 // channel. This ensures that the identify of the RLS server and that of the 180 // backend is the same, so if the RLS config is injected by an attacker, it 181 // cannot cause leakage of private information contained in headers set by 182 // the application. 183 server := opts.Target.Authority 184 switch { 185 case opts.DialCreds != nil: 186 if err := opts.DialCreds.OverrideServerName(server); err != nil { 187 logger.Warningf("rls: OverrideServerName(%s) = (%v), using Insecure", server, err) 188 return grpc.WithInsecure() 189 } 190 return grpc.WithTransportCredentials(opts.DialCreds) 191 case opts.CredsBundle != nil: 192 return grpc.WithTransportCredentials(opts.CredsBundle.TransportCredentials()) 193 default: 194 logger.Warning("rls: no credentials available, using Insecure") 195 return grpc.WithInsecure() 196 } 197} 198