1// +build go1.12 2 3/* 4 * 5 * Copyright 2020 gRPC authors. 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 20package load 21 22import ( 23 "fmt" 24 "sort" 25 "sync" 26 "testing" 27 28 "github.com/google/go-cmp/cmp" 29 "github.com/google/go-cmp/cmp/cmpopts" 30) 31 32var ( 33 dropCategories = []string{"drop_for_real", "drop_for_fun"} 34 localities = []string{"locality-A", "locality-B"} 35 errTest = fmt.Errorf("test error") 36) 37 38// rpcData wraps the rpc counts and load data to be pushed to the store. 39type rpcData struct { 40 start, success, failure int 41 serverData map[string]float64 // Will be reported with successful RPCs. 42} 43 44// TestDrops spawns a bunch of goroutines which report drop data. After the 45// goroutines have exited, the test dumps the stats from the Store and makes 46// sure they are as expected. 47func TestDrops(t *testing.T) { 48 var ( 49 drops = map[string]int{ 50 dropCategories[0]: 30, 51 dropCategories[1]: 40, 52 "": 10, 53 } 54 wantStoreData = &Data{ 55 TotalDrops: 80, 56 Drops: map[string]uint64{ 57 dropCategories[0]: 30, 58 dropCategories[1]: 40, 59 }, 60 } 61 ) 62 63 ls := perClusterStore{} 64 var wg sync.WaitGroup 65 for category, count := range drops { 66 for i := 0; i < count; i++ { 67 wg.Add(1) 68 go func(c string) { 69 ls.CallDropped(c) 70 wg.Done() 71 }(category) 72 } 73 } 74 wg.Wait() 75 76 gotStoreData := ls.stats() 77 if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { 78 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 79 } 80} 81 82// TestLocalityStats spawns a bunch of goroutines which report rpc and load 83// data. After the goroutines have exited, the test dumps the stats from the 84// Store and makes sure they are as expected. 85func TestLocalityStats(t *testing.T) { 86 var ( 87 localityData = map[string]rpcData{ 88 localities[0]: { 89 start: 40, 90 success: 20, 91 failure: 10, 92 serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, 93 }, 94 localities[1]: { 95 start: 80, 96 success: 40, 97 failure: 20, 98 serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, 99 }, 100 } 101 wantStoreData = &Data{ 102 LocalityStats: map[string]LocalityData{ 103 localities[0]: { 104 RequestStats: RequestData{Succeeded: 20, Errored: 10, InProgress: 10}, 105 LoadStats: map[string]ServerLoadData{ 106 "net": {Count: 20, Sum: 20}, 107 "disk": {Count: 20, Sum: 40}, 108 "cpu": {Count: 20, Sum: 60}, 109 "mem": {Count: 20, Sum: 80}, 110 }, 111 }, 112 localities[1]: { 113 RequestStats: RequestData{Succeeded: 40, Errored: 20, InProgress: 20}, 114 LoadStats: map[string]ServerLoadData{ 115 "net": {Count: 40, Sum: 40}, 116 "disk": {Count: 40, Sum: 80}, 117 "cpu": {Count: 40, Sum: 120}, 118 "mem": {Count: 40, Sum: 160}, 119 }, 120 }, 121 }, 122 } 123 ) 124 125 ls := perClusterStore{} 126 var wg sync.WaitGroup 127 for locality, data := range localityData { 128 wg.Add(data.start) 129 for i := 0; i < data.start; i++ { 130 go func(l string) { 131 ls.CallStarted(l) 132 wg.Done() 133 }(locality) 134 } 135 // The calls to callStarted() need to happen before the other calls are 136 // made. Hence the wait here. 137 wg.Wait() 138 139 wg.Add(data.success) 140 for i := 0; i < data.success; i++ { 141 go func(l string, serverData map[string]float64) { 142 ls.CallFinished(l, nil) 143 for n, d := range serverData { 144 ls.CallServerLoad(l, n, d) 145 } 146 wg.Done() 147 }(locality, data.serverData) 148 } 149 wg.Add(data.failure) 150 for i := 0; i < data.failure; i++ { 151 go func(l string) { 152 ls.CallFinished(l, errTest) 153 wg.Done() 154 }(locality) 155 } 156 wg.Wait() 157 } 158 159 gotStoreData := ls.stats() 160 if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { 161 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 162 } 163} 164 165func TestResetAfterStats(t *testing.T) { 166 // Push a bunch of drops, call stats and load stats, and leave inProgress to be non-zero. 167 // Dump the stats. Verify expexted 168 // Push the same set of loads as before 169 // Now dump and verify the newly expected ones. 170 var ( 171 drops = map[string]int{ 172 dropCategories[0]: 30, 173 dropCategories[1]: 40, 174 } 175 localityData = map[string]rpcData{ 176 localities[0]: { 177 start: 40, 178 success: 20, 179 failure: 10, 180 serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, 181 }, 182 localities[1]: { 183 start: 80, 184 success: 40, 185 failure: 20, 186 serverData: map[string]float64{"net": 1, "disk": 2, "cpu": 3, "mem": 4}, 187 }, 188 } 189 wantStoreData = &Data{ 190 TotalDrops: 70, 191 Drops: map[string]uint64{ 192 dropCategories[0]: 30, 193 dropCategories[1]: 40, 194 }, 195 LocalityStats: map[string]LocalityData{ 196 localities[0]: { 197 RequestStats: RequestData{Succeeded: 20, Errored: 10, InProgress: 10}, 198 LoadStats: map[string]ServerLoadData{ 199 "net": {Count: 20, Sum: 20}, 200 "disk": {Count: 20, Sum: 40}, 201 "cpu": {Count: 20, Sum: 60}, 202 "mem": {Count: 20, Sum: 80}, 203 }, 204 }, 205 localities[1]: { 206 RequestStats: RequestData{Succeeded: 40, Errored: 20, InProgress: 20}, 207 LoadStats: map[string]ServerLoadData{ 208 "net": {Count: 40, Sum: 40}, 209 "disk": {Count: 40, Sum: 80}, 210 "cpu": {Count: 40, Sum: 120}, 211 "mem": {Count: 40, Sum: 160}, 212 }, 213 }, 214 }, 215 } 216 ) 217 218 reportLoad := func(ls *perClusterStore) { 219 for category, count := range drops { 220 for i := 0; i < count; i++ { 221 ls.CallDropped(category) 222 } 223 } 224 for locality, data := range localityData { 225 for i := 0; i < data.start; i++ { 226 ls.CallStarted(locality) 227 } 228 for i := 0; i < data.success; i++ { 229 ls.CallFinished(locality, nil) 230 for n, d := range data.serverData { 231 ls.CallServerLoad(locality, n, d) 232 } 233 } 234 for i := 0; i < data.failure; i++ { 235 ls.CallFinished(locality, errTest) 236 } 237 } 238 } 239 240 ls := perClusterStore{} 241 reportLoad(&ls) 242 gotStoreData := ls.stats() 243 if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { 244 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 245 } 246 247 // The above call to stats() should have reset all load reports except the 248 // inProgress rpc count. We are now going to push the same load data into 249 // the store. So, we should expect to see twice the count for inProgress. 250 for _, l := range localities { 251 ls := wantStoreData.LocalityStats[l] 252 ls.RequestStats.InProgress *= 2 253 wantStoreData.LocalityStats[l] = ls 254 } 255 reportLoad(&ls) 256 gotStoreData = ls.stats() 257 if diff := cmp.Diff(wantStoreData, gotStoreData, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval")); diff != "" { 258 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 259 } 260} 261 262var sortDataSlice = cmp.Transformer("SortDataSlice", func(in []*Data) []*Data { 263 out := append([]*Data(nil), in...) // Copy input to avoid mutating it 264 sort.Slice(out, 265 func(i, j int) bool { 266 if out[i].Cluster < out[j].Cluster { 267 return true 268 } 269 if out[i].Cluster == out[j].Cluster { 270 return out[i].Service < out[j].Service 271 } 272 return false 273 }, 274 ) 275 return out 276}) 277 278// Test all load are returned for the given clusters, and all clusters are 279// reported if no cluster is specified. 280func TestStoreStats(t *testing.T) { 281 var ( 282 testClusters = []string{"c0", "c1", "c2"} 283 testServices = []string{"s0", "s1"} 284 testLocality = "test-locality" 285 ) 286 287 store := NewStore() 288 for _, c := range testClusters { 289 for _, s := range testServices { 290 store.PerCluster(c, s).CallStarted(testLocality) 291 store.PerCluster(c, s).CallServerLoad(testLocality, "abc", 123) 292 store.PerCluster(c, s).CallDropped("dropped") 293 store.PerCluster(c, s).CallFinished(testLocality, nil) 294 } 295 } 296 297 wantC0 := []*Data{ 298 { 299 Cluster: "c0", Service: "s0", 300 TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, 301 LocalityStats: map[string]LocalityData{ 302 "test-locality": { 303 RequestStats: RequestData{Succeeded: 1}, 304 LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, 305 }, 306 }, 307 }, 308 { 309 Cluster: "c0", Service: "s1", 310 TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, 311 LocalityStats: map[string]LocalityData{ 312 "test-locality": { 313 RequestStats: RequestData{Succeeded: 1}, 314 LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, 315 }, 316 }, 317 }, 318 } 319 // Call Stats with just "c0", this should return data for "c0", and not 320 // touch data for other clusters. 321 gotC0 := store.Stats([]string{"c0"}) 322 if diff := cmp.Diff(wantC0, gotC0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { 323 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 324 } 325 326 wantOther := []*Data{ 327 { 328 Cluster: "c1", Service: "s0", 329 TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, 330 LocalityStats: map[string]LocalityData{ 331 "test-locality": { 332 RequestStats: RequestData{Succeeded: 1}, 333 LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, 334 }, 335 }, 336 }, 337 { 338 Cluster: "c1", Service: "s1", 339 TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, 340 LocalityStats: map[string]LocalityData{ 341 "test-locality": { 342 RequestStats: RequestData{Succeeded: 1}, 343 LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, 344 }, 345 }, 346 }, 347 { 348 Cluster: "c2", Service: "s0", 349 TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, 350 LocalityStats: map[string]LocalityData{ 351 "test-locality": { 352 RequestStats: RequestData{Succeeded: 1}, 353 LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, 354 }, 355 }, 356 }, 357 { 358 Cluster: "c2", Service: "s1", 359 TotalDrops: 1, Drops: map[string]uint64{"dropped": 1}, 360 LocalityStats: map[string]LocalityData{ 361 "test-locality": { 362 RequestStats: RequestData{Succeeded: 1}, 363 LoadStats: map[string]ServerLoadData{"abc": {Count: 1, Sum: 123}}, 364 }, 365 }, 366 }, 367 } 368 // Call Stats with empty slice, this should return data for all the 369 // remaining clusters, and not include c0 (because c0 data was cleared). 370 gotOther := store.Stats(nil) 371 if diff := cmp.Diff(wantOther, gotOther, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { 372 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 373 } 374} 375 376// Test the cases that if a cluster doesn't have load to report, its data is not 377// appended to the slice returned by Stats(). 378func TestStoreStatsEmptyDataNotReported(t *testing.T) { 379 var ( 380 testServices = []string{"s0", "s1"} 381 testLocality = "test-locality" 382 ) 383 384 store := NewStore() 385 // "c0"'s RPCs all finish with success. 386 for _, s := range testServices { 387 store.PerCluster("c0", s).CallStarted(testLocality) 388 store.PerCluster("c0", s).CallFinished(testLocality, nil) 389 } 390 // "c1"'s RPCs never finish (always inprocess). 391 for _, s := range testServices { 392 store.PerCluster("c1", s).CallStarted(testLocality) 393 } 394 395 want0 := []*Data{ 396 { 397 Cluster: "c0", Service: "s0", 398 LocalityStats: map[string]LocalityData{ 399 "test-locality": {RequestStats: RequestData{Succeeded: 1}}, 400 }, 401 }, 402 { 403 Cluster: "c0", Service: "s1", 404 LocalityStats: map[string]LocalityData{ 405 "test-locality": {RequestStats: RequestData{Succeeded: 1}}, 406 }, 407 }, 408 { 409 Cluster: "c1", Service: "s0", 410 LocalityStats: map[string]LocalityData{ 411 "test-locality": {RequestStats: RequestData{InProgress: 1}}, 412 }, 413 }, 414 { 415 Cluster: "c1", Service: "s1", 416 LocalityStats: map[string]LocalityData{ 417 "test-locality": {RequestStats: RequestData{InProgress: 1}}, 418 }, 419 }, 420 } 421 // Call Stats with empty slice, this should return data for all the 422 // clusters. 423 got0 := store.Stats(nil) 424 if diff := cmp.Diff(want0, got0, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { 425 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 426 } 427 428 want1 := []*Data{ 429 { 430 Cluster: "c1", Service: "s0", 431 LocalityStats: map[string]LocalityData{ 432 "test-locality": {RequestStats: RequestData{InProgress: 1}}, 433 }, 434 }, 435 { 436 Cluster: "c1", Service: "s1", 437 LocalityStats: map[string]LocalityData{ 438 "test-locality": {RequestStats: RequestData{InProgress: 1}}, 439 }, 440 }, 441 } 442 // Call Stats with empty slice again, this should return data only for "c1", 443 // because "c0" data was cleared, but "c1" has in-progress RPCs. 444 got1 := store.Stats(nil) 445 if diff := cmp.Diff(want1, got1, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(Data{}, "ReportInterval"), sortDataSlice); diff != "" { 446 t.Errorf("store.stats() returned unexpected diff (-want +got):\n%s", diff) 447 } 448} 449