1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package edsbalancer contains EDS balancer implementation. 20package edsbalancer 21 22import ( 23 "context" 24 "encoding/json" 25 "fmt" 26 "reflect" 27 "time" 28 29 "google.golang.org/grpc/balancer" 30 "google.golang.org/grpc/balancer/roundrobin" 31 "google.golang.org/grpc/connectivity" 32 "google.golang.org/grpc/grpclog" 33 "google.golang.org/grpc/resolver" 34 "google.golang.org/grpc/serviceconfig" 35 "google.golang.org/grpc/xds/internal/balancer/lrs" 36 xdsclient "google.golang.org/grpc/xds/internal/client" 37) 38 39const ( 40 defaultTimeout = 10 * time.Second 41 edsName = "experimental_eds" 42) 43 44var ( 45 newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerImplInterface { 46 return newEDSBalancerImpl(cc, loadStore) 47 } 48) 49 50func init() { 51 balancer.Register(&edsBalancerBuilder{}) 52} 53 54type edsBalancerBuilder struct{} 55 56// Build helps implement the balancer.Builder interface. 57func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 58 ctx, cancel := context.WithCancel(context.Background()) 59 x := &edsBalancer{ 60 ctx: ctx, 61 cancel: cancel, 62 cc: cc, 63 buildOpts: opts, 64 grpcUpdate: make(chan interface{}), 65 xdsClientUpdate: make(chan interface{}), 66 } 67 loadStore := lrs.NewStore() 68 x.edsImpl = newEDSBalancer(x.cc, loadStore) 69 x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore) 70 go x.run() 71 return x 72} 73 74func (b *edsBalancerBuilder) Name() string { 75 return edsName 76} 77 78func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { 79 var cfg EDSConfig 80 if err := json.Unmarshal(c, &cfg); err != nil { 81 return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err) 82 } 83 return &cfg, nil 84} 85 86// edsBalancerImplInterface defines the interface that edsBalancerImpl must 87// implement to communicate with edsBalancer. 88// 89// It's implemented by the real eds balancer and a fake testing eds balancer. 90type edsBalancerImplInterface interface { 91 // HandleEDSResponse passes the received EDS message from traffic director to eds balancer. 92 HandleEDSResponse(edsResp *xdsclient.EDSUpdate) 93 // HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use. 94 HandleChildPolicy(name string, config json.RawMessage) 95 // HandleSubConnStateChange handles state change for SubConn. 96 HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) 97 // Close closes the eds balancer. 98 Close() 99} 100 101var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer 102 103// edsBalancer manages xdsClient and the actual EDS balancer implementation that 104// does load balancing. 105// 106// It currently has only an edsBalancer. Later, we may add fallback. 107type edsBalancer struct { 108 cc balancer.ClientConn // *xdsClientConn 109 buildOpts balancer.BuildOptions 110 ctx context.Context 111 cancel context.CancelFunc 112 113 // edsBalancer continuously monitor the channels below, and will handle events from them in sync. 114 grpcUpdate chan interface{} 115 xdsClientUpdate chan interface{} 116 117 client *xdsclientWrapper // may change when passed a different service config 118 config *EDSConfig // may change when passed a different service config 119 edsImpl edsBalancerImplInterface 120} 121 122// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc, 123// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It 124// exits when edsBalancer is closed. 125func (x *edsBalancer) run() { 126 for { 127 select { 128 case update := <-x.grpcUpdate: 129 x.handleGRPCUpdate(update) 130 case update := <-x.xdsClientUpdate: 131 x.handleXDSClientUpdate(update) 132 case <-x.ctx.Done(): 133 if x.client != nil { 134 x.client.close() 135 } 136 if x.edsImpl != nil { 137 x.edsImpl.Close() 138 } 139 return 140 } 141 } 142} 143 144func (x *edsBalancer) handleGRPCUpdate(update interface{}) { 145 switch u := update.(type) { 146 case *subConnStateUpdate: 147 if x.edsImpl != nil { 148 x.edsImpl.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) 149 } 150 case *balancer.ClientConnState: 151 cfg, _ := u.BalancerConfig.(*EDSConfig) 152 if cfg == nil { 153 // service config parsing failed. should never happen. 154 return 155 } 156 157 x.client.handleUpdate(cfg, u.ResolverState.Attributes) 158 159 if x.config == nil { 160 x.config = cfg 161 return 162 } 163 164 // We will update the edsImpl with the new child policy, if we got a 165 // different one. 166 if x.edsImpl != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) { 167 if cfg.ChildPolicy != nil { 168 x.edsImpl.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) 169 } else { 170 x.edsImpl.HandleChildPolicy(roundrobin.Name, nil) 171 } 172 } 173 174 x.config = cfg 175 default: 176 // unreachable path 177 panic("wrong update type") 178 } 179} 180 181func (x *edsBalancer) handleXDSClientUpdate(update interface{}) { 182 switch u := update.(type) { 183 // TODO: this func should accept (*xdsclient.EDSUpdate, error), and process 184 // the error, instead of having a separate loseContact signal. 185 case *xdsclient.EDSUpdate: 186 x.edsImpl.HandleEDSResponse(u) 187 case *loseContact: 188 // loseContact can be useful for going into fallback. 189 default: 190 panic("unexpected xds client update type") 191 } 192} 193 194type subConnStateUpdate struct { 195 sc balancer.SubConn 196 state balancer.SubConnState 197} 198 199func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 200 grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange") 201} 202 203func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 204 grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs") 205} 206 207func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { 208 update := &subConnStateUpdate{ 209 sc: sc, 210 state: state, 211 } 212 select { 213 case x.grpcUpdate <- update: 214 case <-x.ctx.Done(): 215 } 216} 217 218func (x *edsBalancer) ResolverError(error) { 219 // TODO: Need to distinguish between connection errors and resource removed 220 // errors. For the former, we will need to handle it later on for fallback. 221 // For the latter, handle it by stopping the watch, closing sub-balancers 222 // and pickers. 223} 224 225func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { 226 select { 227 case x.grpcUpdate <- &s: 228 case <-x.ctx.Done(): 229 } 230 return nil 231} 232 233func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error { 234 // TODO: this function should take (resp, error), and send them together on 235 // the channel. There doesn't need to be a separate `loseContact` function. 236 select { 237 case x.xdsClientUpdate <- resp: 238 case <-x.ctx.Done(): 239 } 240 241 return nil 242} 243 244type loseContact struct { 245} 246 247// TODO: delete loseContact when handleEDSUpdate takes (resp, error). 248func (x *edsBalancer) loseContact() { 249 select { 250 case x.xdsClientUpdate <- &loseContact{}: 251 case <-x.ctx.Done(): 252 } 253} 254 255func (x *edsBalancer) Close() { 256 x.cancel() 257} 258