1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package rls
20
21import (
22	"sync"
23
24	"google.golang.org/grpc"
25	"google.golang.org/grpc/balancer"
26	"google.golang.org/grpc/grpclog"
27	"google.golang.org/grpc/internal/grpcsync"
28)
29
30var (
31	_ balancer.Balancer = (*rlsBalancer)(nil)
32
33	// For overriding in tests.
34	newRLSClientFunc = newRLSClient
35	logger           = grpclog.Component("rls")
36)
37
// rlsBalancer implements the RLS LB policy.
//
// Service config updates flow from UpdateClientConnState, over ccUpdateCh, to
// the run goroutine, which applies them in handleClientConnUpdate while holding
// mu. Close fires done, which stops run and causes later updates to be dropped.
type rlsBalancer struct {
	// done is fired by Close. cc is presumably the parent ClientConn handed to
	// the builder (not used in the code visible here — confirm). opts supplies
	// the dialer, transport credentials and target used when dialing the
	// control channel to the RLS server.
	done *grpcsync.Event
	cc   balancer.ClientConn
	opts balancer.BuildOptions

	// Mutex protects all the state maintained by the LB policy.
	// TODO(easwars): Once we add the cache, we will also have another lock for
	// the cache alone.
	mu    sync.Mutex
	lbCfg *lbConfig        // Most recently applied service config.
	rlsCC *grpc.ClientConn // ClientConn to the RLS server.
	rlsC  *rlsClient       // RLS client wrapper.

	// ccUpdateCh carries ClientConnState updates from UpdateClientConnState to
	// the run goroutine.
	ccUpdateCh chan *balancer.ClientConnState
}
54
55// run is a long running goroutine which handles all the updates that the
56// balancer wishes to handle. The appropriate updateHandler will push the update
57// on to a channel that this goroutine will select on, thereby the handling of
58// the update will happen asynchronously.
59func (lb *rlsBalancer) run() {
60	for {
61		// TODO(easwars): Handle other updates like subConn state changes, RLS
62		// responses from the server etc.
63		select {
64		case u := <-lb.ccUpdateCh:
65			lb.handleClientConnUpdate(u)
66		case <-lb.done.Done():
67			return
68		}
69	}
70}
71
72// handleClientConnUpdate handles updates to the service config.
73// If the RLS server name or the RLS RPC timeout changes, it updates the control
74// channel accordingly.
75// TODO(easwars): Handle updates to other fields in the service config.
76func (lb *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
77	logger.Infof("rls: service config: %+v", ccs.BalancerConfig)
78	lb.mu.Lock()
79	defer lb.mu.Unlock()
80
81	if lb.done.HasFired() {
82		logger.Warning("rls: received service config after balancer close")
83		return
84	}
85
86	newCfg := ccs.BalancerConfig.(*lbConfig)
87	if lb.lbCfg.Equal(newCfg) {
88		logger.Info("rls: new service config matches existing config")
89		return
90	}
91
92	lb.updateControlChannel(newCfg)
93	lb.lbCfg = newCfg
94}
95
96// UpdateClientConnState pushes the received ClientConnState update on the
97// update channel which will be processed asynchronously by the run goroutine.
98// Implements balancer.Balancer interface.
99func (lb *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
100	select {
101	case lb.ccUpdateCh <- &ccs:
102	case <-lb.done.Done():
103	}
104	return nil
105}
106
107// ResolverErr implements balancer.Balancer interface.
108func (lb *rlsBalancer) ResolverError(error) {
109	// ResolverError is called by gRPC when the name resolver reports an error.
110	// TODO(easwars): How do we handle this?
111	logger.Fatal("rls: ResolverError is not yet unimplemented")
112}
113
114// UpdateSubConnState implements balancer.Balancer interface.
115func (lb *rlsBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {
116	logger.Fatal("rls: UpdateSubConnState is not yet implemented")
117}
118
119// Cleans up the resources allocated by the LB policy including the clientConn
120// to the RLS server.
121// Implements balancer.Balancer.
122func (lb *rlsBalancer) Close() {
123	lb.mu.Lock()
124	defer lb.mu.Unlock()
125
126	lb.done.Fire()
127	if lb.rlsCC != nil {
128		lb.rlsCC.Close()
129	}
130}
131
132// updateControlChannel updates the RLS client if required.
133// Caller must hold lb.mu.
134func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) {
135	oldCfg := lb.lbCfg
136	if newCfg.lookupService == oldCfg.lookupService && newCfg.lookupServiceTimeout == oldCfg.lookupServiceTimeout {
137		return
138	}
139
140	// Use RPC timeout from new config, if different from existing one.
141	timeout := oldCfg.lookupServiceTimeout
142	if timeout != newCfg.lookupServiceTimeout {
143		timeout = newCfg.lookupServiceTimeout
144	}
145
146	if newCfg.lookupService == oldCfg.lookupService {
147		// This is the case where only the timeout has changed. We will continue
148		// to use the existing clientConn. but will create a new rlsClient with
149		// the new timeout.
150		lb.rlsC = newRLSClientFunc(lb.rlsCC, lb.opts.Target.Endpoint, timeout)
151		return
152	}
153
154	// This is the case where the RLS server name has changed. We need to create
155	// a new clientConn and close the old one.
156	var dopts []grpc.DialOption
157	if dialer := lb.opts.Dialer; dialer != nil {
158		dopts = append(dopts, grpc.WithContextDialer(dialer))
159	}
160	dopts = append(dopts, dialCreds(lb.opts))
161
162	cc, err := grpc.Dial(newCfg.lookupService, dopts...)
163	if err != nil {
164		logger.Errorf("rls: dialRLS(%s, %v): %v", newCfg.lookupService, lb.opts, err)
165		// An error from a non-blocking dial indicates something serious. We
166		// should continue to use the old control channel if one exists, and
167		// return so that the rest of the config updates can be processes.
168		return
169	}
170	if lb.rlsCC != nil {
171		lb.rlsCC.Close()
172	}
173	lb.rlsCC = cc
174	lb.rlsC = newRLSClientFunc(cc, lb.opts.Target.Endpoint, timeout)
175}
176
177func dialCreds(opts balancer.BuildOptions) grpc.DialOption {
178	// The control channel should use the same authority as that of the parent
179	// channel. This ensures that the identify of the RLS server and that of the
180	// backend is the same, so if the RLS config is injected by an attacker, it
181	// cannot cause leakage of private information contained in headers set by
182	// the application.
183	server := opts.Target.Authority
184	switch {
185	case opts.DialCreds != nil:
186		if err := opts.DialCreds.OverrideServerName(server); err != nil {
187			logger.Warningf("rls: OverrideServerName(%s) = (%v), using Insecure", server, err)
188			return grpc.WithInsecure()
189		}
190		return grpc.WithTransportCredentials(opts.DialCreds)
191	case opts.CredsBundle != nil:
192		return grpc.WithTransportCredentials(opts.CredsBundle.TransportCredentials())
193	default:
194		logger.Warning("rls: no credentials available, using Insecure")
195		return grpc.WithInsecure()
196	}
197}
198