1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"fmt"
23	"net"
24	"reflect"
25	"time"
26
27	"golang.org/x/net/context"
28	"google.golang.org/grpc/balancer"
29	"google.golang.org/grpc/channelz"
30
31	"google.golang.org/grpc/connectivity"
32	lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
33	"google.golang.org/grpc/grpclog"
34	"google.golang.org/grpc/metadata"
35	"google.golang.org/grpc/resolver"
36)
37
38// processServerList updates balaner's internal state, create/remove SubConns
39// and regenerates picker using the received serverList.
40func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
41	grpclog.Infof("lbBalancer: processing server list: %+v", l)
42	lb.mu.Lock()
43	defer lb.mu.Unlock()
44
45	// Set serverListReceived to true so fallback will not take effect if it has
46	// not hit timeout.
47	lb.serverListReceived = true
48
49	// If the new server list == old server list, do nothing.
50	if reflect.DeepEqual(lb.fullServerList, l.Servers) {
51		grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
52		return
53	}
54	lb.fullServerList = l.Servers
55
56	var backendAddrs []resolver.Address
57	for _, s := range l.Servers {
58		if s.DropForLoadBalancing || s.DropForRateLimiting {
59			continue
60		}
61
62		md := metadata.Pairs(lbTokeyKey, s.LoadBalanceToken)
63		ip := net.IP(s.IpAddress)
64		ipStr := ip.String()
65		if ip.To4() == nil {
66			// Add square brackets to ipv6 addresses, otherwise net.Dial() and
67			// net.SplitHostPort() will return too many colons error.
68			ipStr = fmt.Sprintf("[%s]", ipStr)
69		}
70		addr := resolver.Address{
71			Addr:     fmt.Sprintf("%s:%d", ipStr, s.Port),
72			Metadata: &md,
73		}
74
75		backendAddrs = append(backendAddrs, addr)
76	}
77
78	// Call refreshSubConns to create/remove SubConns.
79	lb.refreshSubConns(backendAddrs)
80	// Regenerate and update picker no matter if there's update on backends (if
81	// any SubConn will be newed/removed). Because since the full serverList was
82	// different, there might be updates in drops or pick weights(different
83	// number of duplicates). We need to update picker with the fulllist.
84	//
85	// Now with cache, even if SubConn was newed/removed, there might be no
86	// state changes.
87	lb.regeneratePicker()
88	lb.cc.UpdateBalancerState(lb.state, lb.picker)
89}
90
91// refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
92// indicating whether the backendAddrs are different from the cached
93// backendAddrs (whether any SubConn was newed/removed).
94// Caller must hold lb.mu.
95func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
96	lb.backendAddrs = nil
97	var backendsUpdated bool
98	// addrsSet is the set converted from backendAddrs, it's used to quick
99	// lookup for an address.
100	addrsSet := make(map[resolver.Address]struct{})
101	// Create new SubConns.
102	for _, addr := range backendAddrs {
103		addrWithoutMD := addr
104		addrWithoutMD.Metadata = nil
105		addrsSet[addrWithoutMD] = struct{}{}
106		lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
107
108		if _, ok := lb.subConns[addrWithoutMD]; !ok {
109			backendsUpdated = true
110
111			// Use addrWithMD to create the SubConn.
112			sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
113			if err != nil {
114				grpclog.Warningf("roundrobinBalancer: failed to create new SubConn: %v", err)
115				continue
116			}
117			lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
118			if _, ok := lb.scStates[sc]; !ok {
119				// Only set state of new sc to IDLE. The state could already be
120				// READY for cached SubConns.
121				lb.scStates[sc] = connectivity.Idle
122			}
123			sc.Connect()
124		}
125	}
126
127	for a, sc := range lb.subConns {
128		// a was removed by resolver.
129		if _, ok := addrsSet[a]; !ok {
130			backendsUpdated = true
131
132			lb.cc.RemoveSubConn(sc)
133			delete(lb.subConns, a)
134			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
135			// The entry will be deleted in HandleSubConnStateChange.
136		}
137	}
138
139	return backendsUpdated
140}
141
142func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
143	for {
144		reply, err := s.Recv()
145		if err != nil {
146			return fmt.Errorf("grpclb: failed to recv server list: %v", err)
147		}
148		if serverList := reply.GetServerList(); serverList != nil {
149			lb.processServerList(serverList)
150		}
151	}
152}
153
154func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
155	ticker := time.NewTicker(interval)
156	defer ticker.Stop()
157	for {
158		select {
159		case <-ticker.C:
160		case <-s.Context().Done():
161			return
162		}
163		stats := lb.clientStats.toClientStats()
164		t := time.Now()
165		stats.Timestamp = &lbpb.Timestamp{
166			Seconds: t.Unix(),
167			Nanos:   int32(t.Nanosecond()),
168		}
169		if err := s.Send(&lbpb.LoadBalanceRequest{
170			LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
171				ClientStats: stats,
172			},
173		}); err != nil {
174			return
175		}
176	}
177}
178
179func (lb *lbBalancer) callRemoteBalancer() error {
180	lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
181	ctx, cancel := context.WithCancel(context.Background())
182	defer cancel()
183	stream, err := lbClient.BalanceLoad(ctx, FailFast(false))
184	if err != nil {
185		return fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
186	}
187
188	// grpclb handshake on the stream.
189	initReq := &lbpb.LoadBalanceRequest{
190		LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
191			InitialRequest: &lbpb.InitialLoadBalanceRequest{
192				Name: lb.target,
193			},
194		},
195	}
196	if err := stream.Send(initReq); err != nil {
197		return fmt.Errorf("grpclb: failed to send init request: %v", err)
198	}
199	reply, err := stream.Recv()
200	if err != nil {
201		return fmt.Errorf("grpclb: failed to recv init response: %v", err)
202	}
203	initResp := reply.GetInitialResponse()
204	if initResp == nil {
205		return fmt.Errorf("grpclb: reply from remote balancer did not include initial response")
206	}
207	if initResp.LoadBalancerDelegate != "" {
208		return fmt.Errorf("grpclb: Delegation is not supported")
209	}
210
211	go func() {
212		if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
213			lb.sendLoadReport(stream, d)
214		}
215	}()
216	return lb.readServerList(stream)
217}
218
219func (lb *lbBalancer) watchRemoteBalancer() {
220	for {
221		err := lb.callRemoteBalancer()
222		select {
223		case <-lb.doneCh:
224			return
225		default:
226			if err != nil {
227				grpclog.Error(err)
228			}
229		}
230
231	}
232}
233
234func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
235	var dopts []DialOption
236	if creds := lb.opt.DialCreds; creds != nil {
237		if err := creds.OverrideServerName(remoteLBName); err == nil {
238			dopts = append(dopts, WithTransportCredentials(creds))
239		} else {
240			grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
241			dopts = append(dopts, WithInsecure())
242		}
243	} else {
244		dopts = append(dopts, WithInsecure())
245	}
246	if lb.opt.Dialer != nil {
247		// WithDialer takes a different type of function, so we instead use a
248		// special DialOption here.
249		dopts = append(dopts, withContextDialer(lb.opt.Dialer))
250	}
251	// Explicitly set pickfirst as the balancer.
252	dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
253	dopts = append(dopts, withResolverBuilder(lb.manualResolver))
254	if channelz.IsOn() {
255		dopts = append(dopts, WithChannelzParentID(lb.opt.ChannelzParentID))
256	}
257
258	// DialContext using manualResolver.Scheme, which is a random scheme generated
259	// when init grpclb. The target name is not important.
260	cc, err := DialContext(context.Background(), "grpclb:///grpclb.server", dopts...)
261	if err != nil {
262		grpclog.Fatalf("failed to dial: %v", err)
263	}
264	lb.ccRemoteLB = cc
265	go lb.watchRemoteBalancer()
266}
267