/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package lrs implements the load reporting service for the xds balancer.
package lrs

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
	lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
	"github.com/golang/protobuf/ptypes"
	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/xds/internal"
)

// negativeOneUInt64 is the two's-complement bit pattern of -1. Adding it to a
// uint64 with atomic.AddUint64 decrements the value by one.
const negativeOneUInt64 = ^uint64(0)

// Store defines the interface for a load store. It keeps loads and can report
// them to a server when requested.
type Store interface {
	// CallDropped records one dropped call with the given category.
	CallDropped(category string)
	// CallStarted records one call started in the given locality.
	CallStarted(l internal.Locality)
	// CallFinished records one call finished in the given locality. For
	// successful calls, err must be nil.
	CallFinished(l internal.Locality, err error)
	// CallServerLoad records one server load value with the given name for
	// the given locality.
	CallServerLoad(l internal.Locality, name string, d float64)
	// ReportTo reports the loads collected for clusterName to cc. It blocks
	// until ctx is canceled.
	ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node)
}
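
// A minimal usage sketch (illustrative only; conn, node, locality, and the
// cluster name are placeholders supplied by the surrounding balancer code):
//
//	store := NewStore()
//	go store.ReportTo(ctx, conn, "example-cluster", node)
//	// Per RPC, typically from the picker:
//	store.CallStarted(locality)
//	store.CallServerLoad(locality, "cpu_utilization", 0.3)
//	store.CallFinished(locality, rpcErr)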

type rpcCountData struct {
	// Only atomic accesses are allowed for these fields.
	succeeded  *uint64
	errored    *uint64
	inProgress *uint64

	// Map from load name to load data (sum+count). Loading data from the map
	// is atomic, but updating data takes a lock, which could cause contention
	// when multiple RPCs try to report loads for the same name.
	//
	// To fix the contention, shard this map.
	serverLoads sync.Map // map[string]*rpcLoadData
}

func newRPCCountData() *rpcCountData {
	return &rpcCountData{
		succeeded:  new(uint64),
		errored:    new(uint64),
		inProgress: new(uint64),
	}
}

func (rcd *rpcCountData) incrSucceeded() {
	atomic.AddUint64(rcd.succeeded, 1)
}

func (rcd *rpcCountData) loadAndClearSucceeded() uint64 {
	return atomic.SwapUint64(rcd.succeeded, 0)
}

func (rcd *rpcCountData) incrErrored() {
	atomic.AddUint64(rcd.errored, 1)
}

func (rcd *rpcCountData) loadAndClearErrored() uint64 {
	return atomic.SwapUint64(rcd.errored, 0)
}

func (rcd *rpcCountData) incrInProgress() {
	atomic.AddUint64(rcd.inProgress, 1)
}

func (rcd *rpcCountData) decrInProgress() {
	atomic.AddUint64(rcd.inProgress, negativeOneUInt64) // atomic.Add(x, -1)
}

func (rcd *rpcCountData) loadInProgress() uint64 {
	return atomic.LoadUint64(rcd.inProgress) // The inProgress count is not cleared when read.
}

func (rcd *rpcCountData) addServerLoad(name string, d float64) {
	loads, ok := rcd.serverLoads.Load(name)
	if !ok {
		tl := newRPCLoadData()
		loads, _ = rcd.serverLoads.LoadOrStore(name, tl)
	}
	loads.(*rpcLoadData).add(d)
}

// Data for server loads (from trailers or OOB). Fields in this struct must be
// updated consistently.
//
// The current solution is to hold a lock, which could cause contention. To fix
// this, shard the serverLoads map in rpcCountData.
type rpcLoadData struct {
	mu    sync.Mutex
	sum   float64
	count uint64
}

func newRPCLoadData() *rpcLoadData {
	return &rpcLoadData{}
}

func (rld *rpcLoadData) add(v float64) {
	rld.mu.Lock()
	rld.sum += v
	rld.count++
	rld.mu.Unlock()
}

func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
	rld.mu.Lock()
	s = rld.sum
	rld.sum = 0
	c = rld.count
	rld.count = 0
	rld.mu.Unlock()
	return
}

// lrsStore collects loads from the xds balancer and periodically sends load
// reports to the server.
type lrsStore struct {
	backoff      backoff.Strategy
	lastReported time.Time

	drops            sync.Map // map[string]*uint64
	localityRPCCount sync.Map // map[internal.Locality]*rpcCountData
}

// NewStore creates a store for load reports.
func NewStore() Store {
	return &lrsStore{
		backoff:      backoff.DefaultExponential,
		lastReported: time.Now(),
	}
}

// Update functions are called by the picker for each RPC. To avoid contention,
// all updates are done atomically.

// CallDropped adds one drop record with the given category to the store.
func (ls *lrsStore) CallDropped(category string) {
	p, ok := ls.drops.Load(category)
	if !ok {
		tp := new(uint64)
		p, _ = ls.drops.LoadOrStore(category, tp)
	}
	atomic.AddUint64(p.(*uint64), 1)
}

// CallStarted adds one call-started record to the given locality.
func (ls *lrsStore) CallStarted(l internal.Locality) {
	p, ok := ls.localityRPCCount.Load(l)
	if !ok {
		tp := newRPCCountData()
		p, _ = ls.localityRPCCount.LoadOrStore(l, tp)
	}
	p.(*rpcCountData).incrInProgress()
}

// CallFinished adds one call-finished record to the given locality. For
// successful calls, err must be nil.
func (ls *lrsStore) CallFinished(l internal.Locality, err error) {
	p, ok := ls.localityRPCCount.Load(l)
	if !ok {
		// The map is never cleared, only the values in it are reset. So the
		// entry for a locality should never be missing when a call finishes.
		return
	}
	p.(*rpcCountData).decrInProgress()
	if err == nil {
		p.(*rpcCountData).incrSucceeded()
	} else {
		p.(*rpcCountData).incrErrored()
	}
}

// CallServerLoad adds one server load record (with the given name and value)
// to the given locality.
func (ls *lrsStore) CallServerLoad(l internal.Locality, name string, d float64) {
	p, ok := ls.localityRPCCount.Load(l)
	if !ok {
		// The map is never cleared, only the values in it are reset. So the
		// entry for a locality should never be missing in CallServerLoad.
		return
	}
	p.(*rpcCountData).addServerLoad(name, d)
}

// buildStats collects the load data accumulated since the last report and
// converts it into a slice of ClusterStats for clusterName.
func (ls *lrsStore) buildStats(clusterName string) []*endpointpb.ClusterStats {
	var (
		totalDropped  uint64
		droppedReqs   []*endpointpb.ClusterStats_DroppedRequests
		localityStats []*endpointpb.UpstreamLocalityStats
	)
	ls.drops.Range(func(category, countP interface{}) bool {
		tempCount := atomic.SwapUint64(countP.(*uint64), 0)
		if tempCount == 0 {
			return true
		}
		totalDropped += tempCount
		droppedReqs = append(droppedReqs, &endpointpb.ClusterStats_DroppedRequests{
			Category:     category.(string),
			DroppedCount: tempCount,
		})
		return true
	})
	ls.localityRPCCount.Range(func(locality, countP interface{}) bool {
		tempLocality := locality.(internal.Locality)
		tempCount := countP.(*rpcCountData)

		tempSucceeded := tempCount.loadAndClearSucceeded()
		tempInProgress := tempCount.loadInProgress()
		tempErrored := tempCount.loadAndClearErrored()
		if tempSucceeded == 0 && tempInProgress == 0 && tempErrored == 0 {
			return true
		}

		var loadMetricStats []*endpointpb.EndpointLoadMetricStats
		tempCount.serverLoads.Range(func(name, data interface{}) bool {
			tempName := name.(string)
			tempSum, tempCount := data.(*rpcLoadData).loadAndClear()
			if tempCount == 0 {
				return true
			}
			loadMetricStats = append(loadMetricStats,
				&endpointpb.EndpointLoadMetricStats{
					MetricName:                    tempName,
					NumRequestsFinishedWithMetric: tempCount,
					TotalMetricValue:              tempSum,
				},
			)
			return true
		})

		localityStats = append(localityStats, &endpointpb.UpstreamLocalityStats{
			Locality: &corepb.Locality{
				Region:  tempLocality.Region,
				Zone:    tempLocality.Zone,
				SubZone: tempLocality.SubZone,
			},
			TotalSuccessfulRequests: tempSucceeded,
			TotalRequestsInProgress: tempInProgress,
			TotalErrorRequests:      tempErrored,
			LoadMetricStats:         loadMetricStats,
			UpstreamEndpointStats:   nil, // TODO: populate for per-endpoint loads.
		})
		return true
	})

	dur := time.Since(ls.lastReported)
	ls.lastReported = time.Now()

	var ret []*endpointpb.ClusterStats
	ret = append(ret, &endpointpb.ClusterStats{
		ClusterName:           clusterName,
		UpstreamLocalityStats: localityStats,

		TotalDroppedRequests: totalDropped,
		DroppedRequests:      droppedReqs,
		LoadReportInterval:   ptypes.DurationProto(dur),
	})

	return ret
}

// ReportTo makes a streaming LRS call to cc and blocks.
//
// It retries the call (with backoff) until ctx is canceled.
func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) {
	c := lrsgrpc.NewLoadReportingServiceClient(cc)
	var (
		retryCount int
		doBackoff  bool
	)
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if doBackoff {
			backoffTimer := time.NewTimer(ls.backoff.Backoff(retryCount))
			select {
			case <-backoffTimer.C:
			case <-ctx.Done():
				backoffTimer.Stop()
				return
			}
			retryCount++
		}

		doBackoff = true
		stream, err := c.StreamLoadStats(ctx)
		if err != nil {
			grpclog.Warningf("lrs: failed to create stream: %v", err)
			continue
		}
		if err := stream.Send(&lrspb.LoadStatsRequest{
			Node: node,
		}); err != nil {
			grpclog.Warningf("lrs: failed to send first request: %v", err)
			continue
		}
		first, err := stream.Recv()
		if err != nil {
			grpclog.Warningf("lrs: failed to receive first response: %v", err)
			continue
		}
		interval, err := ptypes.Duration(first.LoadReportingInterval)
		if err != nil {
			grpclog.Warningf("lrs: failed to convert report interval: %v", err)
			continue
		}
		// The LRS client should join the clusters it knows about with the
		// cluster list from the response, and send loads for all of them.
		//
		// But the LRS client currently supports only one cluster. TODO: extend
		// it to support multiple clusters.
		var clusterFoundInResponse bool
		for _, c := range first.Clusters {
			if c == clusterName {
				clusterFoundInResponse = true
			}
		}
		if !clusterFoundInResponse {
			grpclog.Warningf("lrs: received cluster list %v does not contain the expected cluster %v", first.Clusters, clusterName)
			continue
		}
		if first.ReportEndpointGranularity {
			// TODO: support per-endpoint loads.
			grpclog.Warningf("lrs: per-endpoint loads requested, but not supported by the current implementation")
			continue
		}

		// No backoff afterwards.
		doBackoff = false
		retryCount = 0
		ls.sendLoads(ctx, stream, clusterName, interval)
	}
}

func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingService_StreamLoadStatsClient, clusterName string, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
		case <-ctx.Done():
			return
		}
		if err := stream.Send(&lrspb.LoadStatsRequest{
			ClusterStats: ls.buildStats(clusterName),
		}); err != nil {
			grpclog.Warningf("lrs: failed to send report: %v", err)
			return
		}
	}
}