1package health 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "strings" 8 "testing" 9 "time" 10 11 "github.com/google/go-cmp/cmp" 12 "github.com/hashicorp/go-hclog" 13 "github.com/hashicorp/go-uuid" 14 "github.com/stretchr/testify/require" 15 "google.golang.org/grpc/codes" 16 "google.golang.org/grpc/status" 17 18 "github.com/hashicorp/consul/agent/structs" 19 "github.com/hashicorp/consul/agent/submatview" 20 "github.com/hashicorp/consul/proto/pbcommon" 21 "github.com/hashicorp/consul/proto/pbservice" 22 "github.com/hashicorp/consul/proto/pbsubscribe" 23 "github.com/hashicorp/consul/types" 24) 25 26func TestSortCheckServiceNodes_OrderIsConsistentWithRPCResponse(t *testing.T) { 27 index := uint64(42) 28 buildTestNode := func(nodeName string, serviceID string) structs.CheckServiceNode { 29 newID, err := uuid.GenerateUUID() 30 require.NoError(t, err) 31 return structs.CheckServiceNode{ 32 Node: &structs.Node{ 33 ID: types.NodeID(strings.ToUpper(newID)), 34 Node: nodeName, 35 Address: nodeName, 36 Datacenter: "dc1", 37 RaftIndex: structs.RaftIndex{ 38 CreateIndex: index, 39 ModifyIndex: index, 40 }, 41 }, 42 Service: &structs.NodeService{ 43 ID: serviceID, 44 Service: "testService", 45 Port: 8080, 46 Weights: &structs.Weights{ 47 Passing: 1, 48 Warning: 1, 49 }, 50 RaftIndex: structs.RaftIndex{ 51 CreateIndex: index, 52 ModifyIndex: index, 53 }, 54 EnterpriseMeta: *structs.DefaultEnterpriseMeta(), 55 }, 56 Checks: []*structs.HealthCheck{}, 57 } 58 } 59 zero := buildTestNode("a-zero-node", "testService:1") 60 one := buildTestNode("node1", "testService:1") 61 two := buildTestNode("node1", "testService:2") 62 three := buildTestNode("node2", "testService") 63 result := structs.IndexedCheckServiceNodes{ 64 Nodes: structs.CheckServiceNodes{three, two, zero, one}, 65 QueryMeta: structs.QueryMeta{Index: index}, 66 } 67 sortCheckServiceNodes(&result) 68 expected := structs.CheckServiceNodes{zero, one, two, three} 69 require.Equal(t, expected, result.Nodes) 70} 71 72func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { 73 if testing.Short() { 74 t.Skip("too slow for testing.Short") 75 } 76 77 namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace) 78 streamClient := newStreamClient(validateNamespace(namespace)) 79 80 ctx, cancel := context.WithCancel(context.Background()) 81 defer cancel() 82 83 store := submatview.NewStore(hclog.New(nil)) 84 go store.Run(ctx) 85 86 // Initially there are no services registered. Server should send an 87 // EndOfSnapshot message immediately with index of 1. 88 streamClient.QueueEvents(newEndOfSnapshotEvent(1)) 89 90 req := serviceRequestStub{ 91 serviceRequest: serviceRequest{ 92 ServiceSpecificRequest: structs.ServiceSpecificRequest{ 93 Datacenter: "dc1", 94 ServiceName: "web", 95 EnterpriseMeta: structs.NewEnterpriseMeta(namespace), 96 QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, 97 }, 98 }, 99 streamClient: streamClient, 100 } 101 empty := &structs.IndexedCheckServiceNodes{ 102 Nodes: structs.CheckServiceNodes{}, 103 QueryMeta: structs.QueryMeta{ 104 Index: 1, 105 Backend: structs.QueryBackendStreaming, 106 }, 107 } 108 109 runStep(t, "empty snapshot returned", func(t *testing.T) { 110 result, err := store.Get(ctx, req) 111 require.NoError(t, err) 112 113 require.Equal(t, uint64(1), result.Index) 114 require.Equal(t, empty, result.Value) 115 116 req.QueryOptions.MinQueryIndex = result.Index 117 }) 118 119 runStep(t, "blocks for timeout", func(t *testing.T) { 120 // Subsequent fetch should block for the timeout 121 start := time.Now() 122 req.QueryOptions.MaxQueryTime = 200 * time.Millisecond 123 result, err := store.Get(ctx, req) 124 require.NoError(t, err) 125 elapsed := time.Since(start) 126 require.True(t, elapsed >= 200*time.Millisecond, 127 "Fetch should have blocked until timeout") 128 129 require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed") 130 require.Equal(t, empty, result.Value, "result value should not have changed") 131 132 req.QueryOptions.MinQueryIndex = result.Index 133 }) 134 135 var lastResultValue structs.CheckServiceNodes 136 137 runStep(t, "blocks until update", func(t *testing.T) { 138 // Make another blocking query with a longer timeout and trigger an update 139 // event part way through. 140 start := time.Now() 141 go func() { 142 time.Sleep(200 * time.Millisecond) 143 streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) 144 }() 145 146 req.QueryOptions.MaxQueryTime = time.Second 147 result, err := store.Get(ctx, req) 148 require.NoError(t, err) 149 elapsed := time.Since(start) 150 require.True(t, elapsed >= 200*time.Millisecond, 151 "Fetch should have blocked until the event was delivered") 152 require.True(t, elapsed < time.Second, 153 "Fetch should have returned before the timeout") 154 155 require.Equal(t, uint64(4), result.Index, "result index should not have changed") 156 lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes 157 require.Len(t, lastResultValue, 1, 158 "result value should contain the new registration") 159 160 req.QueryOptions.MinQueryIndex = result.Index 161 }) 162 163 runStep(t, "reconnects and resumes after temporary error", func(t *testing.T) { 164 streamClient.QueueErr(tempError("broken pipe")) 165 166 // Next fetch will continue to block until timeout and receive the same 167 // result. 168 start := time.Now() 169 req.QueryOptions.MaxQueryTime = 200 * time.Millisecond 170 result, err := store.Get(ctx, req) 171 require.NoError(t, err) 172 elapsed := time.Since(start) 173 require.True(t, elapsed >= 200*time.Millisecond, 174 "Fetch should have blocked until timeout") 175 176 require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, 177 "result index should not have changed") 178 require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 179 "result value should not have changed") 180 181 req.QueryOptions.MinQueryIndex = result.Index 182 183 // But an update should still be noticed due to reconnection 184 streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) 185 186 start = time.Now() 187 req.QueryOptions.MaxQueryTime = time.Second 188 result, err = store.Get(ctx, req) 189 require.NoError(t, err) 190 elapsed = time.Since(start) 191 require.True(t, elapsed < time.Second, 192 "Fetch should have returned before the timeout") 193 194 require.Equal(t, uint64(10), result.Index, "result index should not have changed") 195 lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes 196 require.Len(t, lastResultValue, 2, 197 "result value should contain the new registration") 198 199 req.QueryOptions.MinQueryIndex = result.Index 200 }) 201 202 runStep(t, "returns non-temporary error to watchers", func(t *testing.T) { 203 // Wait and send the error while fetcher is waiting 204 go func() { 205 time.Sleep(200 * time.Millisecond) 206 streamClient.QueueErr(errors.New("invalid request")) 207 }() 208 209 // Next fetch should return the error 210 start := time.Now() 211 req.QueryOptions.MaxQueryTime = time.Second 212 result, err := store.Get(ctx, req) 213 require.Error(t, err) 214 elapsed := time.Since(start) 215 require.True(t, elapsed >= 200*time.Millisecond, 216 "Fetch should have blocked until error was sent") 217 require.True(t, elapsed < time.Second, 218 "Fetch should have returned before the timeout") 219 220 require.Equal(t, req.QueryOptions.MinQueryIndex, result.Index, "result index should not have changed") 221 require.Equal(t, lastResultValue, result.Value.(*structs.IndexedCheckServiceNodes).Nodes) 222 223 req.QueryOptions.MinQueryIndex = result.Index 224 225 // But an update should still be noticed due to reconnection 226 streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web")) 227 228 req.QueryOptions.MaxQueryTime = time.Second 229 result, err = store.Get(ctx, req) 230 require.NoError(t, err) 231 elapsed = time.Since(start) 232 require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") 233 234 require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed") 235 require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, 236 "result value should contain the new registration") 237 238 req.QueryOptions.MinQueryIndex = result.Index 239 }) 240} 241 242type tempError string 243 244func (e tempError) Error() string { 245 return string(e) 246} 247 248func (e tempError) Temporary() bool { 249 return true 250} 251 252func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { 253 if testing.Short() { 254 t.Skip("too slow for testing.Short") 255 } 256 257 namespace := getNamespace("ns2") 258 client := newStreamClient(validateNamespace(namespace)) 259 260 ctx, cancel := context.WithCancel(context.Background()) 261 defer cancel() 262 263 store := submatview.NewStore(hclog.New(nil)) 264 265 // Create an initial snapshot of 3 instances on different nodes 266 registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event { 267 return newEventServiceHealthRegister(index, nodeNum, "web") 268 } 269 client.QueueEvents( 270 registerServiceWeb(5, 1), 271 registerServiceWeb(5, 2), 272 registerServiceWeb(5, 3), 273 newEndOfSnapshotEvent(5)) 274 275 req := serviceRequestStub{ 276 serviceRequest: serviceRequest{ 277 ServiceSpecificRequest: structs.ServiceSpecificRequest{ 278 Datacenter: "dc1", 279 ServiceName: "web", 280 EnterpriseMeta: structs.NewEnterpriseMeta(namespace), 281 QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, 282 }, 283 }, 284 streamClient: client, 285 } 286 287 runStep(t, "full snapshot returned", func(t *testing.T) { 288 result, err := store.Get(ctx, req) 289 require.NoError(t, err) 290 291 require.Equal(t, uint64(5), result.Index) 292 expected := newExpectedNodes("node1", "node2", "node3") 293 expected.Index = 5 294 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 295 296 req.QueryOptions.MinQueryIndex = result.Index 297 }) 298 299 runStep(t, "blocks until deregistration", func(t *testing.T) { 300 // Make another blocking query with a longer timeout and trigger an update 301 // event part way through. 302 start := time.Now() 303 go func() { 304 time.Sleep(200 * time.Millisecond) 305 306 // Deregister instance on node1 307 client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web")) 308 }() 309 310 req.QueryOptions.MaxQueryTime = time.Second 311 result, err := store.Get(ctx, req) 312 require.NoError(t, err) 313 elapsed := time.Since(start) 314 require.True(t, elapsed >= 200*time.Millisecond, 315 "Fetch should have blocked until the event was delivered") 316 require.True(t, elapsed < time.Second, 317 "Fetch should have returned before the timeout") 318 319 require.Equal(t, uint64(20), result.Index) 320 expected := newExpectedNodes("node2", "node3") 321 expected.Index = 20 322 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 323 324 req.QueryOptions.MinQueryIndex = result.Index 325 }) 326 327 runStep(t, "server reload is respected", func(t *testing.T) { 328 // Simulates the server noticing the request's ACL token privs changing. To 329 // detect this we'll queue up the new snapshot as a different set of nodes 330 // to the first. 331 client.QueueErr(status.Error(codes.Aborted, "reset by server")) 332 333 client.QueueEvents( 334 registerServiceWeb(50, 3), // overlap existing node 335 registerServiceWeb(50, 4), 336 registerServiceWeb(50, 5), 337 newEndOfSnapshotEvent(50)) 338 339 // Make another blocking query with THE SAME index. It should immediately 340 // return the new snapshot. 341 start := time.Now() 342 req.QueryOptions.MaxQueryTime = time.Second 343 result, err := store.Get(ctx, req) 344 require.NoError(t, err) 345 elapsed := time.Since(start) 346 require.True(t, elapsed < time.Second, 347 "Fetch should have returned before the timeout") 348 349 require.Equal(t, uint64(50), result.Index) 350 expected := newExpectedNodes("node3", "node4", "node5") 351 expected.Index = 50 352 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 353 354 req.QueryOptions.MinQueryIndex = result.Index 355 }) 356 357 runStep(t, "reconnects and receives new snapshot when server state has changed", func(t *testing.T) { 358 client.QueueErr(tempError("temporary connection error")) 359 360 client.QueueEvents( 361 newNewSnapshotToFollowEvent(), 362 registerServiceWeb(50, 3), // overlap existing node 363 registerServiceWeb(50, 4), 364 registerServiceWeb(50, 5), 365 newEndOfSnapshotEvent(50)) 366 367 start := time.Now() 368 req.QueryOptions.MinQueryIndex = 49 369 req.QueryOptions.MaxQueryTime = time.Second 370 result, err := store.Get(ctx, req) 371 require.NoError(t, err) 372 elapsed := time.Since(start) 373 require.True(t, elapsed < time.Second, 374 "Fetch should have returned before the timeout") 375 376 require.Equal(t, uint64(50), result.Index) 377 expected := newExpectedNodes("node3", "node4", "node5") 378 expected.Index = 50 379 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 380 }) 381} 382 383func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { 384 result := &structs.IndexedCheckServiceNodes{} 385 result.QueryMeta.Backend = structs.QueryBackendStreaming 386 for _, node := range nodes { 387 result.Nodes = append(result.Nodes, structs.CheckServiceNode{ 388 Node: &structs.Node{Node: node}, 389 }) 390 } 391 return result 392} 393 394// cmpCheckServiceNodeNames does a shallow comparison of structs.CheckServiceNode 395// by Node name. 396var cmpCheckServiceNodeNames = cmp.Options{ 397 cmp.Comparer(func(x, y structs.CheckServiceNode) bool { 398 return x.Node.Node == y.Node.Node 399 }), 400} 401 402func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { 403 t.Helper() 404 if diff := cmp.Diff(x, y, opts...); diff != "" { 405 t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) 406 } 407} 408 409func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { 410 namespace := getNamespace("ns3") 411 client := newStreamClient(validateNamespace(namespace)) 412 413 ctx, cancel := context.WithCancel(context.Background()) 414 defer cancel() 415 416 store := submatview.NewStore(hclog.New(nil)) 417 418 // Create an initial snapshot of 3 instances but in a single event batch 419 batchEv := newEventBatchWithEvents( 420 newEventServiceHealthRegister(5, 1, "web"), 421 newEventServiceHealthRegister(5, 2, "web"), 422 newEventServiceHealthRegister(5, 3, "web")) 423 client.QueueEvents( 424 batchEv, 425 newEndOfSnapshotEvent(5)) 426 427 req := serviceRequestStub{ 428 serviceRequest: serviceRequest{ 429 ServiceSpecificRequest: structs.ServiceSpecificRequest{ 430 Datacenter: "dc1", 431 ServiceName: "web", 432 EnterpriseMeta: structs.NewEnterpriseMeta(namespace), 433 QueryOptions: structs.QueryOptions{MaxQueryTime: time.Second}, 434 }, 435 }, 436 streamClient: client, 437 } 438 439 runStep(t, "full snapshot returned", func(t *testing.T) { 440 result, err := store.Get(ctx, req) 441 require.NoError(t, err) 442 443 require.Equal(t, uint64(5), result.Index) 444 445 expected := newExpectedNodes("node1", "node2", "node3") 446 expected.Index = 5 447 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 448 req.QueryOptions.MinQueryIndex = result.Index 449 }) 450 451 runStep(t, "batched updates work too", func(t *testing.T) { 452 // Simulate multiple registrations happening in one Txn (so all have same 453 // index) 454 batchEv := newEventBatchWithEvents( 455 // Deregister an existing node 456 newEventServiceHealthDeregister(20, 1, "web"), 457 // Register another 458 newEventServiceHealthRegister(20, 4, "web"), 459 ) 460 client.QueueEvents(batchEv) 461 req.QueryOptions.MaxQueryTime = time.Second 462 result, err := store.Get(ctx, req) 463 require.NoError(t, err) 464 465 require.Equal(t, uint64(20), result.Index) 466 expected := newExpectedNodes("node2", "node3", "node4") 467 expected.Index = 20 468 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 469 470 req.QueryOptions.MinQueryIndex = result.Index 471 }) 472} 473 474func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { 475 namespace := getNamespace("ns3") 476 streamClient := newStreamClient(validateNamespace(namespace)) 477 478 ctx, cancel := context.WithCancel(context.Background()) 479 defer cancel() 480 481 store := submatview.NewStore(hclog.New(nil)) 482 go store.Run(ctx) 483 484 req := serviceRequestStub{ 485 serviceRequest: serviceRequest{ 486 ServiceSpecificRequest: structs.ServiceSpecificRequest{ 487 Datacenter: "dc1", 488 ServiceName: "web", 489 EnterpriseMeta: structs.NewEnterpriseMeta(namespace), 490 QueryOptions: structs.QueryOptions{ 491 Filter: `Node.Node == "node2"`, 492 MaxQueryTime: time.Second, 493 }, 494 }, 495 }, 496 streamClient: streamClient, 497 } 498 499 // Create an initial snapshot of 3 instances but in a single event batch 500 batchEv := newEventBatchWithEvents( 501 newEventServiceHealthRegister(5, 1, "web"), 502 newEventServiceHealthRegister(5, 2, "web"), 503 newEventServiceHealthRegister(5, 3, "web")) 504 streamClient.QueueEvents( 505 batchEv, 506 newEndOfSnapshotEvent(5)) 507 508 runStep(t, "filtered snapshot returned", func(t *testing.T) { 509 result, err := store.Get(ctx, req) 510 require.NoError(t, err) 511 512 require.Equal(t, uint64(5), result.Index) 513 expected := newExpectedNodes("node2") 514 expected.Index = 5 515 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 516 517 req.QueryOptions.MinQueryIndex = result.Index 518 }) 519 520 runStep(t, "filtered updates work too", func(t *testing.T) { 521 // Simulate multiple registrations happening in one Txn (all have same index) 522 batchEv := newEventBatchWithEvents( 523 // Deregister an existing node 524 newEventServiceHealthDeregister(20, 1, "web"), 525 // Register another 526 newEventServiceHealthRegister(20, 4, "web"), 527 ) 528 streamClient.QueueEvents(batchEv) 529 result, err := store.Get(ctx, req) 530 require.NoError(t, err) 531 532 require.Equal(t, uint64(20), result.Index) 533 expected := newExpectedNodes("node2") 534 expected.Index = 20 535 assertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) 536 }) 537} 538 539// serviceRequestStub overrides NewMaterializer so that test can use a fake 540// StreamClient. 541type serviceRequestStub struct { 542 serviceRequest 543 streamClient submatview.StreamClient 544} 545 546func (r serviceRequestStub) NewMaterializer() (*submatview.Materializer, error) { 547 view, err := newHealthView(r.ServiceSpecificRequest) 548 if err != nil { 549 return nil, err 550 } 551 return submatview.NewMaterializer(submatview.Deps{ 552 View: view, 553 Client: r.streamClient, 554 Logger: hclog.New(nil), 555 Request: newMaterializerRequest(r.ServiceSpecificRequest), 556 }), nil 557} 558 559func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { 560 node := fmt.Sprintf("node%d", nodeNum) 561 nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) 562 addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) 563 564 return &pbsubscribe.Event{ 565 Index: index, 566 Payload: &pbsubscribe.Event_ServiceHealth{ 567 ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ 568 Op: pbsubscribe.CatalogOp_Register, 569 CheckServiceNode: &pbservice.CheckServiceNode{ 570 Node: &pbservice.Node{ 571 ID: nodeID, 572 Node: node, 573 Address: addr, 574 Datacenter: "dc1", 575 RaftIndex: pbcommon.RaftIndex{ 576 CreateIndex: index, 577 ModifyIndex: index, 578 }, 579 }, 580 Service: &pbservice.NodeService{ 581 ID: svc, 582 Service: svc, 583 Port: 8080, 584 RaftIndex: pbcommon.RaftIndex{ 585 CreateIndex: index, 586 ModifyIndex: index, 587 }, 588 }, 589 }, 590 }, 591 }, 592 } 593} 594 595func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { 596 node := fmt.Sprintf("node%d", nodeNum) 597 598 return &pbsubscribe.Event{ 599 Index: index, 600 Payload: &pbsubscribe.Event_ServiceHealth{ 601 ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ 602 Op: pbsubscribe.CatalogOp_Deregister, 603 CheckServiceNode: &pbservice.CheckServiceNode{ 604 Node: &pbservice.Node{ 605 Node: node, 606 }, 607 Service: &pbservice.NodeService{ 608 ID: svc, 609 Service: svc, 610 Port: 8080, 611 Weights: &pbservice.Weights{ 612 Passing: 1, 613 Warning: 1, 614 }, 615 RaftIndex: pbcommon.RaftIndex{ 616 // The original insertion index since a delete doesn't update 617 // this. This magic value came from state store tests where we 618 // setup at index 10 and then mutate at index 100. It can be 619 // modified by the caller later and makes it easier than having 620 // yet another argument in the common case. 621 CreateIndex: 10, 622 ModifyIndex: 10, 623 }, 624 }, 625 }, 626 }, 627 }, 628 } 629} 630 631func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event { 632 events := make([]*pbsubscribe.Event, len(evs)+1) 633 events[0] = first 634 for i := range evs { 635 events[i+1] = evs[i] 636 } 637 return &pbsubscribe.Event{ 638 Index: first.Index, 639 Payload: &pbsubscribe.Event_EventBatch{ 640 EventBatch: &pbsubscribe.EventBatch{Events: events}, 641 }, 642 } 643} 644 645func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { 646 return &pbsubscribe.Event{ 647 Index: index, 648 Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, 649 } 650} 651 652func newNewSnapshotToFollowEvent() *pbsubscribe.Event { 653 return &pbsubscribe.Event{ 654 Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, 655 } 656} 657 658// getNamespace returns a namespace if namespace support exists, otherwise 659// returns the empty string. It allows the same tests to work in both oss and ent 660// without duplicating the tests. 661func getNamespace(ns string) string { 662 meta := structs.NewEnterpriseMeta(ns) 663 return meta.NamespaceOrEmpty() 664} 665 666func validateNamespace(ns string) func(request *pbsubscribe.SubscribeRequest) error { 667 return func(request *pbsubscribe.SubscribeRequest) error { 668 if request.Namespace != ns { 669 return fmt.Errorf("expected request.Namespace %v, got %v", ns, request.Namespace) 670 } 671 return nil 672 } 673} 674 675func runStep(t *testing.T, name string, fn func(t *testing.T)) { 676 t.Helper() 677 if !t.Run(name, fn) { 678 t.FailNow() 679 } 680} 681 682func TestNewFilterEvaluator(t *testing.T) { 683 type testCase struct { 684 name string 685 req structs.ServiceSpecificRequest 686 data structs.CheckServiceNode 687 expected bool 688 } 689 690 fn := func(t *testing.T, tc testCase) { 691 e, err := newFilterEvaluator(tc.req) 692 require.NoError(t, err) 693 actual, err := e.Evaluate(tc.data) 694 require.NoError(t, err) 695 require.Equal(t, tc.expected, actual) 696 } 697 698 var testCases = []testCase{ 699 { 700 name: "single ServiceTags match", 701 req: structs.ServiceSpecificRequest{ 702 ServiceTags: []string{"match"}, 703 TagFilter: true, 704 }, 705 data: structs.CheckServiceNode{ 706 Service: &structs.NodeService{ 707 Tags: []string{"extra", "match"}, 708 }, 709 }, 710 expected: true, 711 }, 712 { 713 name: "single deprecated ServiceTag match", 714 req: structs.ServiceSpecificRequest{ 715 ServiceTag: "match", 716 TagFilter: true, 717 }, 718 data: structs.CheckServiceNode{ 719 Service: &structs.NodeService{ 720 Tags: []string{"extra", "match"}, 721 }, 722 }, 723 expected: true, 724 }, 725 { 726 name: "single ServiceTags mismatch", 727 req: structs.ServiceSpecificRequest{ 728 ServiceTags: []string{"other"}, 729 TagFilter: true, 730 }, 731 data: structs.CheckServiceNode{ 732 Service: &structs.NodeService{ 733 Tags: []string{"extra", "match"}, 734 }, 735 }, 736 expected: false, 737 }, 738 { 739 name: "multiple ServiceTags match", 740 req: structs.ServiceSpecificRequest{ 741 ServiceTags: []string{"match", "second"}, 742 TagFilter: true, 743 }, 744 data: structs.CheckServiceNode{ 745 Service: &structs.NodeService{ 746 Tags: []string{"extra", "match", "second"}, 747 }, 748 }, 749 expected: true, 750 }, 751 { 752 name: "multiple ServiceTags mismatch", 753 req: structs.ServiceSpecificRequest{ 754 ServiceTags: []string{"match", "not"}, 755 TagFilter: true, 756 }, 757 data: structs.CheckServiceNode{ 758 Service: &structs.NodeService{ 759 Tags: []string{"extra", "match"}, 760 }, 761 }, 762 expected: false, 763 }, 764 { 765 name: "single NodeMetaFilter match", 766 req: structs.ServiceSpecificRequest{ 767 NodeMetaFilters: map[string]string{"meta1": "match"}, 768 }, 769 data: structs.CheckServiceNode{ 770 Node: &structs.Node{ 771 Meta: map[string]string{ 772 "meta1": "match", 773 "extra": "some", 774 }, 775 }, 776 }, 777 expected: true, 778 }, 779 { 780 name: "single NodeMetaFilter mismatch", 781 req: structs.ServiceSpecificRequest{ 782 NodeMetaFilters: map[string]string{ 783 "meta1": "match", 784 }, 785 }, 786 data: structs.CheckServiceNode{ 787 Node: &structs.Node{ 788 Meta: map[string]string{ 789 "meta1": "other", 790 "extra": "some", 791 }, 792 }, 793 }, 794 expected: false, 795 }, 796 { 797 name: "multiple NodeMetaFilter match", 798 req: structs.ServiceSpecificRequest{ 799 NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"}, 800 }, 801 data: structs.CheckServiceNode{ 802 Node: &structs.Node{ 803 Meta: map[string]string{ 804 "meta1": "match", 805 "meta2": "a", 806 "extra": "some", 807 }, 808 }, 809 }, 810 expected: true, 811 }, 812 { 813 name: "multiple NodeMetaFilter mismatch", 814 req: structs.ServiceSpecificRequest{ 815 NodeMetaFilters: map[string]string{ 816 "meta1": "match", 817 "meta2": "beta", 818 }, 819 }, 820 data: structs.CheckServiceNode{ 821 Node: &structs.Node{ 822 Meta: map[string]string{ 823 "meta1": "other", 824 "meta2": "gamma", 825 }, 826 }, 827 }, 828 expected: false, 829 }, 830 { 831 name: "QueryOptions.Filter match", 832 req: structs.ServiceSpecificRequest{ 833 QueryOptions: structs.QueryOptions{ 834 Filter: `Node.Node == "node3"`, 835 }, 836 }, 837 data: structs.CheckServiceNode{ 838 Node: &structs.Node{Node: "node3"}, 839 }, 840 expected: true, 841 }, 842 { 843 name: "QueryOptions.Filter mismatch", 844 req: structs.ServiceSpecificRequest{ 845 QueryOptions: structs.QueryOptions{ 846 Filter: `Node.Node == "node2"`, 847 }, 848 }, 849 data: structs.CheckServiceNode{ 850 Node: &structs.Node{Node: "node3"}, 851 }, 852 expected: false, 853 }, 854 { 855 name: "all match", 856 req: structs.ServiceSpecificRequest{ 857 QueryOptions: structs.QueryOptions{ 858 Filter: `Node.Node == "node3"`, 859 }, 860 ServiceTags: []string{"tag1", "tag2"}, 861 NodeMetaFilters: map[string]string{ 862 "meta1": "match1", 863 "meta2": "match2", 864 }, 865 }, 866 data: structs.CheckServiceNode{ 867 Node: &structs.Node{ 868 Node: "node3", 869 Meta: map[string]string{ 870 "meta1": "match1", 871 "meta2": "match2", 872 "extra": "other", 873 }, 874 }, 875 Service: &structs.NodeService{ 876 Tags: []string{"tag1", "tag2", "extra"}, 877 }, 878 }, 879 expected: true, 880 }, 881 } 882 883 for _, tc := range testCases { 884 t.Run(tc.name, func(t *testing.T) { 885 fn(t, tc) 886 }) 887 } 888} 889