1/*
2 *
3 * Copyright 2020 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package load
19
20import (
21	"fmt"
22	"sort"
23	"sync"
24	"testing"
25
26	"github.com/google/go-cmp/cmp"
27	"github.com/google/go-cmp/cmp/cmpopts"
28)
29
30var (
31	dropCategories = []string{"drop_for_real", "drop_for_fun"}
32	localities     = []string{"locality-A", "locality-B"}
33	errTest        = fmt.Errorf("test error")
34)
35
36// rpcData wraps the rpc counts and load data to be pushed to the store.
37type rpcData struct {
38	start, success, failure int
39	serverData              map[string]float64 // Will be reported with successful RPCs.
40}
41
42// TestDrops spawns a bunch of goroutines which report drop data. After the
43// goroutines have exited, the test dumps the stats from the Store and makes
44// sure they are as expected.
45func TestDrops(t *testing.T) {
46	var (
47		drops = map[string]int{
48			dropCategories[0]: 30,
49			dropCategories[1]: 40,
50			"":                10,
51		}
52		wantStoreData = &Data{
53			TotalDrops: 80,
54			Drops: map[string]uint64{
55				dropCategories[0]: 30,
56				dropCategories[1]: 40,
57			},
58		}
59	)
60
61	ls := perClusterStore{}
62	var wg sync.WaitGroup
63	for category, count := range drops {
64		for i := 0; i < count; i++ {
65			wg.Add(1)
66			go func(c string) {
67				ls.CallDropped(c)
68				wg.Done()
69			}(category)
70		}
71	}
72	wg.Wait()
73
74	gotStoreData := ls.stats()
75	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
76		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
77	}
78}
79
80// TestLocalityStats spawns a bunch of goroutines which report rpc and load
81// data. After the goroutines have exited, the test dumps the stats from the
82// Store and makes sure they are as expected.
83func TestLocalityStats(t *testing.T) {
84	var (
85		localityData = map[string]rpcData{
86			localities[0]: {
87				start:      40,
88				success:    20,
89				failure:    10,
90				serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4},
91			},
92			localities[1]: {
93				start:      80,
94				success:    40,
95				failure:    20,
96				serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4},
97			},
98		}
99		wantStoreData = &Data{
100			LocalityStats: map[string]LocalityData{
101				localities[0]: {
102					RequestStats: RequestData{Succeeded: 20, Errored: 10, InProgress: 10},
103					LoadStats: map[string]ServerLoadData{
104						"net":  {Count: 20, Sum: 20},
105						"disk": {Count: 20, Sum: 40},
106						"cpu":  {Count: 20, Sum: 60},
107						"mem":  {Count: 20, Sum: 80},
108					},
109				},
110				localities[1]: {
111					RequestStats: RequestData{Succeeded: 40, Errored: 20, InProgress: 20},
112					LoadStats: map[string]ServerLoadData{
113						"net":  {Count: 40, Sum: 40},
114						"disk": {Count: 40, Sum: 80},
115						"cpu":  {Count: 40, Sum: 120},
116						"mem":  {Count: 40, Sum: 160},
117					},
118				},
119			},
120		}
121	)
122
123	ls := perClusterStore{}
124	var wg sync.WaitGroup
125	for locality, data := range localityData {
126		wg.Add(data.start)
127		for i := 0; i < data.start; i++ {
128			go func(l string) {
129				ls.CallStarted(l)
130				wg.Done()
131			}(locality)
132		}
133		// The calls to callStarted() need to happen before the other calls are
134		// made. Hence the wait here.
135		wg.Wait()
136
137		wg.Add(data.success)
138		for i := 0; i < data.success; i++ {
139			go func(l string, serverData map[string]float64) {
140				ls.CallFinished(l, nil)
141				for n, d := range serverData {
142					ls.CallServerLoad(l, n, d)
143				}
144				wg.Done()
145			}(locality, data.serverData)
146		}
147		wg.Add(data.failure)
148		for i := 0; i < data.failure; i++ {
149			go func(l string) {
150				ls.CallFinished(l, errTest)
151				wg.Done()
152			}(locality)
153		}
154		wg.Wait()
155	}
156
157	gotStoreData := ls.stats()
158	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
159		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
160	}
161}
162
163func TestResetAfterStats(t *testing.T) {
164	// Push a bunch of drops, call stats and load stats, and leave inProgress to be non-zero.
165	// Dump the stats. Verify expexted
166	// Push the same set of loads as before
167	// Now dump and verify the newly expected ones.
168	var (
169		drops = map[string]int{
170			dropCategories[0]: 30,
171			dropCategories[1]: 40,
172		}
173		localityData = map[string]rpcData{
174			localities[0]: {
175				start:      40,
176				success:    20,
177				failure:    10,
178				serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4},
179			},
180			localities[1]: {
181				start:      80,
182				success:    40,
183				failure:    20,
184				serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4},
185			},
186		}
187		wantStoreData = &Data{
188			TotalDrops: 70,
189			Drops: map[string]uint64{
190				dropCategories[0]: 30,
191				dropCategories[1]: 40,
192			},
193			LocalityStats: map[string]LocalityData{
194				localities[0]: {
195					RequestStats: RequestData{Succeeded: 20, Errored: 10, InProgress: 10},
196					LoadStats: map[string]ServerLoadData{
197						"net":  {Count: 20, Sum: 20},
198						"disk": {Count: 20, Sum: 40},
199						"cpu":  {Count: 20, Sum: 60},
200						"mem":  {Count: 20, Sum: 80},
201					},
202				},
203				localities[1]: {
204					RequestStats: RequestData{Succeeded: 40, Errored: 20, InProgress: 20},
205					LoadStats: map[string]ServerLoadData{
206						"net":  {Count: 40, Sum: 40},
207						"disk": {Count: 40, Sum: 80},
208						"cpu":  {Count: 40, Sum: 120},
209						"mem":  {Count: 40, Sum: 160},
210					},
211				},
212			},
213		}
214	)
215
216	reportLoad := func(ls *perClusterStore) {
217		for category, count := range drops {
218			for i := 0; i < count; i++ {
219				ls.CallDropped(category)
220			}
221		}
222		for locality, data := range localityData {
223			for i := 0; i < data.start; i++ {
224				ls.CallStarted(locality)
225			}
226			for i := 0; i < data.success; i++ {
227				ls.CallFinished(locality, nil)
228				for n, d := range data.serverData {
229					ls.CallServerLoad(locality, n, d)
230				}
231			}
232			for i := 0; i < data.failure; i++ {
233				ls.CallFinished(locality, errTest)
234			}
235		}
236	}
237
238	ls := perClusterStore{}
239	reportLoad(&ls)
240	gotStoreData := ls.stats()
241	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
242		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
243	}
244
245	// The above call to stats() should have reset all load reports except the
246	// inProgress rpc count. We are now going to push the same load data into
247	// the store. So, we should expect to see twice the count for inProgress.
248	for _, l := range localities {
249		ls := wantStoreData.LocalityStats[l]
250		ls.RequestStats.InProgress *= 2
251		wantStoreData.LocalityStats[l] = ls
252	}
253	reportLoad(&ls)
254	gotStoreData = ls.stats()
255	if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" {
256		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
257	}
258}
259
260var sortDataSlice = cmp.Transformer("SortDataSlice", func(in []*Data) []*Data {
261	out := append([]*Data(nil), in...) // Copy input to avoid mutating it
262	sort.Slice(out,
263		func(i, j int) bool {
264			if out[i].Cluster < out[j].Cluster {
265				return true
266			}
267			if out[i].Cluster == out[j].Cluster {
268				return out[i].Service < out[j].Service
269			}
270			return false
271		},
272	)
273	return out
274})
275
276// Test all load are returned for the given clusters, and all clusters are
277// reported if no cluster is specified.
278func TestStoreStats(t *testing.T) {
279	var (
280		testClusters = []string{"c0", "c1", "c2"}
281		testServices = []string{"s0", "s1"}
282		testLocality = "test-locality"
283	)
284
285	store := NewStore()
286	for _, c := range testClusters {
287		for _, s := range testServices {
288			store.PerCluster(c, s).CallStarted(testLocality)
289			store.PerCluster(c, s).CallServerLoad(testLocality, "abc", 123)
290			store.PerCluster(c, s).CallDropped("dropped")
291			store.PerCluster(c, s).CallFinished(testLocality, nil)
292		}
293	}
294
295	wantC0 := []*Data{
296		{
297			Cluster: "c0", Service: "s0",
298			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
299			LocalityStats: map[string]LocalityData{
300				"test-locality": {
301					RequestStats: RequestData{Succeeded: 1},
302					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
303				},
304			},
305		},
306		{
307			Cluster: "c0", Service: "s1",
308			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
309			LocalityStats: map[string]LocalityData{
310				"test-locality": {
311					RequestStats: RequestData{Succeeded: 1},
312					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
313				},
314			},
315		},
316	}
317	// Call Stats with just "c0", this should return data for "c0", and not
318	// touch data for other clusters.
319	gotC0 := store.Stats([]string{"c0"})
320	if diff := cmp.Diff(wantC0, gotC0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
321		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
322	}
323
324	wantOther := []*Data{
325		{
326			Cluster: "c1", Service: "s0",
327			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
328			LocalityStats: map[string]LocalityData{
329				"test-locality": {
330					RequestStats: RequestData{Succeeded: 1},
331					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
332				},
333			},
334		},
335		{
336			Cluster: "c1", Service: "s1",
337			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
338			LocalityStats: map[string]LocalityData{
339				"test-locality": {
340					RequestStats: RequestData{Succeeded: 1},
341					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
342				},
343			},
344		},
345		{
346			Cluster: "c2", Service: "s0",
347			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
348			LocalityStats: map[string]LocalityData{
349				"test-locality": {
350					RequestStats: RequestData{Succeeded: 1},
351					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
352				},
353			},
354		},
355		{
356			Cluster: "c2", Service: "s1",
357			TotalDrops: 1, Drops: map[string]uint64{"dropped": 1},
358			LocalityStats: map[string]LocalityData{
359				"test-locality": {
360					RequestStats: RequestData{Succeeded: 1},
361					LoadStats:    map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}},
362				},
363			},
364		},
365	}
366	// Call Stats with empty slice, this should return data for all the
367	// remaining clusters, and not include c0 (because c0 data was cleared).
368	gotOther := store.Stats(nil)
369	if diff := cmp.Diff(wantOther, gotOther, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
370		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
371	}
372}
373
374// Test the cases that if a cluster doesn't have load to report, its data is not
375// appended to the slice returned by Stats().
376func TestStoreStatsEmptyDataNotReported(t *testing.T) {
377	var (
378		testServices = []string{"s0", "s1"}
379		testLocality = "test-locality"
380	)
381
382	store := NewStore()
383	// "c0"'s RPCs all finish with success.
384	for _, s := range testServices {
385		store.PerCluster("c0", s).CallStarted(testLocality)
386		store.PerCluster("c0", s).CallFinished(testLocality, nil)
387	}
388	// "c1"'s RPCs never finish (always inprocess).
389	for _, s := range testServices {
390		store.PerCluster("c1", s).CallStarted(testLocality)
391	}
392
393	want0 := []*Data{
394		{
395			Cluster: "c0", Service: "s0",
396			LocalityStats: map[string]LocalityData{
397				"test-locality": {RequestStats: RequestData{Succeeded: 1}},
398			},
399		},
400		{
401			Cluster: "c0", Service: "s1",
402			LocalityStats: map[string]LocalityData{
403				"test-locality": {RequestStats: RequestData{Succeeded: 1}},
404			},
405		},
406		{
407			Cluster: "c1", Service: "s0",
408			LocalityStats: map[string]LocalityData{
409				"test-locality": {RequestStats: RequestData{InProgress: 1}},
410			},
411		},
412		{
413			Cluster: "c1", Service: "s1",
414			LocalityStats: map[string]LocalityData{
415				"test-locality": {RequestStats: RequestData{InProgress: 1}},
416			},
417		},
418	}
419	// Call Stats with empty slice, this should return data for all the
420	// clusters.
421	got0 := store.Stats(nil)
422	if diff := cmp.Diff(want0, got0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
423		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
424	}
425
426	want1 := []*Data{
427		{
428			Cluster: "c1", Service: "s0",
429			LocalityStats: map[string]LocalityData{
430				"test-locality": {RequestStats: RequestData{InProgress: 1}},
431			},
432		},
433		{
434			Cluster: "c1", Service: "s1",
435			LocalityStats: map[string]LocalityData{
436				"test-locality": {RequestStats: RequestData{InProgress: 1}},
437			},
438		},
439	}
440	// Call Stats with empty slice again, this should return data only for "c1",
441	// because "c0" data was cleared, but "c1" has in-progress RPCs.
442	got1 := store.Stats(nil)
443	if diff := cmp.Diff(want1, got1, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" {
444		t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff)
445	}
446}
447