/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package lrs implements the load reporting service for the xds balancer.
package lrs

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
	lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
	lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2"
	"github.com/golang/protobuf/ptypes"
	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/xds/internal"
)

const negativeOneUInt64 = ^uint64(0)

// Store defines the interface for a load store. It keeps loads and can report
// them to a server when requested.
type Store interface {
	CallDropped(category string)
	CallStarted(l internal.Locality)
	CallFinished(l internal.Locality, err error)
	CallServerLoad(l internal.Locality, name string, d float64)
	// ReportTo reports the load of clusterName to cc.
	ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node)
}

type rpcCountData struct {
	// Only atomic accesses are allowed for these fields.
	succeeded  *uint64
	errored    *uint64
	inProgress *uint64

	// Map from load name to load data (sum+count). Loading data from the map
	// is atomic, but updating data takes a lock, which could cause contention
	// when multiple RPCs try to report loads for the same name.
	//
	// To fix the contention, shard this map.
	serverLoads sync.Map // map[string]*rpcLoadData
}

func newRPCCountData() *rpcCountData {
	return &rpcCountData{
		succeeded:  new(uint64),
		errored:    new(uint64),
		inProgress: new(uint64),
	}
}

func (rcd *rpcCountData) incrSucceeded() {
	atomic.AddUint64(rcd.succeeded, 1)
}

func (rcd *rpcCountData) loadAndClearSucceeded() uint64 {
	return atomic.SwapUint64(rcd.succeeded, 0)
}

func (rcd *rpcCountData) incrErrored() {
	atomic.AddUint64(rcd.errored, 1)
}

func (rcd *rpcCountData) loadAndClearErrored() uint64 {
	return atomic.SwapUint64(rcd.errored, 0)
}

func (rcd *rpcCountData) incrInProgress() {
	atomic.AddUint64(rcd.inProgress, 1)
}

func (rcd *rpcCountData) decrInProgress() {
	// Adding ^uint64(0) subtracts 1 via unsigned wraparound.
	atomic.AddUint64(rcd.inProgress, negativeOneUInt64)
}

func (rcd *rpcCountData) loadInProgress() uint64 {
	// Unlike the other counters, the in-progress count is not cleared when
	// read; it is reported as a gauge.
	return atomic.LoadUint64(rcd.inProgress)
}

func (rcd *rpcCountData) addServerLoad(name string, d float64) {
	loads, ok := rcd.serverLoads.Load(name)
	if !ok {
		tl := newRPCLoadData()
		loads, _ = rcd.serverLoads.LoadOrStore(name, tl)
	}
	loads.(*rpcLoadData).add(d)
}
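// The Load-then-LoadOrStore pattern in addServerLoad (and in the lrsStore
// methods below) keeps the hot path cheap: a plain Load wins in the common
// case, and LoadOrStore only runs (and only allocates) the first time a key
// is seen. A minimal sketch of the same pattern with a bare counter; the
// names below are illustrative, not part of this file:
//
//	var counters sync.Map // map[string]*uint64
//
//	func incr(name string) {
//		p, ok := counters.Load(name)
//		if !ok {
//			p, _ = counters.LoadOrStore(name, new(uint64))
//		}
//		atomic.AddUint64(p.(*uint64), 1)
//	}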
// rpcLoadData holds data for server loads (from trailers or OOB). Fields in
// this struct must be updated consistently.
//
// The current solution is to hold a lock, which could cause contention. To
// fix, shard the serverLoads map in rpcCountData.
type rpcLoadData struct {
	mu    sync.Mutex
	sum   float64
	count uint64
}

func newRPCLoadData() *rpcLoadData {
	return &rpcLoadData{}
}

func (rld *rpcLoadData) add(v float64) {
	rld.mu.Lock()
	rld.sum += v
	rld.count++
	rld.mu.Unlock()
}

func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
	rld.mu.Lock()
	s = rld.sum
	rld.sum = 0
	c = rld.count
	rld.count = 0
	rld.mu.Unlock()
	return
}

// lrsStore collects loads from the xds balancer and periodically reports them
// to the server.
type lrsStore struct {
	backoff      backoff.Strategy
	lastReported time.Time

	drops            sync.Map // map[string]*uint64
	localityRPCCount sync.Map // map[internal.Locality]*rpcCountData
}

// NewStore creates a store for load reports.
func NewStore() Store {
	return &lrsStore{
		backoff:      backoff.DefaultExponential,
		lastReported: time.Now(),
	}
}

// The update functions below are called by the picker for each RPC. To avoid
// contention, all updates are done atomically.

// CallDropped adds one drop record with the given category to the store.
func (ls *lrsStore) CallDropped(category string) {
	p, ok := ls.drops.Load(category)
	if !ok {
		tp := new(uint64)
		p, _ = ls.drops.LoadOrStore(category, tp)
	}
	atomic.AddUint64(p.(*uint64), 1)
}

// CallStarted records that an RPC to the given locality has started.
func (ls *lrsStore) CallStarted(l internal.Locality) {
	p, ok := ls.localityRPCCount.Load(l)
	if !ok {
		tp := newRPCCountData()
		p, _ = ls.localityRPCCount.LoadOrStore(l, tp)
	}
	p.(*rpcCountData).incrInProgress()
}

// CallFinished records that an RPC to the given locality has finished, with
// err as its final status.
func (ls *lrsStore) CallFinished(l internal.Locality, err error) {
	p, ok := ls.localityRPCCount.Load(l)
	if !ok {
		// The map is never cleared, only values in the map are reset. So the
		// case where the entry for a call-finish is not found should never
		// happen.
		return
	}
	p.(*rpcCountData).decrInProgress()
	if err == nil {
		p.(*rpcCountData).incrSucceeded()
	} else {
		p.(*rpcCountData).incrErrored()
	}
}
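// A hedged sketch of how a picker is expected to drive the update callbacks
// in this file: CallStarted and CallFinished pair up per RPC, and
// CallServerLoad (below) may be called for each server-reported load. The
// locality l, the error rpcErr, and the load name/value are illustrative:
//
//	store.CallStarted(l)
//	// ... the RPC runs ...
//	store.CallFinished(l, rpcErr)
//	store.CallServerLoad(l, "cpu_utilization", 0.3)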
// CallServerLoad adds one server load record (from trailers or OOB) with the
// given name to the store.
func (ls *lrsStore) CallServerLoad(l internal.Locality, name string, d float64) {
	p, ok := ls.localityRPCCount.Load(l)
	if !ok {
		// The map is never cleared, only values in the map are reset. So the
		// case where the entry for CallServerLoad is not found should never
		// happen.
		return
	}
	p.(*rpcCountData).addServerLoad(name, d)
}

// buildStats snapshots and resets the accumulated loads, and packages them as
// ClusterStats for clusterName.
func (ls *lrsStore) buildStats(clusterName string) []*endpointpb.ClusterStats {
	var (
		totalDropped  uint64
		droppedReqs   []*endpointpb.ClusterStats_DroppedRequests
		localityStats []*endpointpb.UpstreamLocalityStats
	)
	ls.drops.Range(func(category, countP interface{}) bool {
		tempCount := atomic.SwapUint64(countP.(*uint64), 0)
		if tempCount == 0 {
			return true
		}
		totalDropped += tempCount
		droppedReqs = append(droppedReqs, &endpointpb.ClusterStats_DroppedRequests{
			Category:     category.(string),
			DroppedCount: tempCount,
		})
		return true
	})
	ls.localityRPCCount.Range(func(locality, countP interface{}) bool {
		tempLocality := locality.(internal.Locality)
		tempCount := countP.(*rpcCountData)

		tempSucceeded := tempCount.loadAndClearSucceeded()
		tempInProgress := tempCount.loadInProgress()
		tempErrored := tempCount.loadAndClearErrored()
		if tempSucceeded == 0 && tempInProgress == 0 && tempErrored == 0 {
			return true
		}

		var loadMetricStats []*endpointpb.EndpointLoadMetricStats
		tempCount.serverLoads.Range(func(name, data interface{}) bool {
			tempName := name.(string)
			tempSum, tempCount := data.(*rpcLoadData).loadAndClear()
			if tempCount == 0 {
				return true
			}
			loadMetricStats = append(loadMetricStats,
				&endpointpb.EndpointLoadMetricStats{
					MetricName:                    tempName,
					NumRequestsFinishedWithMetric: tempCount,
					TotalMetricValue:              tempSum,
				},
			)
			return true
		})

		localityStats = append(localityStats, &endpointpb.UpstreamLocalityStats{
			Locality: &corepb.Locality{
				Region:  tempLocality.Region,
				Zone:    tempLocality.Zone,
				SubZone: tempLocality.SubZone,
			},
			TotalSuccessfulRequests: tempSucceeded,
			TotalRequestsInProgress: tempInProgress,
			TotalErrorRequests:      tempErrored,
			LoadMetricStats:         loadMetricStats,
			UpstreamEndpointStats:   nil, // TODO: populate for per-endpoint loads.
		})
		return true
	})

	dur := time.Since(ls.lastReported)
	ls.lastReported = time.Now()

	var ret []*endpointpb.ClusterStats
	ret = append(ret, &endpointpb.ClusterStats{
		ClusterName:           clusterName,
		UpstreamLocalityStats: localityStats,

		TotalDroppedRequests: totalDropped,
		DroppedRequests:      droppedReqs,
		LoadReportInterval:   ptypes.DurationProto(dur),
	})

	return ret
}
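// Because buildStats swaps each counter to zero as it reads it (and
// loadAndClear resets the per-name sums), every call returns only the delta
// accumulated since the previous report; the in-progress count alone is a
// gauge. A sketch of one resulting entry, with illustrative values:
//
//	&endpointpb.ClusterStats{
//		ClusterName:          "example-cluster",
//		TotalDroppedRequests: 3,
//		DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{
//			{Category: "throttle", DroppedCount: 3},
//		},
//		LoadReportInterval: ptypes.DurationProto(10 * time.Second),
//	}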
// ReportTo makes a streaming LRS call to cc and blocks.
//
// It retries the call (with backoff) until ctx is canceled.
func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) {
	c := lrsgrpc.NewLoadReportingServiceClient(cc)
	var (
		retryCount int
		doBackoff  bool
	)
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		if doBackoff {
			backoffTimer := time.NewTimer(ls.backoff.Backoff(retryCount))
			select {
			case <-backoffTimer.C:
			case <-ctx.Done():
				backoffTimer.Stop()
				return
			}
			retryCount++
		}

		doBackoff = true
		stream, err := c.StreamLoadStats(ctx)
		if err != nil {
			grpclog.Warningf("lrs: failed to create stream: %v", err)
			continue
		}
		if err := stream.Send(&lrspb.LoadStatsRequest{
			ClusterStats: []*endpointpb.ClusterStats{{
				ClusterName: clusterName,
			}},
			Node: node,
		}); err != nil {
			grpclog.Warningf("lrs: failed to send first request: %v", err)
			continue
		}
		first, err := stream.Recv()
		if err != nil {
			grpclog.Warningf("lrs: failed to receive first response: %v", err)
			continue
		}
		interval, err := ptypes.Duration(first.LoadReportingInterval)
		if err != nil {
			grpclog.Warningf("lrs: failed to convert report interval: %v", err)
			continue
		}
		if len(first.Clusters) != 1 {
			grpclog.Warningf("lrs: received clusters %v, expected exactly one cluster", first.Clusters)
			continue
		}
		if first.Clusters[0] != clusterName {
			grpclog.Warningf("lrs: received unexpected cluster %v, want %v", first.Clusters[0], clusterName)
			continue
		}
		if first.ReportEndpointGranularity {
			// TODO: add support for per-endpoint loads.
			grpclog.Warningf("lrs: endpoint loads requested, but not supported by current implementation")
			continue
		}

		// This attempt succeeded, so the next retry (after this stream
		// breaks) starts without backoff.
		doBackoff = false
		retryCount = 0
		ls.sendLoads(ctx, stream, clusterName, interval)
	}
}

// sendLoads sends a load report on stream every interval, until ctx is
// canceled or the stream fails.
func (ls *lrsStore) sendLoads(ctx context.Context, stream lrsgrpc.LoadReportingService_StreamLoadStatsClient, clusterName string, interval time.Duration) {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
		case <-ctx.Done():
			return
		}
		if err := stream.Send(&lrspb.LoadStatsRequest{
			ClusterStats: ls.buildStats(clusterName),
		}); err != nil {
			grpclog.Warningf("lrs: failed to send report: %v", err)
			return
		}
	}
}
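// Compile-time assertion that lrsStore satisfies Store. This is a standard Go
// idiom and an editorial addition; the original file relies on NewStore's
// return type for the same guarantee.
var _ Store = (*lrsStore)(nil)

// A hedged end-to-end sketch of wiring the store to an LRS server. The
// ClientConn cc, the node proto, and the cluster name are assumed to be
// supplied by the xds balancer and are illustrative here:
//
//	store := NewStore()
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go store.ReportTo(ctx, cc, "example-cluster", node)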