1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"sync"
23	"sync/atomic"
24
25	"golang.org/x/net/context"
26	"google.golang.org/grpc/balancer"
27	"google.golang.org/grpc/codes"
28	lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
29	"google.golang.org/grpc/status"
30)
31
// rpcStats holds the per-client call counters that grpclb reports back to
// the remote balancer (see lbpb.ClientStats). Every field is an int64
// counter that is read and written only through the sync/atomic helpers
// below; do not access the fields directly.
type rpcStats struct {
	NumCallsStarted                          int64
	NumCallsFinished                         int64
	NumCallsFinishedWithDropForRateLimiting  int64
	NumCallsFinishedWithDropForLoadBalancing int64
	NumCallsFinishedWithClientFailedToSend   int64
	NumCallsFinishedKnownReceived            int64
}
40
41// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
42func (s *rpcStats) toClientStats() *lbpb.ClientStats {
43	stats := &lbpb.ClientStats{
44		NumCallsStarted:                          atomic.SwapInt64(&s.NumCallsStarted, 0),
45		NumCallsFinished:                         atomic.SwapInt64(&s.NumCallsFinished, 0),
46		NumCallsFinishedWithDropForRateLimiting:  atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0),
47		NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0),
48		NumCallsFinishedWithClientFailedToSend:   atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0),
49		NumCallsFinishedKnownReceived:            atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0),
50	}
51	return stats
52}
53
54func (s *rpcStats) dropForRateLimiting() {
55	atomic.AddInt64(&s.NumCallsStarted, 1)
56	atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1)
57	atomic.AddInt64(&s.NumCallsFinished, 1)
58}
59
60func (s *rpcStats) dropForLoadBalancing() {
61	atomic.AddInt64(&s.NumCallsStarted, 1)
62	atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1)
63	atomic.AddInt64(&s.NumCallsFinished, 1)
64}
65
66func (s *rpcStats) failedToSend() {
67	atomic.AddInt64(&s.NumCallsStarted, 1)
68	atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1)
69	atomic.AddInt64(&s.NumCallsFinished, 1)
70}
71
72func (s *rpcStats) knownReceived() {
73	atomic.AddInt64(&s.NumCallsStarted, 1)
74	atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1)
75	atomic.AddInt64(&s.NumCallsFinished, 1)
76}
77
// errPicker is a balancer.Picker that fails every pick with the same
// fixed error.
type errPicker struct {
	// Pick always returns this err.
	err error
}
82
83func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
84	return nil, nil, p.err
85}
86
// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
//
// It is guaranteed that len(subConns) > 0.
type rrPicker struct {
	mu           sync.Mutex         // guards subConnsNext
	subConns     []balancer.SubConn // The subConns that were READY when taking the snapshot.
	subConnsNext int                // index of the next subConn to pick
}
97
98func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
99	p.mu.Lock()
100	defer p.mu.Unlock()
101	sc := p.subConns[p.subConnsNext]
102	p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
103	return sc, nil, nil
104}
105
// lbPicker does two layers of picks:
//
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all READY backends.
//
// It's guaranteed that len(serverList) > 0.
type lbPicker struct {
	mu             sync.Mutex     // guards serverListNext and subConnsNext
	serverList     []*lbpb.Server // servers (drops and backends) from the remote balancer
	serverListNext int            // index of the next server for the first-layer pick
	subConns       []balancer.SubConn // The subConns that were READY when taking the snapshot.
	subConnsNext   int                // index of the next subConn for the second-layer pick

	// stats accumulates per-call counters to report back to the balancer.
	stats *rpcStats
}
124
125func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
126	p.mu.Lock()
127	defer p.mu.Unlock()
128
129	// Layer one roundrobin on serverList.
130	s := p.serverList[p.serverListNext]
131	p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
132
133	// If it's a drop, return an error and fail the RPC.
134	if s.DropForRateLimiting {
135		p.stats.dropForRateLimiting()
136		return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
137	}
138	if s.DropForLoadBalancing {
139		p.stats.dropForLoadBalancing()
140		return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
141	}
142
143	// If not a drop but there's no ready subConns.
144	if len(p.subConns) <= 0 {
145		return nil, nil, balancer.ErrNoSubConnAvailable
146	}
147
148	// Return the next ready subConn in the list, also collect rpc stats.
149	sc := p.subConns[p.subConnsNext]
150	p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
151	done := func(info balancer.DoneInfo) {
152		if !info.BytesSent {
153			p.stats.failedToSend()
154		} else if info.BytesReceived {
155			p.stats.knownReceived()
156		}
157	}
158	return sc, done, nil
159}
160