1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "fmt" 23 "net" 24 "reflect" 25 "time" 26 27 "golang.org/x/net/context" 28 "google.golang.org/grpc/balancer" 29 "google.golang.org/grpc/channelz" 30 31 "google.golang.org/grpc/connectivity" 32 lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" 33 "google.golang.org/grpc/grpclog" 34 "google.golang.org/grpc/metadata" 35 "google.golang.org/grpc/resolver" 36) 37 38// processServerList updates balaner's internal state, create/remove SubConns 39// and regenerates picker using the received serverList. 40func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { 41 grpclog.Infof("lbBalancer: processing server list: %+v", l) 42 lb.mu.Lock() 43 defer lb.mu.Unlock() 44 45 // Set serverListReceived to true so fallback will not take effect if it has 46 // not hit timeout. 47 lb.serverListReceived = true 48 49 // If the new server list == old server list, do nothing. 50 if reflect.DeepEqual(lb.fullServerList, l.Servers) { 51 grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring") 52 return 53 } 54 lb.fullServerList = l.Servers 55 56 var backendAddrs []resolver.Address 57 for _, s := range l.Servers { 58 if s.DropForLoadBalancing || s.DropForRateLimiting { 59 continue 60 } 61 62 md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken) 63 ip := net.IP(s.IpAddress) 64 ipStr := ip.String() 65 if ip.To4() == nil { 66 // Add square brackets to ipv6 addresses, otherwise net.Dial() and 67 // net.SplitHostPort() will return too many colons error. 68 ipStr = fmt.Sprintf("[%s]", ipStr) 69 } 70 addr := resolver.Address{ 71 Addr: fmt.Sprintf("%s:%d", ipStr, s.Port), 72 Metadata: &md, 73 } 74 75 backendAddrs = append(backendAddrs, addr) 76 } 77 78 // Call refreshSubConns to create/remove SubConns. 79 lb.refreshSubConns(backendAddrs) 80 // Regenerate and update picker no matter if there's update on backends (if 81 // any SubConn will be newed/removed). Because since the full serverList was 82 // different, there might be updates in drops or pick weights(different 83 // number of duplicates). We need to update picker with the fulllist. 84 // 85 // Now with cache, even if SubConn was newed/removed, there might be no 86 // state changes. 87 lb.regeneratePicker() 88 lb.cc.UpdateBalancerState(lb.state, lb.picker) 89} 90 91// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool 92// indicating whether the backendAddrs are different from the cached 93// backendAddrs (whether any SubConn was newed/removed). 94// Caller must hold lb.mu. 95func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool { 96 lb.backendAddrs = nil 97 var backendsUpdated bool 98 // addrsSet is the set converted from backendAddrs, it's used to quick 99 // lookup for an address. 100 addrsSet := make(map[resolver.Address]struct{}) 101 // Create new SubConns. 102 for _, addr := range backendAddrs { 103 addrWithoutMD := addr 104 addrWithoutMD.Metadata = nil 105 addrsSet[addrWithoutMD] = struct{}{} 106 lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD) 107 108 if _, ok := lb.subConns[addrWithoutMD]; !ok { 109 backendsUpdated = true 110 111 // Use addrWithMD to create the SubConn. 112 sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) 113 if err != nil { 114 grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err) 115 continue 116 } 117 lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map. 118 if _, ok := lb.scStates[sc]; !ok { 119 // Only set state of new sc to IDLE. The state could already be 120 // READY for cached SubConns. 121 lb.scStates[sc] = connectivity.Idle 122 } 123 sc.Connect() 124 } 125 } 126 127 for a, sc := range lb.subConns { 128 // a was removed by resolver. 129 if _, ok := addrsSet[a]; !ok { 130 backendsUpdated = true 131 132 lb.cc.RemoveSubConn(sc) 133 delete(lb.subConns, a) 134 // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. 135 // The entry will be deleted in HandleSubConnStateChange. 136 } 137 } 138 139 return backendsUpdated 140} 141 142func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error { 143 for { 144 reply, err := s.Recv() 145 if err != nil { 146 return fmt.Errorf("grpclb: failed to recv server list: %v", err) 147 } 148 if serverList := reply.GetServerList(); serverList != nil { 149 lb.processServerList(serverList) 150 } 151 } 152} 153 154func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) { 155 ticker := time.NewTicker(interval) 156 defer ticker.Stop() 157 for { 158 select { 159 case <-ticker.C: 160 case <-s.Context().Done(): 161 return 162 } 163 stats := lb.clientStats.toClientStats() 164 t := time.Now() 165 stats.Timestamp = &lbpb.Timestamp{ 166 Seconds: t.Unix(), 167 Nanos: int32(t.Nanosecond()), 168 } 169 if err := s.Send(&lbpb.LoadBalanceRequest{ 170 LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{ 171 ClientStats: stats, 172 }, 173 }); err != nil { 174 return 175 } 176 } 177} 178 179func (lb *lbBalancer) callRemoteBalancer() error { 180 lbClient := &loadBalancerClient{cc: lb.ccRemoteLB} 181 ctx, cancel := context.WithCancel(context.Background()) 182 defer cancel() 183 stream, err := lbClient.BalanceLoad(ctx, FailFast(false)) 184 if err != nil { 185 return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) 186 } 187 188 // grpclb handshake on the stream. 189 initReq := &lbpb.LoadBalanceRequest{ 190 LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ 191 InitialRequest: &lbpb.InitialLoadBalanceRequest{ 192 Name: lb.target, 193 }, 194 }, 195 } 196 if err := stream.Send(initReq); err != nil { 197 return fmt.Errorf("grpclb: failed to send init request: %v", err) 198 } 199 reply, err := stream.Recv() 200 if err != nil { 201 return fmt.Errorf("grpclb: failed to recv init response: %v", err) 202 } 203 initResp := reply.GetInitialResponse() 204 if initResp == nil { 205 return fmt.Errorf("grpclb: reply from remote balancer did not include initial response") 206 } 207 if initResp.LoadBalancerDelegate != "" { 208 return fmt.Errorf("grpclb: Delegation is not supported") 209 } 210 211 go func() { 212 if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { 213 lb.sendLoadReport(stream, d) 214 } 215 }() 216 return lb.readServerList(stream) 217} 218 219func (lb *lbBalancer) watchRemoteBalancer() { 220 for { 221 err := lb.callRemoteBalancer() 222 select { 223 case <-lb.doneCh: 224 return 225 default: 226 if err != nil { 227 grpclog.Error(err) 228 } 229 } 230 231 } 232} 233 234func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { 235 var dopts []DialOption 236 if creds := lb.opt.DialCreds; creds != nil { 237 if err := creds.OverrideServerName(remoteLBName); err == nil { 238 dopts = append(dopts, WithTransportCredentials(creds)) 239 } else { 240 grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err) 241 dopts = append(dopts, WithInsecure()) 242 } 243 } else { 244 dopts = append(dopts, WithInsecure()) 245 } 246 if lb.opt.Dialer != nil { 247 // WithDialer takes a different type of function, so we instead use a 248 // special DialOption here. 249 dopts = append(dopts, withContextDialer(lb.opt.Dialer)) 250 } 251 // Explicitly set pickfirst as the balancer. 252 dopts = append(dopts, WithBalancerName(PickFirstBalancerName)) 253 dopts = append(dopts, withResolverBuilder(lb.manualResolver)) 254 if channelz.IsOn() { 255 dopts = append(dopts, WithChannelzParentID(lb.opt.ChannelzParentID)) 256 } 257 258 // DialContext using manualResolver.Scheme, which is a random scheme generated 259 // when init grpclb. The target name is not important. 260 cc, err := DialContext(context.Background(), "grpclb:///grpclb.server", dopts...) 261 if err != nil { 262 grpclog.Fatalf("failed to dial: %v", err) 263 } 264 lb.ccRemoteLB = cc 265 go lb.watchRemoteBalancer() 266} 267