1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package edsbalancer contains EDS balancer implementation.
20package edsbalancer
21
22import (
23	"context"
24	"encoding/json"
25	"fmt"
26	"time"
27
28	"github.com/google/go-cmp/cmp"
29	"google.golang.org/grpc/balancer"
30	"google.golang.org/grpc/balancer/roundrobin"
31	"google.golang.org/grpc/connectivity"
32	"google.golang.org/grpc/internal/buffer"
33	"google.golang.org/grpc/internal/grpclog"
34	"google.golang.org/grpc/resolver"
35	"google.golang.org/grpc/serviceconfig"
36	"google.golang.org/grpc/xds/internal/balancer/lrs"
37	xdsclient "google.golang.org/grpc/xds/internal/client"
38)
39
const (
	// edsName is the name the EDS balancer builder reports from Name.
	edsName = "eds_experimental"
	// defaultTimeout is a ten second duration. It is not referenced in the
	// visible part of this file.
	defaultTimeout = 10 * time.Second
)
44
45var (
46	newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
47		return newEDSBalancerImpl(cc, enqueueState, loadStore, logger)
48	}
49)
50
51func init() {
52	balancer.Register(&edsBalancerBuilder{})
53}
54
// edsBalancerBuilder builds edsBalancer instances. It holds no state of its
// own; a single instance is registered with the balancer package in init.
type edsBalancerBuilder struct{}
56
57// Build helps implement the balancer.Builder interface.
58func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
59	ctx, cancel := context.WithCancel(context.Background())
60	x := &edsBalancer{
61		ctx:               ctx,
62		cancel:            cancel,
63		cc:                cc,
64		buildOpts:         opts,
65		grpcUpdate:        make(chan interface{}),
66		xdsClientUpdate:   make(chan interface{}),
67		childPolicyUpdate: buffer.NewUnbounded(),
68	}
69	loadStore := lrs.NewStore()
70	x.logger = grpclog.NewPrefixLogger(loggingPrefix(x))
71	x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, loadStore, x.logger)
72	x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore, x.logger)
73	x.logger.Infof("Created")
74	go x.run()
75	return x
76}
77
78func (b *edsBalancerBuilder) Name() string {
79	return edsName
80}
81
82func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
83	var cfg EDSConfig
84	if err := json.Unmarshal(c, &cfg); err != nil {
85		return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err)
86	}
87	return &cfg, nil
88}
89
90// edsBalancerImplInterface defines the interface that edsBalancerImpl must
91// implement to communicate with edsBalancer.
92//
93// It's implemented by the real eds balancer and a fake testing eds balancer.
94//
95// TODO: none of the methods in this interface needs to be exported.
96type edsBalancerImplInterface interface {
97	// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
98	HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
99	// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
100	HandleChildPolicy(name string, config json.RawMessage)
101	// HandleSubConnStateChange handles state change for SubConn.
102	HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
103	// updateState handle a balancer state update from the priority.
104	updateState(priority priorityType, s balancer.State)
105	// Close closes the eds balancer.
106	Close()
107}
108
109var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer
110
111// edsBalancer manages xdsClient and the actual EDS balancer implementation that
112// does load balancing.
113//
114// It currently has only an edsBalancer. Later, we may add fallback.
115type edsBalancer struct {
116	cc        balancer.ClientConn // *xdsClientConn
117	buildOpts balancer.BuildOptions
118	ctx       context.Context
119	cancel    context.CancelFunc
120
121	logger *grpclog.PrefixLogger
122
123	// edsBalancer continuously monitor the channels below, and will handle events from them in sync.
124	grpcUpdate        chan interface{}
125	xdsClientUpdate   chan interface{}
126	childPolicyUpdate *buffer.Unbounded
127
128	client  *xdsclientWrapper // may change when passed a different service config
129	config  *EDSConfig        // may change when passed a different service config
130	edsImpl edsBalancerImplInterface
131}
132
133// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc,
134// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It
135// exits when edsBalancer is closed.
136func (x *edsBalancer) run() {
137	for {
138		select {
139		case update := <-x.grpcUpdate:
140			x.handleGRPCUpdate(update)
141		case update := <-x.xdsClientUpdate:
142			x.handleXDSClientUpdate(update)
143		case update := <-x.childPolicyUpdate.Get():
144			x.childPolicyUpdate.Load()
145			u := update.(*balancerStateWithPriority)
146			x.edsImpl.updateState(u.priority, u.s)
147		case <-x.ctx.Done():
148			if x.client != nil {
149				x.client.close()
150			}
151			if x.edsImpl != nil {
152				x.edsImpl.Close()
153			}
154			return
155		}
156	}
157}
158
159func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
160	switch u := update.(type) {
161	case *subConnStateUpdate:
162		if x.edsImpl != nil {
163			x.edsImpl.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
164		}
165	case *balancer.ClientConnState:
166		x.logger.Infof("Receive update from resolver, balancer config: %+v", u.BalancerConfig)
167		cfg, _ := u.BalancerConfig.(*EDSConfig)
168		if cfg == nil {
169			// service config parsing failed. should never happen.
170			return
171		}
172
173		x.client.handleUpdate(cfg, u.ResolverState.Attributes)
174
175		if x.config == nil {
176			x.config = cfg
177			return
178		}
179
180		// We will update the edsImpl with the new child policy, if we got a
181		// different one.
182		if x.edsImpl != nil && !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) {
183			if cfg.ChildPolicy != nil {
184				x.edsImpl.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
185			} else {
186				x.edsImpl.HandleChildPolicy(roundrobin.Name, nil)
187			}
188		}
189
190		x.config = cfg
191	default:
192		// unreachable path
193		panic("wrong update type")
194	}
195}
196
197func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
198	switch u := update.(type) {
199	// TODO: this func should accept (*xdsclient.EDSUpdate, error), and process
200	// the error, instead of having a separate loseContact signal.
201	case *xdsclient.EDSUpdate:
202		x.edsImpl.HandleEDSResponse(u)
203	case *loseContact:
204		// loseContact can be useful for going into fallback.
205	default:
206		panic("unexpected xds client update type")
207	}
208}
209
210type subConnStateUpdate struct {
211	sc    balancer.SubConn
212	state balancer.SubConnState
213}
214
215func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
216	x.logger.Errorf("UpdateSubConnState should be called instead of HandleSubConnStateChange")
217}
218
219func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
220	x.logger.Errorf("UpdateResolverState should be called instead of HandleResolvedAddrs")
221}
222
223func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
224	update := &subConnStateUpdate{
225		sc:    sc,
226		state: state,
227	}
228	select {
229	case x.grpcUpdate <- update:
230	case <-x.ctx.Done():
231	}
232}
233
234func (x *edsBalancer) ResolverError(error) {
235	// TODO: Need to distinguish between connection errors and resource removed
236	// errors. For the former, we will need to handle it later on for fallback.
237	// For the latter, handle it by stopping the watch, closing sub-balancers
238	// and pickers.
239}
240
241func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
242	select {
243	case x.grpcUpdate <- &s:
244	case <-x.ctx.Done():
245	}
246	return nil
247}
248
249func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error {
250	// TODO: this function should take (resp, error), and send them together on
251	// the channel. There doesn't need to be a separate `loseContact` function.
252	select {
253	case x.xdsClientUpdate <- resp:
254	case <-x.ctx.Done():
255	}
256
257	return nil
258}
259
// loseContact is the empty message the loseContact method sends on the
// xdsClientUpdate channel. handleXDSClientUpdate accepts it without acting on
// it.
type loseContact struct{}
262
263// TODO: delete loseContact when handleEDSUpdate takes (resp, error).
264func (x *edsBalancer) loseContact() {
265	select {
266	case x.xdsClientUpdate <- &loseContact{}:
267	case <-x.ctx.Done():
268	}
269}
270
271type balancerStateWithPriority struct {
272	priority priorityType
273	s        balancer.State
274}
275
276func (x *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) {
277	x.childPolicyUpdate.Put(&balancerStateWithPriority{
278		priority: p,
279		s:        s,
280	})
281}
282
283func (x *edsBalancer) Close() {
284	x.cancel()
285	x.logger.Infof("Shutdown")
286}
287