1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package edsbalancer contains EDS balancer implementation.
20package edsbalancer
21
22import (
23	"context"
24	"encoding/json"
25	"fmt"
26	"reflect"
27	"time"
28
29	"google.golang.org/grpc/balancer"
30	"google.golang.org/grpc/balancer/roundrobin"
31	"google.golang.org/grpc/connectivity"
32	"google.golang.org/grpc/grpclog"
33	"google.golang.org/grpc/resolver"
34	"google.golang.org/grpc/serviceconfig"
35	"google.golang.org/grpc/xds/internal/balancer/lrs"
36	xdsclient "google.golang.org/grpc/xds/internal/client"
37)
38
39const (
40	defaultTimeout = 10 * time.Second
41	edsName        = "experimental_eds"
42)
43
44var (
45	newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface {
46		return newEDSBalancerImpl(cc, loadStore)
47	}
48)
49
50func init() {
51	balancer.Register(&edsBalancerBuilder{})
52}
53
54type edsBalancerBuilder struct{}
55
56// Build helps implement the balancer.Builder interface.
57func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
58	ctx, cancel := context.WithCancel(context.Background())
59	x := &edsBalancer{
60		ctx:             ctx,
61		cancel:          cancel,
62		cc:              cc,
63		buildOpts:       opts,
64		grpcUpdate:      make(chan interface{}),
65		xdsClientUpdate: make(chan interface{}),
66	}
67	loadStore := lrs.NewStore()
68	x.edsImpl = newEDSBalancer(x.cc, loadStore)
69	x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore)
70	go x.run()
71	return x
72}
73
74func (b *edsBalancerBuilder) Name() string {
75	return edsName
76}
77
78func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
79	var cfg EDSConfig
80	if err := json.Unmarshal(c, &cfg); err != nil {
81		return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err)
82	}
83	return &cfg, nil
84}
85
86// edsBalancerImplInterface defines the interface that edsBalancerImpl must
87// implement to communicate with edsBalancer.
88//
89// It's implemented by the real eds balancer and a fake testing eds balancer.
90type edsBalancerImplInterface interface {
91	// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
92	HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
93	// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
94	HandleChildPolicy(name string, config json.RawMessage)
95	// HandleSubConnStateChange handles state change for SubConn.
96	HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State)
97	// Close closes the eds balancer.
98	Close()
99}
100
101var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer
102
103// edsBalancer manages xdsClient and the actual EDS balancer implementation that
104// does load balancing.
105//
106// It currently has only an edsBalancer. Later, we may add fallback.
107type edsBalancer struct {
108	cc        balancer.ClientConn // *xdsClientConn
109	buildOpts balancer.BuildOptions
110	ctx       context.Context
111	cancel    context.CancelFunc
112
113	// edsBalancer continuously monitor the channels below, and will handle events from them in sync.
114	grpcUpdate      chan interface{}
115	xdsClientUpdate chan interface{}
116
117	client  *xdsclientWrapper // may change when passed a different service config
118	config  *EDSConfig        // may change when passed a different service config
119	edsImpl edsBalancerImplInterface
120}
121
122// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc,
123// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It
124// exits when edsBalancer is closed.
125func (x *edsBalancer) run() {
126	for {
127		select {
128		case update := <-x.grpcUpdate:
129			x.handleGRPCUpdate(update)
130		case update := <-x.xdsClientUpdate:
131			x.handleXDSClientUpdate(update)
132		case <-x.ctx.Done():
133			if x.client != nil {
134				x.client.close()
135			}
136			if x.edsImpl != nil {
137				x.edsImpl.Close()
138			}
139			return
140		}
141	}
142}
143
144func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
145	switch u := update.(type) {
146	case *subConnStateUpdate:
147		if x.edsImpl != nil {
148			x.edsImpl.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
149		}
150	case *balancer.ClientConnState:
151		cfg, _ := u.BalancerConfig.(*EDSConfig)
152		if cfg == nil {
153			// service config parsing failed. should never happen.
154			return
155		}
156
157		x.client.handleUpdate(cfg, u.ResolverState.Attributes)
158
159		if x.config == nil {
160			x.config = cfg
161			return
162		}
163
164		// We will update the edsImpl with the new child policy, if we got a
165		// different one.
166		if x.edsImpl != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) {
167			if cfg.ChildPolicy != nil {
168				x.edsImpl.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
169			} else {
170				x.edsImpl.HandleChildPolicy(roundrobin.Name, nil)
171			}
172		}
173
174		x.config = cfg
175	default:
176		// unreachable path
177		panic("wrong update type")
178	}
179}
180
181func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
182	switch u := update.(type) {
183	// TODO: this func should accept (*xdsclient.EDSUpdate, error), and process
184	// the error, instead of having a separate loseContact signal.
185	case *xdsclient.EDSUpdate:
186		x.edsImpl.HandleEDSResponse(u)
187	case *loseContact:
188		// loseContact can be useful for going into fallback.
189	default:
190		panic("unexpected xds client update type")
191	}
192}
193
194type subConnStateUpdate struct {
195	sc    balancer.SubConn
196	state balancer.SubConnState
197}
198
199func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
200	grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
201}
202
203func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
204	grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs")
205}
206
207func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
208	update := &subConnStateUpdate{
209		sc:    sc,
210		state: state,
211	}
212	select {
213	case x.grpcUpdate <- update:
214	case <-x.ctx.Done():
215	}
216}
217
218func (x *edsBalancer) ResolverError(error) {
219	// TODO: Need to distinguish between connection errors and resource removed
220	// errors. For the former, we will need to handle it later on for fallback.
221	// For the latter, handle it by stopping the watch, closing sub-balancers
222	// and pickers.
223}
224
225func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
226	select {
227	case x.grpcUpdate <- &s:
228	case <-x.ctx.Done():
229	}
230	return nil
231}
232
233func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error {
234	// TODO: this function should take (resp, error), and send them together on
235	// the channel. There doesn't need to be a separate `loseContact` function.
236	select {
237	case x.xdsClientUpdate <- resp:
238	case <-x.ctx.Done():
239	}
240
241	return nil
242}
243
244type loseContact struct {
245}
246
247// TODO: delete loseContact when handleEDSUpdate takes (resp, error).
248func (x *edsBalancer) loseContact() {
249	select {
250	case x.xdsClientUpdate <- &loseContact{}:
251	case <-x.ctx.Done():
252	}
253}
254
255func (x *edsBalancer) Close() {
256	x.cancel()
257}
258