1/* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "sync" 23 "sync/atomic" 24 25 "golang.org/x/net/context" 26 "google.golang.org/grpc/balancer" 27 "google.golang.org/grpc/codes" 28 lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" 29 "google.golang.org/grpc/status" 30) 31 32type rpcStats struct { 33 NumCallsStarted int64 34 NumCallsFinished int64 35 NumCallsFinishedWithDropForRateLimiting int64 36 NumCallsFinishedWithDropForLoadBalancing int64 37 NumCallsFinishedWithClientFailedToSend int64 38 NumCallsFinishedKnownReceived int64 39} 40 41// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats. 42func (s *rpcStats) toClientStats() *lbpb.ClientStats { 43 stats := &lbpb.ClientStats{ 44 NumCallsStarted: atomic.SwapInt64(&s.NumCallsStarted, 0), 45 NumCallsFinished: atomic.SwapInt64(&s.NumCallsFinished, 0), 46 NumCallsFinishedWithDropForRateLimiting: atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0), 47 NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0), 48 NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0), 49 NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0), 50 } 51 return stats 52} 53 54func (s *rpcStats) dropForRateLimiting() { 55 atomic.AddInt64(&s.NumCallsStarted, 1) 56 atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1) 57 atomic.AddInt64(&s.NumCallsFinished, 1) 58} 59 60func (s *rpcStats) dropForLoadBalancing() { 61 atomic.AddInt64(&s.NumCallsStarted, 1) 62 atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1) 63 atomic.AddInt64(&s.NumCallsFinished, 1) 64} 65 66func (s *rpcStats) failedToSend() { 67 atomic.AddInt64(&s.NumCallsStarted, 1) 68 atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1) 69 atomic.AddInt64(&s.NumCallsFinished, 1) 70} 71 72func (s *rpcStats) knownReceived() { 73 atomic.AddInt64(&s.NumCallsStarted, 1) 74 atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1) 75 atomic.AddInt64(&s.NumCallsFinished, 1) 76} 77 78type errPicker struct { 79 // Pick always returns this err. 80 err error 81} 82 83func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { 84 return nil, nil, p.err 85} 86 87// rrPicker does roundrobin on subConns. It's typically used when there's no 88// response from remote balancer, and grpclb falls back to the resolved 89// backends. 90// 91// It guaranteed that len(subConns) > 0. 92type rrPicker struct { 93 mu sync.Mutex 94 subConns []balancer.SubConn // The subConns that were READY when taking the snapshot. 95 subConnsNext int 96} 97 98func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { 99 p.mu.Lock() 100 defer p.mu.Unlock() 101 sc := p.subConns[p.subConnsNext] 102 p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns) 103 return sc, nil, nil 104} 105 106// lbPicker does two layers of picks: 107// 108// First layer: roundrobin on all servers in serverList, including drops and backends. 109// - If it picks a drop, the RPC will fail as being dropped. 110// - If it picks a backend, do a second layer pick to pick the real backend. 111// 112// Second layer: roundrobin on all READY backends. 113// 114// It's guaranteed that len(serverList) > 0. 115type lbPicker struct { 116 mu sync.Mutex 117 serverList []*lbpb.Server 118 serverListNext int 119 subConns []balancer.SubConn // The subConns that were READY when taking the snapshot. 120 subConnsNext int 121 122 stats *rpcStats 123} 124 125func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { 126 p.mu.Lock() 127 defer p.mu.Unlock() 128 129 // Layer one roundrobin on serverList. 130 s := p.serverList[p.serverListNext] 131 p.serverListNext = (p.serverListNext + 1) % len(p.serverList) 132 133 // If it's a drop, return an error and fail the RPC. 134 if s.DropForRateLimiting { 135 p.stats.dropForRateLimiting() 136 return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb") 137 } 138 if s.DropForLoadBalancing { 139 p.stats.dropForLoadBalancing() 140 return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb") 141 } 142 143 // If not a drop but there's no ready subConns. 144 if len(p.subConns) <= 0 { 145 return nil, nil, balancer.ErrNoSubConnAvailable 146 } 147 148 // Return the next ready subConn in the list, also collect rpc stats. 149 sc := p.subConns[p.subConnsNext] 150 p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns) 151 done := func(info balancer.DoneInfo) { 152 if !info.BytesSent { 153 p.stats.failedToSend() 154 } else if info.BytesReceived { 155 p.stats.knownReceived() 156 } 157 } 158 return sc, done, nil 159} 160