1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package edsbalancer contains EDS balancer implementation. 20package edsbalancer 21 22import ( 23 "context" 24 "encoding/json" 25 "fmt" 26 "time" 27 28 "github.com/google/go-cmp/cmp" 29 "google.golang.org/grpc/balancer" 30 "google.golang.org/grpc/balancer/roundrobin" 31 "google.golang.org/grpc/connectivity" 32 "google.golang.org/grpc/internal/buffer" 33 "google.golang.org/grpc/internal/grpclog" 34 "google.golang.org/grpc/resolver" 35 "google.golang.org/grpc/serviceconfig" 36 "google.golang.org/grpc/xds/internal/balancer/lrs" 37 xdsclient "google.golang.org/grpc/xds/internal/client" 38) 39 40const ( 41 defaultTimeout = 10 * time.Second 42 edsName = "eds_experimental" 43) 44 45var ( 46 newEDSBalancer = func(cc balancer.ClientConn, enqueueState func(priorityType, balancer.State), loadStore lrs.Store, logger *grpclog.PrefixLogger) edsBalancerImplInterface { 47 return newEDSBalancerImpl(cc, enqueueState, loadStore, logger) 48 } 49) 50 51func init() { 52 balancer.Register(&edsBalancerBuilder{}) 53} 54 55type edsBalancerBuilder struct{} 56 57// Build helps implement the balancer.Builder interface. 58func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { 59 ctx, cancel := context.WithCancel(context.Background()) 60 x := &edsBalancer{ 61 ctx: ctx, 62 cancel: cancel, 63 cc: cc, 64 buildOpts: opts, 65 grpcUpdate: make(chan interface{}), 66 xdsClientUpdate: make(chan interface{}), 67 childPolicyUpdate: buffer.NewUnbounded(), 68 } 69 loadStore := lrs.NewStore() 70 x.logger = grpclog.NewPrefixLogger(loggingPrefix(x)) 71 x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, loadStore, x.logger) 72 x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore, x.logger) 73 x.logger.Infof("Created") 74 go x.run() 75 return x 76} 77 78func (b *edsBalancerBuilder) Name() string { 79 return edsName 80} 81 82func (b *edsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { 83 var cfg EDSConfig 84 if err := json.Unmarshal(c, &cfg); err != nil { 85 return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err) 86 } 87 return &cfg, nil 88} 89 90// edsBalancerImplInterface defines the interface that edsBalancerImpl must 91// implement to communicate with edsBalancer. 92// 93// It's implemented by the real eds balancer and a fake testing eds balancer. 94// 95// TODO: none of the methods in this interface needs to be exported. 96type edsBalancerImplInterface interface { 97 // HandleEDSResponse passes the received EDS message from traffic director to eds balancer. 98 HandleEDSResponse(edsResp *xdsclient.EDSUpdate) 99 // HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use. 100 HandleChildPolicy(name string, config json.RawMessage) 101 // HandleSubConnStateChange handles state change for SubConn. 102 HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) 103 // updateState handle a balancer state update from the priority. 104 updateState(priority priorityType, s balancer.State) 105 // Close closes the eds balancer. 106 Close() 107} 108 109var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer 110 111// edsBalancer manages xdsClient and the actual EDS balancer implementation that 112// does load balancing. 113// 114// It currently has only an edsBalancer. Later, we may add fallback. 115type edsBalancer struct { 116 cc balancer.ClientConn // *xdsClientConn 117 buildOpts balancer.BuildOptions 118 ctx context.Context 119 cancel context.CancelFunc 120 121 logger *grpclog.PrefixLogger 122 123 // edsBalancer continuously monitor the channels below, and will handle events from them in sync. 124 grpcUpdate chan interface{} 125 xdsClientUpdate chan interface{} 126 childPolicyUpdate *buffer.Unbounded 127 128 client *xdsclientWrapper // may change when passed a different service config 129 config *EDSConfig // may change when passed a different service config 130 edsImpl edsBalancerImplInterface 131} 132 133// run gets executed in a goroutine once edsBalancer is created. It monitors updates from grpc, 134// xdsClient and load balancer. It synchronizes the operations that happen inside edsBalancer. It 135// exits when edsBalancer is closed. 136func (x *edsBalancer) run() { 137 for { 138 select { 139 case update := <-x.grpcUpdate: 140 x.handleGRPCUpdate(update) 141 case update := <-x.xdsClientUpdate: 142 x.handleXDSClientUpdate(update) 143 case update := <-x.childPolicyUpdate.Get(): 144 x.childPolicyUpdate.Load() 145 u := update.(*balancerStateWithPriority) 146 x.edsImpl.updateState(u.priority, u.s) 147 case <-x.ctx.Done(): 148 if x.client != nil { 149 x.client.close() 150 } 151 if x.edsImpl != nil { 152 x.edsImpl.Close() 153 } 154 return 155 } 156 } 157} 158 159func (x *edsBalancer) handleGRPCUpdate(update interface{}) { 160 switch u := update.(type) { 161 case *subConnStateUpdate: 162 if x.edsImpl != nil { 163 x.edsImpl.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) 164 } 165 case *balancer.ClientConnState: 166 x.logger.Infof("Receive update from resolver, balancer config: %+v", u.BalancerConfig) 167 cfg, _ := u.BalancerConfig.(*EDSConfig) 168 if cfg == nil { 169 // service config parsing failed. should never happen. 170 return 171 } 172 173 x.client.handleUpdate(cfg, u.ResolverState.Attributes) 174 175 if x.config == nil { 176 x.config = cfg 177 return 178 } 179 180 // We will update the edsImpl with the new child policy, if we got a 181 // different one. 182 if x.edsImpl != nil && !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) { 183 if cfg.ChildPolicy != nil { 184 x.edsImpl.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) 185 } else { 186 x.edsImpl.HandleChildPolicy(roundrobin.Name, nil) 187 } 188 } 189 190 x.config = cfg 191 default: 192 // unreachable path 193 panic("wrong update type") 194 } 195} 196 197func (x *edsBalancer) handleXDSClientUpdate(update interface{}) { 198 switch u := update.(type) { 199 // TODO: this func should accept (*xdsclient.EDSUpdate, error), and process 200 // the error, instead of having a separate loseContact signal. 201 case *xdsclient.EDSUpdate: 202 x.edsImpl.HandleEDSResponse(u) 203 case *loseContact: 204 // loseContact can be useful for going into fallback. 205 default: 206 panic("unexpected xds client update type") 207 } 208} 209 210type subConnStateUpdate struct { 211 sc balancer.SubConn 212 state balancer.SubConnState 213} 214 215func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { 216 x.logger.Errorf("UpdateSubConnState should be called instead of HandleSubConnStateChange") 217} 218 219func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { 220 x.logger.Errorf("UpdateResolverState should be called instead of HandleResolvedAddrs") 221} 222 223func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { 224 update := &subConnStateUpdate{ 225 sc: sc, 226 state: state, 227 } 228 select { 229 case x.grpcUpdate <- update: 230 case <-x.ctx.Done(): 231 } 232} 233 234func (x *edsBalancer) ResolverError(error) { 235 // TODO: Need to distinguish between connection errors and resource removed 236 // errors. For the former, we will need to handle it later on for fallback. 237 // For the latter, handle it by stopping the watch, closing sub-balancers 238 // and pickers. 239} 240 241func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { 242 select { 243 case x.grpcUpdate <- &s: 244 case <-x.ctx.Done(): 245 } 246 return nil 247} 248 249func (x *edsBalancer) handleEDSUpdate(resp *xdsclient.EDSUpdate) error { 250 // TODO: this function should take (resp, error), and send them together on 251 // the channel. There doesn't need to be a separate `loseContact` function. 252 select { 253 case x.xdsClientUpdate <- resp: 254 case <-x.ctx.Done(): 255 } 256 257 return nil 258} 259 260type loseContact struct { 261} 262 263// TODO: delete loseContact when handleEDSUpdate takes (resp, error). 264func (x *edsBalancer) loseContact() { 265 select { 266 case x.xdsClientUpdate <- &loseContact{}: 267 case <-x.ctx.Done(): 268 } 269} 270 271type balancerStateWithPriority struct { 272 priority priorityType 273 s balancer.State 274} 275 276func (x *edsBalancer) enqueueChildBalancerState(p priorityType, s balancer.State) { 277 x.childPolicyUpdate.Put(&balancerStateWithPriority{ 278 priority: p, 279 s: s, 280 }) 281} 282 283func (x *edsBalancer) Close() { 284 x.cancel() 285 x.logger.Infof("Shutdown") 286} 287